/* vim: set expandtab ts=4 sw=4: */
/*
* You may redistribute this program and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include // for String_vprintf()
#include "admin/Admin.h"
#include "admin/AdminLog.h"
#include "benc/Dict.h"
#include "benc/List.h"
#include "benc/String.h"
#include "crypto/random/Random.h"
#include "io/Writer.h"
#include "util/log/Log.h"
#include "util/log/Log_impl.h"
#include "util/Hex.h"
#include "util/Identity.h"
#include "util/events/Time.h"
#include "util/events/Timeout.h"
/** Capacity of the AdminLog subscriptions array; subscribe requests beyond this are refused. */
#define MAX_SUBSCRIPTIONS 64
/** NOTE(review): not referenced in the visible part of this file; confirm it is still needed. */
#define FILE_NAME_COUNT 32
/**
* One log stream requested through the admin interface.
* Instances live in the fixed-size AdminLog.subscriptions array and are filled in by subscribe().
*/
struct Subscription
{
/** The log level to match against, all higher levels will also be matched. */
enum Log_Level logLevel;
/** The line number within the file or 0 to match all lines. */
int lineNum;
/**
* The name of the file to match against or null to match any file.
* Initially a copy of the requested name; isMatch() replaces it with the caller's pointer
* the first time the names compare equal.
*/
const char* file;
/** True if file can be compared with pointer comparison instead of strcmp. */
bool internalFile;
/**
* Dropped messages because they are being sent too fast for UDP interface to handle.
* Reset when the pipes unclog and a message is sent reporting the number of dropped messages.
* While this is non-zero, doLog() counts further matching lines here instead of sending them.
*/
int dropped;
/** The transaction ID of the message which solicited this stream of logs. */
String* txid;
/** A hopefully unique (random) number identifying this stream (hex of 8 random bytes). */
String* streamId;
/**
* An allocator which will live during the lifecycle of the Subscription.
* Created as a child of AdminLog.alloc in subscribe() and freed by removeSubscription().
*/
struct Allocator* alloc;
};
/** Logger which forwards matching log lines to clients subscribed over the admin interface. */
struct AdminLog
{
/**
* Generic logger interface; its print callback is set to doLog() in AdminLog_registerNew().
* This file casts between struct Log* and struct AdminLog*, so this must stay the first member.
*/
struct Log pub;
/** Subscription slots; the entries in use are indexes [0, subscriptionCount). */
struct Subscription subscriptions[MAX_SUBSCRIPTIONS];
/** Number of slots of subscriptions currently in use. */
int subscriptionCount;
/** non-zero if we are logging at this very moment (reentrant logging is not allowed!) */
int logging;
/** Interval timer which runs unpause(); NULL until doLog() first fails to send a message. */
struct Timeout* unpause;
/** Admin instance through which all messages to subscribers are sent. */
struct Admin* admin;
/** Parent of the per-subscription and per-log-line child allocators. */
struct Allocator* alloc;
/** Source of the random bytes used to build stream ids. */
struct Random* rand;
/** Event base passed to Timeout_setInterval() when the unpause timer is created. */
struct EventBase* base;
/* Macro from util/Identity.h; presumably declares the tag verified by Identity_check() - TODO confirm. */
Identity
};
/**
 * Decide whether a log line should be delivered to a subscription.
 *
 * The checks run in this order: file name, log level, line number.
 * Side effect: when the subscription's file name equals `file` by content but not by
 * pointer, the subscription is switched over to the caller's pointer so that later
 * calls can match with a plain pointer comparison.
 *
 * @param subscription the subscription to test (may be updated, see above).
 * @param logger unused.
 * @param logLevel level of the line being logged.
 * @param file file name of the line being logged.
 * @param line line number of the line being logged.
 * @return true if the line matches the subscription's filters.
 */
static inline bool isMatch(struct Subscription* subscription,
                           struct AdminLog* logger,
                           enum Log_Level logLevel,
                           const char* file,
                           int line)
{
    const char* wanted = subscription->file;
    if (wanted && wanted != file) {
        // Once the name has been swapped for an internal pointer, a pointer mismatch
        // is final and the string comparison is skipped.
        if (subscription->internalFile || CString_strcmp(file, wanted)) {
            return false;
        }
        // Same name, different pointer: remember the caller's pointer for next time.
        subscription->file = file;
        subscription->internalFile = true;
    }

    if (logLevel < subscription->logLevel) {
        return false;
    }

    // A lineNum of 0 means "any line".
    return !subscription->lineNum || line == subscription->lineNum;
}
/** Dictionary keys used when building the admin messages sent from this file. */
static String* STREAM_ID = String_CONST_SO("streamId");
static String* TIME = String_CONST_SO("time");
static String* LEVEL = String_CONST_SO("level");
static String* STR_FILE = String_CONST_SO("file");
static String* LINE = String_CONST_SO("line");
static String* MESSAGE = String_CONST_SO("message");
/**
 * Build the dictionary which carries one log line to one subscriber.
 *
 * The result holds the subscription's stream id, the current time in seconds,
 * the level name, the file name, the line number and the message text.
 *
 * @param subscription the subscription the message is destined for.
 * @param logger unused.
 * @param logLevel level of the log line.
 * @param file file name of the log line.
 * @param line line number of the log line.
 * @param message the formatted log text; stored by reference, not copied.
 * @param alloc allocator for the dictionary and the strings created here.
 * @return the new dictionary, allocated with alloc.
 */
static Dict* makeLogMessage(struct Subscription* subscription,
                            struct AdminLog* logger,
                            enum Log_Level logLevel,
                            const char* file,
                            uint32_t line,
                            String* message,
                            struct Allocator* alloc)
{
    Dict* msg = Dict_new(alloc);
    int64_t timestamp = (int64_t) Time_currentTimeSeconds();
    String* levelName = String_new(Log_nameForLevel(logLevel), alloc);
    String* fileName = String_new(file, alloc);

    Dict_putString(msg, STREAM_ID, subscription->streamId, alloc);
    Dict_putInt(msg, TIME, timestamp, alloc);
    Dict_putString(msg, LEVEL, levelName, alloc);
    Dict_putString(msg, STR_FILE, fileName, alloc);
    Dict_putInt(msg, LINE, line, alloc);
    Dict_putString(msg, MESSAGE, message, alloc);
    return msg;
}
/**
 * Drop a subscription and keep the used part of the array contiguous.
 *
 * The subscription's allocator is freed, the count is decremented, and unless the
 * removed entry was the last one in use, the last entry is copied over it.
 *
 * @param log the logger owning the subscription.
 * @param sub a pointer into log->subscriptions identifying the entry to remove.
 */
static void removeSubscription(struct AdminLog* log, struct Subscription* sub)
{
    Allocator_free(sub->alloc);

    int lastIndex = --log->subscriptionCount;
    struct Subscription* last = &log->subscriptions[lastIndex];

    // Nothing to move when the array is now empty or the removed entry was the tail.
    if (lastIndex != 0 && sub != last) {
        Bits_memcpy(sub, last, sizeof(struct Subscription));
    }
}
/**
* Timer callback, installed by doLog() with Timeout_setInterval(), which tells subscribers
* how many messages were dropped for them.
*
* For every subscription with a non-zero dropped count, the count is cleared and a warning
* is logged through this same logger. The count is cleared before logging because doLog()
* does not send to a subscription whose dropped count is non-zero.
*
* @param vAdminLog the struct AdminLog, passed as the timer context.
*/
static void unpause(void* vAdminLog)
{
struct AdminLog* log = Identity_check((struct AdminLog*) vAdminLog);
// dirty reentrence.
Assert_true(!log->logging);
bool noneDropped = true;
for (int i = log->subscriptionCount - 1; i >= 0; i--) {
int dropped = log->subscriptions[i].dropped;
if (!dropped) { continue; }
noneDropped = false;
log->subscriptions[i].dropped = 0;
Log_warn((struct Log*) log,
"UDPInterface cannot handle the logging, [%d] messages dropped", dropped);
// If the count is non-zero again, sending the warning itself was counted as a drop,
// so the previous total is added back.
if (log->subscriptions[i].dropped) {
// oh well, we'll try again later.
log->subscriptions[i].dropped += dropped;
}
}
// NOTE(review): "&& false" makes this branch unreachable, so once started the interval
// timer is never cleared. This looks deliberately disabled; confirm before re-enabling.
if (noneDropped && false) {
Timeout_clearTimeout(log->unpause);
log->unpause = NULL;
}
}
static void doLog(struct Log* genericLog,
enum Log_Level logLevel,
const char* fullFilePath,
int line,
const char* format,
va_list args)
{
struct AdminLog* log = Identity_check((struct AdminLog*) genericLog);
String* message = NULL;
struct Allocator* logLineAlloc = NULL;
if (log->logging) { return; }
log->logging++;
for (int i = log->subscriptionCount - 1; i >= 0; i--) {
if (!isMatch(&log->subscriptions[i], log, logLevel, fullFilePath, line)) { continue; }
if (log->subscriptions[i].dropped) {
log->subscriptions[i].dropped++;
continue;
}
if (!message) {
logLineAlloc = Allocator_child(log->alloc);
message = String_vprintf(logLineAlloc, format, args);
// Strip all of the annoying \n marks in the log entries.
if (message->len > 0 && message->bytes[message->len - 1] == '\n') {
message->len--;
}
}
Dict* d = makeLogMessage(&log->subscriptions[i],
log,
logLevel,
fullFilePath,
line,
message,
logLineAlloc);
int ret = Admin_sendMessage(d, log->subscriptions[i].txid, log->admin);
if (ret == Admin_sendMessage_CHANNEL_CLOSED) {
removeSubscription(log, &log->subscriptions[i]);
} else if (ret) {
log->subscriptions[i].dropped++;
if (!log->unpause) {
log->unpause = Timeout_setInterval(unpause, log, 10, log->base, log->alloc);
}
}
}
if (logLineAlloc) {
Allocator_free(logLineAlloc);
}
Assert_true(!--log->logging);
}
/**
 * Admin function AdminLog_subscribe: start streaming matching log lines to the caller.
 *
 * Optional arguments read from args:
 *   "level" - name of the lowest level to deliver; defaults to Log_Level_DEBUG, and a
 *             name which is not recognized is treated as Log_Level_KEYS.
 *   "line"  - line number to match, or 0 / absent for any line. It must be non-negative
 *             and must fit in the int field it is stored in.
 *   "file"  - file name to match; absent or empty matches any file.
 *
 * On success the reply carries error "none" and the new "streamId"; otherwise it
 * carries only an "error" string.
 *
 * @param args the request arguments.
 * @param vcontext the struct AdminLog.
 * @param txid transaction id of the request; a copy is kept to address later log messages.
 * @param requestAlloc unused.
 */
static void subscribe(Dict* args, void* vcontext, String* txid, struct Allocator* requestAlloc)
{
    struct AdminLog* log = Identity_check((struct AdminLog*) vcontext);
    String* levelName = Dict_getStringC(args, "level");
    enum Log_Level level = (levelName) ? Log_levelForName(levelName->bytes) : Log_Level_DEBUG;
    int64_t* lineNumPtr = Dict_getIntC(args, "line");
    String* fileStr = Dict_getStringC(args, "file");
    if (fileStr && !fileStr->len) { fileStr = NULL; }
    // Placeholder, every path which reaches the error reply overwrites it.
    char* error = "2+2=5";
    if (level == Log_Level_INVALID) {
        level = Log_Level_KEYS;
    }
    if (lineNumPtr && *lineNumPtr < 0) {
        error = "Invalid line number, must be positive or 0 to signify any line is acceptable.";
    } else if (lineNumPtr && *lineNumPtr > INT32_MAX) {
        // The value is stored in an int. Without this check a large value would be
        // silently narrowed, and e.g. 4294967296 would become 0 which means "any line".
        error = "Invalid line number, value is too large.";
    } else if (log->subscriptionCount >= MAX_SUBSCRIPTIONS) {
        error = "Max subscription count reached.";
    } else {
        struct Subscription* sub = &log->subscriptions[log->subscriptionCount];
        Bits_memset(sub, 0, sizeof(struct Subscription));
        sub->logLevel = level;
        sub->alloc = Allocator_child(log->alloc);
        String* fileStrCpy = String_clone(fileStr, sub->alloc);
        sub->file = (fileStrCpy) ? fileStrCpy->bytes : NULL;
        sub->lineNum = (lineNumPtr) ? (int) *lineNumPtr : 0;
        sub->txid = String_clone(txid, sub->alloc);
        uint8_t streamId[8];
        Random_bytes(log->rand, streamId, 8);
        uint8_t streamIdHex[20];
        Hex_encode(streamIdHex, 20, streamId, 8);
        sub->streamId = String_new(streamIdHex, sub->alloc);
        Dict response = Dict_CONST(
            String_CONST("error"), String_OBJ(String_CONST("none")), Dict_CONST(
            String_CONST("streamId"), String_OBJ(sub->streamId), NULL
        ));
        Admin_sendMessage(&response, txid, log->admin);
        // The slot only becomes active after the reply has been sent.
        log->subscriptionCount++;
        return;
    }
    Dict response = Dict_CONST(
        String_CONST("error"), String_OBJ(String_CONST(error)), NULL
    );
    Admin_sendMessage(&response, txid, log->admin);
}
/**
 * Admin function AdminLog_unsubscribe: stop the stream identified by "streamId".
 *
 * The reply carries "error": "none" on success, "Invalid streamId." if the argument is
 * missing or is not 16 hex characters, or "No such subscription." if nothing matches.
 *
 * @param args the request arguments, expected to contain the string "streamId".
 * @param vcontext the struct AdminLog.
 * @param txid transaction id of the request, used for the reply.
 * @param requestAlloc unused.
 */
static void unsubscribe(Dict* args, void* vcontext, String* txid, struct Allocator* requestAlloc)
{
    struct AdminLog* log = Identity_check((struct AdminLog*) vcontext);
    String* streamIdHex = Dict_getStringC(args, "streamId");
    uint8_t streamId[8];
    char* error = "No such subscription.";
    // The argument is registered as required, but it is checked here as well so that
    // a missing or non-string value can never be dereferenced.
    if (!streamIdHex
        || streamIdHex->len != 16
        || Hex_decode(streamId, 8, (uint8_t*)streamIdHex->bytes, 16) != 8)
    {
        error = "Invalid streamId.";
    } else {
        for (int i = 0; i < (int)log->subscriptionCount; i++) {
            if (String_equals(streamIdHex, log->subscriptions[i].streamId)) {
                removeSubscription(log, &log->subscriptions[i]);
                error = "none";
                break;
            }
        }
    }
    Dict response = Dict_CONST(
        String_CONST("error"), String_OBJ(String_CONST(error)), NULL
    );
    Admin_sendMessage(&response, txid, log->admin);
}
/**
 * Admin function AdminLog_logMany: emit "count" debug log lines through this logger.
 *
 * The count must be present and within [0, UINT32_MAX]. Anything else is answered with
 * an error instead of being converted to an unsigned loop bound, where a negative
 * value would wrap around to billions of iterations.
 *
 * @param args the request arguments, expected to contain the integer "count".
 * @param vcontext the struct AdminLog.
 * @param txid transaction id of the request, used for the reply.
 * @param alloc unused.
 */
static void logMany(Dict* args, void* vcontext, String* txid, struct Allocator* alloc)
{
    struct AdminLog* log = Identity_check((struct AdminLog*) vcontext);
    int64_t* countPtr = Dict_getIntC(args, "count");
    char* error = "none";
    if (!countPtr || *countPtr < 0 || *countPtr > UINT32_MAX) {
        error = "Invalid count, must be between 0 and 4294967295.";
    } else {
        uint32_t count = (uint32_t) *countPtr;
        for (uint32_t i = 0; i < count; i++) {
            // The values are unsigned, so they are printed with %u rather than %d.
            Log_debug((struct Log*)log, "This is message number [%u] of total [%u]",
                      (unsigned int) i, (unsigned int) count);
        }
    }
    Dict response = Dict_CONST(
        String_CONST("error"), String_OBJ(String_CONST(error)), NULL
    );
    Admin_sendMessage(&response, txid, log->admin);
}
/**
 * Admin function AdminLog_subscriptions: list the current subscriptions.
 *
 * The reply holds a list "entries" with one dictionary per subscription, giving its
 * level, file (only if a file filter is set), line, dropped count, internalFile flag
 * and streamId.
 *
 * @param args unused.
 * @param vcontext the struct AdminLog.
 * @param txid transaction id of the request, used for the reply.
 * @param alloc allocator for everything placed in the reply.
 */
static void subscriptions(Dict* args, void* vcontext, String* txid, struct Allocator* alloc)
{
    struct AdminLog* log = Identity_check((struct AdminLog*) vcontext);

    Dict* msg = Dict_new(alloc);
    List* entries = List_new(alloc);
    Dict_putListC(msg, "entries", entries, alloc);

    int total = (int)log->subscriptionCount;
    int idx = 0;
    while (idx < total) {
        struct Subscription* sub = &log->subscriptions[idx++];
        Dict* entry = Dict_new(alloc);

        String* levelName = String_new(Log_nameForLevel(sub->logLevel), alloc);
        Dict_putString(entry, LEVEL, levelName, alloc);
        if (sub->file) {
            String* fileName = String_new(sub->file, alloc);
            Dict_putString(entry, STR_FILE, fileName, alloc);
        }
        Dict_putInt(entry, LINE, sub->lineNum, alloc);
        Dict_putIntC(entry, "dropped", sub->dropped, alloc);
        Dict_putIntC(entry, "internalFile", sub->internalFile, alloc);
        Dict_putStringC(entry, "streamId", sub->streamId, alloc);

        List_addDict(entries, entry, alloc);
    }

    Admin_sendMessage(msg, txid, log->admin);
}
/**
* Create an AdminLog and register its admin functions.
*
* Registers AdminLog_subscribe, AdminLog_unsubscribe, AdminLog_subscriptions and
* AdminLog_logMany with the given admin.
*
* @param admin the admin to register the functions with and to send log messages through.
* @param alloc allocator for the logger; also the parent of the allocators it creates later.
* @param rand source of random bytes for stream ids.
* @param base event base used for the unpause timer.
* @return the generic logger interface of the new AdminLog.
*/
struct Log* AdminLog_registerNew(struct Admin* admin,
struct Allocator* alloc,
struct Random* rand,
struct EventBase* base)
{
// Fields not named here (subscriptions, counters, unpause timer) start out zeroed.
struct AdminLog* log = Allocator_clone(alloc, (&(struct AdminLog) {
.pub = {
.print = doLog
},
.admin = admin,
.alloc = alloc,
.rand = rand,
.base = base
}));
Identity_set(log);
// NOTE(review): the `true` argument presumably marks each function as requiring
// authentication; confirm against Admin.h.
Admin_registerFunction("AdminLog_subscribe", subscribe, log, true,
((struct Admin_FunctionArg[]) {
{ .name = "level", .required = 0, .type = "String" },
{ .name = "line", .required = 0, .type = "Int" },
{ .name = "file", .required = 0, .type = "String" }
}), admin);
Admin_registerFunction("AdminLog_unsubscribe", unsubscribe, log, true,
((struct Admin_FunctionArg[]) {
{ .name = "streamId", .required = 1, .type = "String" }
}), admin);
// This function takes no arguments.
Admin_registerFunction("AdminLog_subscriptions", subscriptions, log, true, NULL, admin);
Admin_registerFunction("AdminLog_logMany", logMany, log, true,
((struct Admin_FunctionArg[]) {
{ .name = "count", .required = 1, .type = "Int" }
}), admin);
return &log->pub;
}