/* vim: set expandtab ts=4 sw=4: */ /* * You may redistribute this program and/or modify it under the terms of * the GNU General Public License as published by the Free Software Foundation, * either version 3 of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "admin/Admin.h" #include "benc/String.h" #include "benc/Int.h" #include "benc/Dict.h" #include "benc/serialization/BencSerializer.h" #include "benc/serialization/standard/StandardBencSerializer.h" #include "interface/addressable/AddrInterface.h" #include "io/Reader.h" #include "io/ArrayReader.h" #include "io/ArrayWriter.h" #include "io/Writer.h" #include "memory/Allocator.h" #include "memory/BufferAllocator.h" #include "util/Assert.h" #include "util/Bits.h" #include "util/Hex.h" #include "util/log/Log.h" #include "util/events/Time.h" #include "util/events/Timeout.h" #include "util/Identity.h" #include "util/platform/Sockaddr.h" #define string_strstr #define string_strcmp #define string_strlen #include "util/platform/libc/string.h" #include #include #include static String* TYPE = String_CONST_SO("type"); static String* REQUIRED = String_CONST_SO("required"); static String* STRING = String_CONST_SO("String"); static String* INTEGER = String_CONST_SO("Int"); static String* DICT = String_CONST_SO("Dict"); static String* LIST = String_CONST_SO("List"); static String* TXID = String_CONST_SO("txid"); /** Number of milliseconds before a session times out and outgoing messages are failed. */ #define TIMEOUT_MILLISECONDS 30000 /** map values for tracking time of last message by source address */ struct MapValue { /** time when the last incoming message was received. */ uint64_t timeOfLastMessage; /** used to allocate the memory for the key (Sockaddr) and value (this). */ struct Allocator* allocator; }; //////// generate time-of-last-message-by-address map #define Map_USE_HASH #define Map_USE_COMPARATOR #define Map_NAME LastMessageTimeByAddr #define Map_KEY_TYPE struct Sockaddr* #define Map_VALUE_TYPE struct MapValue* #include "util/Map.h" static inline uint32_t Map_LastMessageTimeByAddr_hash(struct Sockaddr** key) { uint32_t* k = (uint32_t*) *key; return k[ ((*key)->addrLen / 4)-1 ]; } static inline int Map_LastMessageTimeByAddr_compare(struct Sockaddr** keyA, struct Sockaddr** keyB) { return Bits_memcmp(*keyA, *keyB, (*keyA)->addrLen); } /////// end map struct Function { String* name; Admin_FUNCTION(call); void* context; bool needsAuth; Dict* args; }; struct Admin { struct EventBase* eventBase; struct Function* functions; int functionCount; struct Allocator* allocator; String* password; struct Log* logger; struct AddrInterface* iface; struct Map_LastMessageTimeByAddr map; /** non-zero if we are currently in an admin request. */ int inRequest; /** non-zero if this session able to receive asynchronous messages. */ int asyncEnabled; /** Length of addresses of clients which communicate with admin. */ uint32_t addrLen; Identity }; static uint8_t sendMessage(struct Message* message, struct Sockaddr* dest, struct Admin* admin) { // stack overflow when used with admin logger. //Log_keys(admin->logger, "sending message to angel [%s]", message->bytes); Message_push(message, dest, dest->addrLen); return admin->iface->generic.sendMessage(message, &admin->iface->generic); } static int sendBenc(Dict* message, struct Sockaddr* dest, struct Admin* admin) { struct Allocator* allocator; BufferAllocator_STACK(allocator, 256); #define SEND_MESSAGE_PADDING 32 uint8_t buff[Admin_MAX_RESPONSE_SIZE + SEND_MESSAGE_PADDING]; struct Writer* w = ArrayWriter_new(buff + SEND_MESSAGE_PADDING, Admin_MAX_RESPONSE_SIZE, allocator); StandardBencSerializer_get()->serializeDictionary(w, message); struct Message m = { .bytes = buff + SEND_MESSAGE_PADDING, .length = w->bytesWritten, .padding = SEND_MESSAGE_PADDING }; struct Allocator* alloc = Allocator_child(admin->allocator); struct Message* msg = Message_clone(&m, alloc); int out = sendMessage(msg, dest, admin); Allocator_free(alloc); return out; } /** * If no incoming data has been sent by this address in TIMEOUT_MILLISECONDS * then Admin_sendMessage() should fail so that it doesn't endlessly send * udp packets into outer space after a logging client disconnects. */ static int checkAddress(struct Admin* admin, int index, uint64_t now) { uint64_t diff = now - admin->map.values[index]->timeOfLastMessage; // check for backwards time if (diff > TIMEOUT_MILLISECONDS && diff < ((uint64_t)INT64_MAX)) { Allocator_free(admin->map.values[index]->allocator); Map_LastMessageTimeByAddr_remove(index, &admin->map); return -1; } return 0; } static void clearExpiredAddresses(void* vAdmin) { struct Admin* admin = Identity_cast((struct Admin*) vAdmin); uint64_t now = Time_currentTimeMilliseconds(admin->eventBase); int count = 0; for (int i = admin->map.count - 1; i >= 0; i--) { if (checkAddress(admin, i, now)) { count++; } } Log_debug(admin->logger, "Cleared [%d] expired sessions", count); } /** * public function to send responses */ int Admin_sendMessage(Dict* message, String* txid, struct Admin* admin) { if (!admin) { return 0; } Identity_check(admin); Assert_true(txid && txid->len >= admin->addrLen); struct Sockaddr_storage addr; Bits_memcpy(&addr, txid->bytes, admin->addrLen); // if this is an async call, check if we've got any input from that client. // if the client is nresponsive then fail the call so logs don't get sent // out forever after a disconnection. if (!admin->inRequest) { struct Sockaddr* addrPtr = (struct Sockaddr*) &addr.addr; int index = Map_LastMessageTimeByAddr_indexForKey(&addrPtr, &admin->map); uint64_t now = Time_currentTimeMilliseconds(admin->eventBase); if (index < 0 || checkAddress(admin, index, now)) { return -1; } } struct Allocator* allocator; BufferAllocator_STACK(allocator, 256); // Bounce back the user-supplied txid. String userTxid = { .bytes = txid->bytes + admin->addrLen, .len = txid->len - admin->addrLen }; if (txid->len > admin->addrLen) { Dict_putString(message, TXID, &userTxid, allocator); } return sendBenc(message, &addr.addr, admin); } static inline bool authValid(Dict* message, struct Message* messageBytes, struct Admin* admin) { String* cookieStr = Dict_getString(message, String_CONST("cookie")); uint32_t cookie = (cookieStr != NULL) ? strtoll(cookieStr->bytes, NULL, 10) : 0; if (!cookie) { int64_t* cookieInt = Dict_getInt(message, String_CONST("cookie")); cookie = (cookieInt) ? *cookieInt : 0; } uint64_t nowSecs = Time_currentTimeSeconds(admin->eventBase); String* submittedHash = Dict_getString(message, String_CONST("hash")); if (cookie > nowSecs || cookie < nowSecs - 20 || !submittedHash || submittedHash->len != 64) { return false; } uint8_t* hashPtr = (uint8_t*) strstr((char*) messageBytes->bytes, submittedHash->bytes); if (!hashPtr || !admin->password) { return false; } uint8_t passAndCookie[64]; snprintf((char*) passAndCookie, 64, "%s%u", admin->password->bytes, cookie); uint8_t hash[32]; crypto_hash_sha256(hash, passAndCookie, strlen((char*) passAndCookie)); Hex_encode(hashPtr, 64, hash, 32); crypto_hash_sha256(hash, messageBytes->bytes, messageBytes->length); Hex_encode(hashPtr, 64, hash, 32); return Bits_memcmp(hashPtr, submittedHash->bytes, 64) == 0; } static bool checkArgs(Dict* args, struct Function* func, String* txid, struct Admin* admin) { struct Dict_Entry* entry = *func->args; String* error = NULL; uint8_t buffer[1024]; struct Allocator* alloc = BufferAllocator_new(buffer, 1024); while (entry != NULL) { String* key = (String*) entry->key; Assert_true(entry->val->type == Object_DICT); Dict* value = entry->val->as.dictionary; entry = entry->next; if (*Dict_getInt(value, String_CONST("required")) == 0) { continue; } String* type = Dict_getString(value, String_CONST("type")); if ((type == STRING && !Dict_getString(args, key)) || (type == DICT && !Dict_getDict(args, key)) || (type == INTEGER && !Dict_getInt(args, key)) || (type == LIST && !Dict_getList(args, key))) { error = String_printf(alloc, "Entry [%s] is required and must be of type [%s]", key->bytes, type->bytes); break; } } if (error) { Dict d = Dict_CONST(String_CONST("error"), String_OBJ(error), NULL); Admin_sendMessage(&d, txid, admin); } return !error; } static void asyncEnabled(Dict* args, void* vAdmin, String* txid) { struct Admin* admin = Identity_cast((struct Admin*) vAdmin); int64_t enabled = admin->asyncEnabled; Dict d = Dict_CONST(String_CONST("asyncEnabled"), Int_OBJ(enabled), NULL); Admin_sendMessage(&d, txid, admin); } #define ENTRIES_PER_PAGE 8 static void availableFunctions(Dict* args, void* vAdmin, String* txid) { struct Admin* admin = Identity_cast((struct Admin*) vAdmin); int64_t* page = Dict_getInt(args, String_CONST("page")); uint32_t i = (page) ? *page * ENTRIES_PER_PAGE : 0; struct Allocator* tempAlloc = Allocator_child(admin->allocator); Dict* d = Dict_new(tempAlloc); Dict* functions = Dict_new(tempAlloc); int count = 0; for (; i < (uint32_t)admin->functionCount && count++ < ENTRIES_PER_PAGE; i++) { Dict_putDict(functions, admin->functions[i].name, admin->functions[i].args, tempAlloc); } String* more = String_CONST("more"); if (count >= ENTRIES_PER_PAGE) { Dict_putInt(d, more, 1, tempAlloc); } Dict_putDict(d, String_CONST("availableFunctions"), functions, tempAlloc); Admin_sendMessage(d, txid, admin); Allocator_free(tempAlloc); return; } static void handleRequest(Dict* messageDict, struct Message* message, struct Sockaddr* src, struct Allocator* allocator, struct Admin* admin) { String* query = Dict_getString(messageDict, String_CONST("q")); if (!query) { Log_info(admin->logger, "Got a non-query from admin interface"); return; } // txid becomes the user supplied txid combined with the channel num. String* userTxid = Dict_getString(messageDict, TXID); uint32_t txidlen = ((userTxid) ? userTxid->len : 0) + src->addrLen; String* txid = String_newBinary(NULL, txidlen, allocator); Bits_memcpy(txid->bytes, src, src->addrLen); if (userTxid) { Bits_memcpy(txid->bytes + src->addrLen, userTxid->bytes, userTxid->len); } // If they're asking for a cookie then lets give them one. String* cookie = String_CONST("cookie"); if (String_equals(query, cookie)) { Log_debug(admin->logger, "Got a request for a cookie"); Dict* d = Dict_new(allocator); char bytes[32]; snprintf(bytes, 32, "%u", (uint32_t) Time_currentTimeSeconds(admin->eventBase)); String* theCookie = &(String) { .len = strlen(bytes), .bytes = bytes }; Dict_putString(d, cookie, theCookie, allocator); Admin_sendMessage(d, txid, admin); return; } // If this is a permitted query, make sure the cookie is right. String* auth = String_CONST("auth"); bool authed = false; if (String_equals(query, auth)) { if (!authValid(messageDict, message, admin)) { Dict* d = Dict_new(allocator); Dict_putString(d, String_CONST("error"), String_CONST("Auth failed."), allocator); Admin_sendMessage(d, txid, admin); return; } query = Dict_getString(messageDict, String_CONST("aq")); authed = true; } // Then sent a valid authed query, lets track their address so they can receive // asynchronous messages. int index = Map_LastMessageTimeByAddr_indexForKey(&src, &admin->map); uint64_t now = Time_currentTimeMilliseconds(admin->eventBase); admin->asyncEnabled = 1; if (index >= 0) { admin->map.values[index]->timeOfLastMessage = now; } else if (authed) { struct Allocator* entryAlloc = Allocator_child(admin->allocator); struct MapValue* mv = Allocator_calloc(entryAlloc, sizeof(struct MapValue), 1); mv->timeOfLastMessage = now; mv->allocator = entryAlloc; struct Sockaddr* storedAddr = Sockaddr_clone(src, entryAlloc); Map_LastMessageTimeByAddr_put(&storedAddr, &mv, &admin->map); } else { admin->asyncEnabled = 0; } Dict* args = Dict_getDict(messageDict, String_CONST("args")); bool noFunctionsCalled = true; for (int i = 0; i < admin->functionCount; i++) { if (String_equals(query, admin->functions[i].name) && (authed || !admin->functions[i].needsAuth)) { if (checkArgs(args, &admin->functions[i], txid, admin)) { admin->functions[i].call(args, admin->functions[i].context, txid); } noFunctionsCalled = false; } } if (noFunctionsCalled) { Dict d = Dict_CONST( String_CONST("error"), String_OBJ(String_CONST("No functions matched your request, " "try Admin_availableFunctions()")), NULL ); Admin_sendMessage(&d, txid, admin); } return; } static void handleMessage(struct Message* message, struct Sockaddr* src, struct Allocator* alloc, struct Admin* admin) { #ifdef Log_KEYS uint8_t lastChar = message->bytes[message->length - 1]; message->bytes[message->length - 1] = '\0'; Log_keys(admin->logger, "Got message from [%s] [%s]", Sockaddr_print(src, alloc), message->bytes); message->bytes[message->length - 1] = lastChar; #endif // handle non empty message data if (message->length > Admin_MAX_REQUEST_SIZE) { #define TOO_BIG "d5:error16:Request too big.e" #define TOO_BIG_STRLEN (sizeof(TOO_BIG) - 1) Bits_memcpyConst(message->bytes, TOO_BIG, TOO_BIG_STRLEN); message->length = TOO_BIG_STRLEN; sendMessage(message, src, admin); return; } struct Reader* reader = ArrayReader_new(message->bytes, message->length, alloc); Dict messageDict; if (StandardBencSerializer_get()->parseDictionary(reader, alloc, &messageDict)) { Log_warn(admin->logger, "Unparsable data from [%s] content: [%s]", Sockaddr_print(src, alloc), message->bytes); return; } int amount = reader->bytesRead; if (amount < message->length) { Log_warn(admin->logger, "Message from [%s] contained garbage after byte [%d] content: [%s]", Sockaddr_print(src, alloc), amount - 1, message->bytes); return; } handleRequest(&messageDict, message, src, alloc, admin); } static uint8_t receiveMessage(struct Message* message, struct Interface* iface) { struct Admin* admin = Identity_cast((struct Admin*) iface->receiverContext); Assert_true(message->length >= (int)admin->addrLen); struct Sockaddr_storage addrStore = { .addr = { .addrLen = 0 } }; Message_pop(message, &addrStore, admin->addrLen); struct Allocator* alloc = Allocator_child(admin->allocator); admin->inRequest = 1; handleMessage(message, &addrStore.addr, alloc, admin); admin->inRequest = 0; Allocator_free(alloc); return 0; } void Admin_registerFunctionWithArgCount(char* name, Admin_FUNCTION(callback), void* callbackContext, bool needsAuth, struct Admin_FunctionArg* arguments, int argCount, struct Admin* admin) { if (!admin) { return; } Identity_check(admin); String* str = String_new(name, admin->allocator); admin->functions = Allocator_realloc(admin->allocator, admin->functions, sizeof(struct Function) * (admin->functionCount + 1)); struct Function* fu = &admin->functions[admin->functionCount]; admin->functionCount++; fu->name = str; fu->call = callback; fu->context = callbackContext; fu->needsAuth = needsAuth; fu->args = Dict_new(admin->allocator); for (int i = 0; arguments && i < argCount; i++) { // "type" must be one of: [ "String", "Int", "Dict", "List" ] String* type = NULL; if (!strcmp(arguments[i].type, STRING->bytes)) { type = STRING; } else if (!strcmp(arguments[i].type, INTEGER->bytes)) { type = INTEGER; } else if (!strcmp(arguments[i].type, DICT->bytes)) { type = DICT; } else if (!strcmp(arguments[i].type, LIST->bytes)) { type = LIST; } else { abort(); } Dict* arg = Dict_new(admin->allocator); Dict_putString(arg, TYPE, type, admin->allocator); Dict_putInt(arg, REQUIRED, arguments[i].required, admin->allocator); String* name = String_new(arguments[i].name, admin->allocator); Dict_putDict(fu->args, name, arg, admin->allocator); } } struct Admin* Admin_new(struct AddrInterface* iface, struct Allocator* alloc, struct Log* logger, struct EventBase* eventBase, String* password) { struct Admin* admin = Allocator_clone(alloc, (&(struct Admin) { .iface = iface, .allocator = alloc, .logger = logger, .eventBase = eventBase, .addrLen = iface->addr->addrLen, .map = { .allocator = alloc } })); Identity_set(admin); admin->password = String_clone(password, alloc); Timeout_setInterval(clearExpiredAddresses, admin, TIMEOUT_MILLISECONDS * 3, eventBase, alloc); iface->generic.receiveMessage = receiveMessage; iface->generic.receiverContext = admin; Admin_registerFunction("Admin_asyncEnabled", asyncEnabled, admin, false, NULL, admin); Admin_registerFunction("Admin_availableFunctions", availableFunctions, admin, false, ((struct Admin_FunctionArg[]) { { .name = "page", .required = 0, .type = "Int" } }), admin); return admin; }