Просмотр исходного кода

more robust logging, although only about 100 messages can be logged at a time without drops...

Caleb James DeLisle 9 лет назад
Родитель
Сommit
c7eed6b146
5 измененных файлов с 58 добавлено и 17 удалено
  1. 4 8
      admin/Admin.c
  2. 44 4
      admin/AdminLog.c
  3. 7 0
      interface/InterfaceController.c
  4. 1 1
      memory/Allocator.c
  5. 2 4
      util/events/libuv/UDPAddrInterface.c

+ 4 - 8
admin/Admin.c

@@ -182,7 +182,6 @@ int Admin_sendMessage(Dict* message, String* txid, struct Admin* admin)
     // if this is an async call, check if we've got any input from that client.
     // if the client is nresponsive then fail the call so logs don't get sent
     // out forever after a disconnection.
-    struct Allocator* alloc;
     if (!admin->currentRequest) {
         struct Sockaddr* addrPtr = (struct Sockaddr*) &addr.addr;
         int index = Map_LastMessageTimeByAddr_indexForKey(&addrPtr, &admin->map);
@@ -190,11 +189,10 @@ int Admin_sendMessage(Dict* message, String* txid, struct Admin* admin)
         if (index < 0 || checkAddress(admin, index, now)) {
             return -1;
         }
-        alloc = Allocator_child(admin->allocator);
-    } else {
-        alloc = admin->currentRequest->alloc;
     }
 
+    struct Allocator* alloc = Allocator_child(admin->allocator);
+
     // Bounce back the user-supplied txid.
     String userTxid = {
         .bytes = txid->bytes + admin->addrLen,
@@ -206,9 +204,7 @@ int Admin_sendMessage(Dict* message, String* txid, struct Admin* admin)
 
     int ret = sendBenc(message, &addr.addr, alloc, admin);
 
-    if (!admin->currentRequest) {
-        Allocator_free(alloc);
-    }
+    Allocator_free(alloc);
 
     return ret;
 }
@@ -227,7 +223,7 @@ static inline bool authValid(Dict* message, struct Message* messageBytes, struct
         return false;
     }
 
-    uint8_t* hashPtr = (uint8_t*) CString_strstr((char*) messageBytes->bytes, submittedHash->bytes);
+    uint8_t* hashPtr = CString_strstr(messageBytes->bytes, submittedHash->bytes);
 
     if (!hashPtr || !admin->password) {
         return false;

+ 44 - 4
admin/AdminLog.c

@@ -27,6 +27,7 @@
 #include "util/Hex.h"
 #include "util/Identity.h"
 #include "util/events/Time.h"
+#include "util/events/Timeout.h"
 
 #define MAX_SUBSCRIPTIONS 64
 #define FILE_NAME_COUNT 32
@@ -45,6 +46,12 @@ struct Subscription
     /** True if file can be compared with pointer comparison instead of strcmp. */
     bool internalFile;
 
+    /**
+     * Dropped messages because they are being sent too fast for UDP interface to handle.
+     * Reset when the pipes unclog an a message is sent reporting the number of dropped messages.
+     */
+    int dropped;
+
     /** The transaction ID of the message which solicited this stream of logs. */
     String* txid;
 
@@ -59,11 +66,13 @@ struct AdminLog
 {
     struct Log pub;
     struct Subscription subscriptions[MAX_SUBSCRIPTIONS];
-    uint32_t subscriptionCount;
+    int subscriptionCount;
 
     /** non-zero if we are logging at this very moment (reentrent logging is not allowed!) */
     int logging;
 
+    struct Timeout* unpause;
+
     struct Admin* admin;
     struct Allocator* alloc;
     struct Random* rand;
@@ -147,6 +156,30 @@ static void removeSubscription(struct AdminLog* log, struct Subscription* sub)
                      sizeof(struct Subscription));
 }
 
+static void unpause(void* vAdminLog)
+{
+    struct AdminLog* log = Identity_check((struct AdminLog*) vAdminLog);
+    // dirty reentrence.
+    Assert_true(!log->logging);
+    bool noneDropped = true;
+    for (int i = log->subscriptionCount - 1; i >= 0; i--) {
+        int dropped = log->subscriptions[i].dropped;
+        if (!dropped) { continue; }
+        noneDropped = false;
+        log->subscriptions[i].dropped = 0;
+        Log_warn((struct Log*) log,
+                 "UDPInterface cannot handle the logging, [%d] messages dropped", dropped);
+        if (log->subscriptions[i].dropped) {
+            // oh well, we'll try again later.
+            log->subscriptions[i].dropped += dropped;
+        }
+    }
+    if (noneDropped && false) {
+        Timeout_clearTimeout(log->unpause);
+        log->unpause = NULL;
+    }
+}
+
 static void doLog(struct Log* genericLog,
                   enum Log_Level logLevel,
                   const char* fullFilePath,
@@ -161,8 +194,12 @@ static void doLog(struct Log* genericLog,
     if (log->logging) { return; }
     log->logging++;
 
-    for (int i = 0; i < (int)log->subscriptionCount; i++) {
+    for (int i = log->subscriptionCount - 1; i >= 0; i--) {
         if (!isMatch(&log->subscriptions[i], log, logLevel, fullFilePath, line)) { continue; }
+        if (log->subscriptions[i].dropped) {
+            log->subscriptions[i].dropped++;
+            continue;
+        }
         if (!message) {
             logLineAlloc = Allocator_child(log->alloc);
             message = makeLogMessage(&log->subscriptions[i],
@@ -175,7 +212,10 @@ static void doLog(struct Log* genericLog,
                                      logLineAlloc);
         }
         if (Admin_sendMessage(message, log->subscriptions[i].txid, log->admin)) {
-            removeSubscription(log, &log->subscriptions[i]);
+            log->subscriptions[i].dropped++;
+            if (!log->unpause) {
+                log->unpause = Timeout_setInterval(unpause, log, 10, log->base, log->alloc);
+            }
         }
     }
     if (logLineAlloc) {
@@ -283,8 +323,8 @@ static void subscriptions(Dict* args, void* vcontext, String* txid, struct Alloc
             Dict_putString(entry, STR_FILE, String_new(sub->file, alloc), alloc);
         }
         Dict_putInt(entry, LINE, sub->lineNum, alloc);
+        Dict_putInt(entry, String_CONST("dropped"), sub->dropped, alloc);
         Dict_putInt(entry, String_CONST("internalFile"), sub->internalFile, alloc);
-        Dict_putString(entry, String_CONST("_txid"), sub->txid, alloc);
         Dict_putString(entry, String_CONST("streamId"), sub->streamId, alloc);
         List_addDict(entries, entry, alloc);
     }

+ 7 - 0
interface/InterfaceController.c

@@ -390,6 +390,13 @@ static uint8_t sendFromSwitch(struct Message* msg, struct Interface* switchIf)
         ret = Interface_sendMessage(ep->cryptoAuthIf, msg);
     }
 
+    // TODO(cjd): this is not quite right
+    // We don't always trust the UDP interface to be accurate
+    // short spurious failures and packet-backup should not cause us to treat a link as dead
+    if (ret == Error_UNDELIVERABLE) {
+        ret = 0;
+    }
+
     // If this node is unresponsive then return an error.
     if (ret || now - ep->timeOfLastMessage > ic->unresponsiveAfterMilliseconds) {
         return ret ? ret : Error_UNDELIVERABLE;

+ 1 - 1
memory/Allocator.c

@@ -92,7 +92,7 @@ static void failure(struct Allocator_pvt* context,
                     const char* fileName,
                     int lineNum)
 {
-    Allocator_snapshot(&context->pub, 0);
+    Allocator_snapshot(&context->pub, 1);
     Assert_failure("%s:%d Fatal error: [%s]", fileName, lineNum, message);
 }
 

+ 2 - 4
util/events/libuv/UDPAddrInterface.c

@@ -81,8 +81,7 @@ static uint8_t sendMessage(struct Message* m, struct Interface* iface)
 
     if (context->queueLen > UDPAddrInterface_MAX_QUEUE) {
         Log_warn(context->logger, "DROP Maximum queue length reached");
-        return Error_NONE;
-        //return Error_UNDELIVERABLE;
+        return Error_UNDELIVERABLE;
     }
 
     // This allocator will hold the message allocator in existance after it is freed.
@@ -119,8 +118,7 @@ static uint8_t sendMessage(struct Message* m, struct Interface* iface)
         Log_info(context->logger, "DROP Failed writing to UDPAddrInterface [%s]",
                  uv_strerror(ret));
         Allocator_free(req->alloc);
-        return Error_NONE;
-        //return Error_UNDELIVERABLE;
+        return Error_UNDELIVERABLE;
     }
     context->queueLen += m->length;