ソースを参照

work in progress

Caleb James DeLisle 9 年 前
コミット
8399b37d14

+ 6 - 1
admin/angel/Core.c

@@ -34,6 +34,7 @@
 #endif
 #include "net/InterfaceController_admin.h"
 #include "interface/addressable/PacketHeaderToUDPAddrIface.h"
+#include "interface/ASynchronizer.h"
 #include "interface/FramingIface.h"
 #include "memory/Allocator.h"
 #include "memory/MallocAllocator.h"
@@ -187,7 +188,11 @@ void Core_init(struct Allocator* alloc,
     Iface_plumb(&nc->tunAdapt->ipTunnelIf, &ipTunnel->tunInterface);
     Iface_plumb(&nc->upper->ipTunnelIf, &ipTunnel->nodeInterface);
 
-    Pathfinder_register(alloc, logger, eventBase, rand, admin, nc->ee);
+    // The link between the Pathfinder and the core needs to be asynchronous.
+    struct Pathfinder* pf = Pathfinder_register(alloc, logger, eventBase, rand, admin);
+    struct ASynchronizer* pfAsync = ASynchronizer_new(alloc, eventBase, logger);
+    Iface_plumb(&pfAsync->ifA, &pf->eventIf);
+    EventEmitter_regPathfinderIface(nc->ee, &pfAsync->ifB);
 
     // ------------------- Register RPC functions ----------------------- //
     InterfaceController_admin_register(nc->ifController, admin, alloc);

+ 1 - 1
contrib/c/sybilsim.c

@@ -190,7 +190,7 @@ static struct NodeContext* startNode(char* nodeName,
 
     struct AddrIfaceAdapter* adminClientIface = AddrIfaceAdapter_new(node->alloc);
     struct AddrIfaceAdapter* adminIface = AddrIfaceAdapter_new(node->alloc);
-    struct ASynchronizer* asyncer = ASynchronizer_new(node->alloc, ctx->base);
+    struct ASynchronizer* asyncer = ASynchronizer_new(node->alloc, ctx->base, ctx->logger);
     Iface_plumb(&asyncer->ifA, &adminClientIface->inputIf);
     Iface_plumb(&asyncer->ifB, &adminIface->inputIf);
 

+ 25 - 34
dht/Pathfinder.c

@@ -32,6 +32,7 @@
 #include "util/AddrTools.h"
 #include "util/events/Timeout.h"
 #include "wire/Error.h"
+#include "wire/PFChan.h"
 #include "util/CString.h"
 
 ///////////////////// [ Address ][ content... ]
@@ -41,18 +42,12 @@
 struct Pathfinder_pvt
 {
     struct Pathfinder pub;
-    struct Iface eventIf;
     struct DHTModule dhtModule;
     struct Allocator* alloc;
     struct Log* log;
     struct EventBase* base;
     struct Random* rand;
     struct Admin* admin;
-    struct EventEmitter* ee;
-
-    // hack
-    struct Node_Two asyncNode;
-    struct Timeout* asyncTo;
 
     #define Pathfinder_pvt_state_INITIALIZING 0
     #define Pathfinder_pvt_state_RUNNING 1
@@ -107,7 +102,7 @@ static int incomingFromDHT(struct DHTMessage* dmessage, void* vpf)
     }
     //Log_debug(pf->log, "send DHT request");
 
-    Iface_send(&pf->eventIf, msg);
+    Iface_send(&pf->pub.eventIf, msg);
     return 0;
 }
 
@@ -133,7 +128,7 @@ static Iface_DEFUN sendNode(struct Message* msg,
         ((struct PFChan_Node*) msg->bytes)->path_be = 0;
     }
     Message_push32(msg, PFChan_Pathfinder_NODE, NULL);
-    return Iface_next(&pf->eventIf, msg);
+    return Iface_next(&pf->pub.eventIf, msg);
 }
 
 static void onBestPathChange(void* vPathfinder, struct Node_Two* node)
@@ -255,13 +250,6 @@ static Iface_DEFUN switchErr(struct Message* msg, struct Pathfinder_pvt* pf)
     return NULL;
 }
 
-static void asyncRespond(void* vPathfinder)
-{
-    struct Pathfinder_pvt* pf = Identity_check((struct Pathfinder_pvt*) vPathfinder);
-    pf->asyncTo = NULL;
-    onBestPathChange(pf, &pf->asyncNode);
-}
-
 static Iface_DEFUN searchReq(struct Message* msg, struct Pathfinder_pvt* pf)
 {
     uint8_t addr[16];
@@ -272,9 +260,8 @@ static Iface_DEFUN searchReq(struct Message* msg, struct Pathfinder_pvt* pf)
     Log_debug(pf->log, "Search req [%s]", printedAddr);
 
     struct Node_Two* node = NodeStore_nodeForAddr(pf->nodeStore, addr);
-    if (node && !pf->asyncTo) {
-        Bits_memcpyConst(&pf->asyncNode, node, sizeof(struct Node_Two));
-        pf->asyncTo = Timeout_setTimeout(asyncRespond, pf, 0, pf->base, pf->alloc);
+    if (node) {
+        onBestPathChange(pf, node);
     } else {
         SearchRunner_search(addr, 20, 3, pf->searchRunner, pf->alloc);
     }
@@ -367,7 +354,7 @@ static Iface_DEFUN handlePing(struct Message* msg, struct Pathfinder_pvt* pf)
 {
     Log_debug(pf->log, "Received ping");
     Message_push32(msg, PFChan_Pathfinder_PONG, NULL);
-    return Iface_next(&pf->eventIf, msg);
+    return Iface_next(&pf->pub.eventIf, msg);
 }
 
 static Iface_DEFUN handlePong(struct Message* msg, struct Pathfinder_pvt* pf)
@@ -399,7 +386,7 @@ static Iface_DEFUN incomingMsg(struct Message* msg, struct Pathfinder_pvt* pf)
 
     if (dht.pleaseRespond) {
         // what a beautiful hack, see incomingFromDHT
-        return Iface_next(&pf->eventIf, msg);
+        return Iface_next(&pf->pub.eventIf, msg);
     } else if (!version && addr.protocolVersion) {
         return sendNode(msg, &addr, 0xfffffff0, pf);
     }
@@ -409,7 +396,7 @@ static Iface_DEFUN incomingMsg(struct Message* msg, struct Pathfinder_pvt* pf)
 
 static Iface_DEFUN incomingFromEventIf(struct Message* msg, struct Iface* eventIf)
 {
-    struct Pathfinder_pvt* pf = Identity_containerOf(eventIf, struct Pathfinder_pvt, eventIf);
+    struct Pathfinder_pvt* pf = Identity_containerOf(eventIf, struct Pathfinder_pvt, pub.eventIf);
     enum PFChan_Core ev = Message_pop32(msg, NULL);
     if (Pathfinder_pvt_state_INITIALIZING == pf->state) {
         Assert_true(ev == PFChan_Core_CONNECT);
@@ -437,38 +424,42 @@ static void sendEvent(struct Pathfinder_pvt* pf, enum PFChan_Pathfinder ev, void
     struct Message* msg = Message_new(0, 512+size, alloc);
     Message_push(msg, data, size, NULL);
     Message_push32(msg, ev, NULL);
-    Iface_send(&pf->eventIf, msg);
+    Iface_send(&pf->pub.eventIf, msg);
     Allocator_free(alloc);
 }
 
+static void init(void* vpf)
+{
+    struct Pathfinder_pvt* pf = Identity_check((struct Pathfinder_pvt*) vpf);
+    struct PFChan_Pathfinder_Connect conn = {
+        .superiority_be = Endian_hostToBigEndian32(1),
+        .version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL)
+    };
+    CString_strncpy(conn.userAgent, "Cjdns internal pathfinder", 64);
+    sendEvent(pf, PFChan_Pathfinder_CONNECT, &conn, PFChan_Pathfinder_Connect_SIZE);
+}
+
 struct Pathfinder* Pathfinder_register(struct Allocator* alloc,
                                        struct Log* log,
                                        struct EventBase* base,
                                        struct Random* rand,
-                                       struct Admin* admin,
-                                       struct EventEmitter* ee)
+                                       struct Admin* admin)
 {
     struct Pathfinder_pvt* pf = Allocator_calloc(alloc, sizeof(struct Pathfinder_pvt), 1);
+    Identity_set(pf);
     pf->alloc = alloc;
     pf->log = log;
     pf->base = base;
     pf->rand = rand;
     pf->admin = admin;
-    pf->ee = ee;
-    Identity_set(pf);
 
-    pf->eventIf.send = incomingFromEventIf;
-    EventEmitter_regPathfinderIface(ee, &pf->eventIf);
+    pf->pub.eventIf.send = incomingFromEventIf;
 
     pf->dhtModule.context = pf;
     pf->dhtModule.handleOutgoing = incomingFromDHT;
 
-    struct PFChan_Pathfinder_Connect conn = {
-        .superiority_be = Endian_hostToBigEndian32(1),
-        .version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL)
-    };
-    CString_strncpy(conn.userAgent, "Cjdns internal pathfinder", 64);
-    sendEvent(pf, PFChan_Pathfinder_CONNECT, &conn, PFChan_Pathfinder_Connect_SIZE);
+    // This needs to be done asynchronously so the pf can be plumbed to the core
+    Timeout_setTimeout(init, pf, 0, base, alloc);
 
     return &pf->pub;
 }

+ 2 - 4
dht/Pathfinder.h

@@ -20,20 +20,18 @@
 #include "util/events/EventBase.h"
 #include "crypto/random/Random.h"
 #include "admin/Admin.h"
-#include "net/EventEmitter.h"
 #include "util/Linker.h"
 Linker_require("dht/Pathfinder.c")
 
 struct Pathfinder
 {
-    int unused;
+    struct Iface eventIf;
 };
 
 struct Pathfinder* Pathfinder_register(struct Allocator* alloc,
                                        struct Log* logger,
                                        struct EventBase* base,
                                        struct Random* rand,
-                                       struct Admin* admin,
-                                       struct EventEmitter* ee);
+                                       struct Admin* admin);
 
 #endif

+ 7 - 1
interface/ASynchronizer.c

@@ -17,6 +17,8 @@
 #include "memory/Allocator.h"
 #include "util/Identity.h"
 #include "util/events/Timeout.h"
+#include "util/log/Log.h"
+#include "util/Hex.h"
 
 #define MAX_DRY_CYCLES 16
 
@@ -30,6 +32,7 @@ struct ASynchronizer_pvt
     struct ASynchronizer pub;
     struct Allocator* alloc;
     struct EventBase* base;
+    struct Log* log;
 
     struct Allocator* cycleAlloc;
     struct ArrayList_Messages* msgsToA;
@@ -104,12 +107,15 @@ static Iface_DEFUN fromB(struct Message* msg, struct Iface* ifB)
     return NULL;
 }
 
-struct ASynchronizer* ASynchronizer_new(struct Allocator* alloc, struct EventBase* base)
+struct ASynchronizer* ASynchronizer_new(struct Allocator* alloc,
+                                        struct EventBase* base,
+                                        struct Log* log)
 {
     struct ASynchronizer_pvt* ctx = Allocator_calloc(alloc, sizeof(struct ASynchronizer_pvt), 1);
     Identity_set(ctx);
     ctx->alloc = alloc;
     ctx->base = base;
+    ctx->log = log;
     ctx->pub.ifA.send = fromA;
     ctx->pub.ifB.send = fromB;
     return &ctx->pub;

+ 4 - 1
interface/ASynchronizer.h

@@ -17,6 +17,7 @@
 
 #include "interface/Iface.h"
 #include "util/events/EventBase.h"
+#include "util/log/Log.h"
 #include "memory/Allocator.h"
 #include "util/Linker.h"
 Linker_require("interface/ASynchronizer.c")
@@ -27,6 +28,8 @@ struct ASynchronizer
     struct Iface ifB;
 };
 
-struct ASynchronizer* ASynchronizer_new(struct Allocator* alloc, struct EventBase* base);
+struct ASynchronizer* ASynchronizer_new(struct Allocator* alloc,
+                                        struct EventBase* base,
+                                        struct Log* log);
 
 #endif

+ 2 - 0
net/SessionManager.c

@@ -367,6 +367,7 @@ static void checkTimedOutSessions(struct SessionManager_pvt* sm)
             // Session is not in idle state and requires a search
             // But we're only going to trigger one search per cycle.
             if (searchTriggered) { continue; }
+            debugSession0(sm->log, sess, "triggering search");
             triggerSearch(sm, sess->pub.caSession->herIp6);
             sess->pub.lastSearchTime = now;
             searchTriggered = true;
@@ -374,6 +375,7 @@ static void checkTimedOutSessions(struct SessionManager_pvt* sm)
 
         // Session is in idle state or doesn't need a search right now, check if it's timed out.
         if (now - sess->pub.timeOfLastIn < sm->pub.sessionTimeoutMilliseconds) {
+            debugSession0(sm->log, sess, "ended");
             sendSession(sess, sess->pub.sendSwitchLabel, 0xffffffff, PFChan_Core_SESSION_ENDED);
             Map_OfSessionsByIp6_remove(i, &sm->ifaceMap);
             Allocator_free(sess->alloc);

+ 13 - 4
test/Beacon_test.c

@@ -44,6 +44,7 @@ struct TwoNodes
     struct TestFramework* nodeA;
     struct Iface tunA;
     int messageFrom;
+    bool beaconsSent;
 
     struct Timeout* checkLinkageTimeout;
     struct Log* logger;
@@ -93,6 +94,18 @@ static void checkLinkage(void* vTwoNodes)
 {
     struct TwoNodes* ctx = Identity_check((struct TwoNodes*) vTwoNodes);
 
+    if (!ctx->beaconsSent) {
+        if (Pathfinder_getNodeStore(ctx->nodeA->pathfinder) &&
+            Pathfinder_getNodeStore(ctx->nodeB->pathfinder))
+        {
+            Log_debug(ctx->logger, "Linking A and B");
+            TestFramework_linkNodes(ctx->nodeB, ctx->nodeA, true);
+            ctx->beaconsSent = true;
+        }
+        return;
+    }
+
+
     if (Pathfinder_getNodeStore(ctx->nodeA->pathfinder)->nodeCount < 1) {
         notLinkedYet(ctx);
         return;
@@ -131,10 +144,6 @@ static void start(struct Allocator* alloc,
     //"ipv6": "fc1f:5b96:e1c5:625d:afde:2523:a7fa:383a",
 
 
-
-    Log_debug(a->logger, "Linking A and B");
-    TestFramework_linkNodes(b, a, true);
-
     struct TwoNodes* out = Allocator_calloc(alloc, sizeof(struct TwoNodes), 1);
     Identity_set(out);
     out->tunB.send = incomingTunB;

+ 5 - 1
test/TestFramework.c

@@ -28,6 +28,7 @@
 #include "net/SwitchPinger.h"
 #include "net/ControlHandler.h"
 #include "net/InterfaceController.h"
+#include "interface/ASynchronizer.h"
 #include "interface/Iface.h"
 #include "tunnel/IpTunnel.h"
 #include "net/EventEmitter.h"
@@ -118,7 +119,10 @@ struct TestFramework* TestFramework_setUp(char* privateKey,
 
     struct NetCore* nc = NetCore_new(privateKey, allocator, base, rand, logger);
 
-    struct Pathfinder* pf = Pathfinder_register(allocator, logger, base, rand, NULL, nc->ee);
+    struct Pathfinder* pf = Pathfinder_register(allocator, logger, base, rand, NULL);
+    struct ASynchronizer* pfAsync = ASynchronizer_new(allocator, base, logger);
+    Iface_plumb(&pfAsync->ifA, &pf->eventIf);
+    EventEmitter_regPathfinderIface(nc->ee, &pfAsync->ifB);
 
     struct TestFramework* tf = Allocator_calloc(allocator, sizeof(struct TestFramework), 1);
     Identity_set(tf);