Browse Source

commit work prior to sed-rename

Caleb James DeLisle 9 years ago
parent
commit
46505f11bd

+ 1 - 1
admin/angel/Core.c

@@ -369,7 +369,7 @@ void Core_init(struct Allocator* alloc,
     }
 
     // EventEmitter
-    struct EventEmitter* eventEmitter = EventEmitter_new(alloc);
+    struct EventEmitter* eventEmitter = EventEmitter_new(alloc, logger);
 
     // CryptoAuth
     struct Address* addr = Allocator_calloc(alloc, sizeof(struct Address), 1);

+ 5 - 1
interface/Interface.h

@@ -87,6 +87,9 @@ static inline uint8_t Interface_sendMessage(struct Interface* iface, struct Mess
 
 
 
+typedef int Interface_Ret;
+
+#define Interface_RET 0
 
 struct Interface_Two;
 
@@ -94,7 +97,8 @@ struct Interface_Two;
  * @param thisInterface the interface which contains the sendMessage function pointer.
  * @param message the message
  */
-typedef int (* Interface_Callback2)(struct Interface_Two* thisInterface, struct Message* message);
+typedef Interface_RetVal (* Interface_Callback2)(struct Interface_Two* thisInterface,
+                                                 struct Message* message);
 
 struct Interface_Two
 {

+ 36 - 28
interface/InterfaceController.c

@@ -155,16 +155,15 @@ struct InterfaceController_pvt
     /** Switch for adding nodes when they are discovered. */
     struct SwitchCore* const switchCore;
 
-    struct Router* const router;
-
     struct Random* const rand;
 
-    struct RumorMill* const rumorMill;
-
     struct Log* const logger;
 
     struct EventBase* const eventBase;
 
+    /** For communicating with the Pathfinder. */
+    struct Interface_Two eventEmitterIf;
+
     /** After this number of milliseconds, a neoghbor will be regarded as unresponsive. */
     uint32_t unresponsiveAfterMilliseconds;
 
@@ -196,11 +195,21 @@ struct InterfaceController_pvt
     Identity
 };
 
-//---------------//
-
-static inline struct InterfaceController_pvt* ifcontrollerForPeer(struct Peer* ep)
+static void sendEvent(struct InterfaceController_pvt* ic, enum Event_Core ev, struct Peer* p)
 {
-    return Identity_check(ep->ici->ic);
+        /**
+     * Emitted when a peer connects (becomes state ESTABLISHED) or
+     * emitted for every peer if Event_Pathfinder_PEERS is sent.
+     * (emitted by: InterfaceController.c)
+     */
+    Event_Core_PEER,
+
+    /**
+     * Emitted when a peer disconnects (or becomes state UNRESPONSIVE)
+     * (emitted by: InterfaceController.c)
+     */
+    Event_Core_PEER_GONE,
+
 }
 
 static void onPingResponse(struct SwitchPinger_Response* resp, void* onResponseContext)
@@ -209,7 +218,7 @@ static void onPingResponse(struct SwitchPinger_Response* resp, void* onResponseC
         return;
     }
     struct Peer* ep = Identity_check((struct Peer*) onResponseContext);
-    struct InterfaceController_pvt* ic = ifcontrollerForPeer(ep);
+    struct InterfaceController_pvt* ic = Identity_check(ep->ici->ic);
 
     ep->addr.protocolVersion = resp->version;
 
@@ -260,7 +269,7 @@ static void onPingResponse(struct SwitchPinger_Response* resp, void* onResponseC
  */
 static void sendPing(struct Peer* ep)
 {
-    struct InterfaceController_pvt* ic = ifcontrollerForPeer(ep);
+    struct InterfaceController_pvt* ic = Identity_check(ep->ici->ic);
 
     ep->pingCount++;
 
@@ -306,16 +315,6 @@ static void iciPing(struct Iface* ici, struct InterfaceController_pvt* ic)
                 // because it causes the RumorMill to be filled with this node over and over.
                 continue;
             }
-
-            struct Node_Link* link = Router_linkForPath(ic->router, ep->addr.path);
-            // It exists, it's parent is the self-node, and it's label is equal to the switchLabel.
-            if (link
-                && Node_getBestParent(link->child)
-                && Node_getBestParent(link->child)->parent->address.path == 1
-                && Node_getBestParent(link->child)->cannonicalLabel == ep->addr.path)
-            {
-                continue;
-            }
         }
 
         #ifdef Log_DEBUG
@@ -398,7 +397,7 @@ static void moveEndpointIfNeeded(struct Peer* ep)
 static uint8_t receivedAfterCryptoAuth(struct Message* msg, struct Interface* cryptoAuthIf)
 {
     struct Peer* ep = Identity_check((struct Peer*) cryptoAuthIf->receiverContext);
-    struct InterfaceController_pvt* ic = ifcontrollerForPeer(ep);
+    struct InterfaceController_pvt* ic = Identity_check(ep->ici->ic);
 
     // nonce added by the CryptoAuth session.
     Message_pop(msg, NULL, 4, NULL);
@@ -459,7 +458,7 @@ static uint8_t sendFromSwitch(struct Message* msg, struct Interface* switchIf)
 
     ep->bytesOut += msg->length;
 
-    struct InterfaceController_pvt* ic = ifcontrollerForPeer(ep);
+    struct InterfaceController_pvt* ic = Identity_check(ep->ici->ic);
     uint8_t ret;
     uint64_t now = Time_currentTimeMilliseconds(ic->eventBase);
     if (now - ep->timeOfLastMessage > ic->unresponsiveAfterMilliseconds) {
@@ -498,7 +497,7 @@ static int closeInterface(struct Allocator_OnFreeJob* job)
 {
     struct Peer* toClose = Identity_check((struct Peer*) job->userData);
 
-    struct InterfaceController_pvt* ic = ifcontrollerForPeer(toClose);
+    struct InterfaceController_pvt* ic = Identity_check(ep->ici->ic);
 
     // flush the peer from the table...
     Router_disconnectedPeer(ic->router, toClose->addr.path);
@@ -1010,25 +1009,31 @@ int InterfaceController_disconnectPeer(struct InterfaceController* ifController,
     return InterfaceController_disconnectPeer_NOTFOUND;
 }
 
+static int incomingFromEventEmitterIf(struct Interface_Two* eventEmitterIf, struct Message* msg)
+{
+    struct InterfaceController_pvt* ic =
+         Identity_containerOf(eventEmitterIf, struct InterfaceController_pvt, eventEmitterIf);
+    Assert_true(Message_pop32(msg, NULL) == Event_Pathfinder_PEERS);
+    
+}
+
 struct InterfaceController* InterfaceController_new(struct CryptoAuth* ca,
                                                     struct SwitchCore* switchCore,
-                                                    struct Router* router,
-                                                    struct RumorMill* rumorMill,
                                                     struct Log* logger,
                                                     struct EventBase* eventBase,
                                                     struct SwitchPinger* switchPinger,
                                                     struct Random* rand,
-                                                    struct Allocator* allocator)
+                                                    struct Allocator* allocator,
+                                                    struct EventEmitter* ee)
 {
     struct InterfaceController_pvt* out =
         Allocator_malloc(allocator, sizeof(struct InterfaceController_pvt));
     Bits_memcpyConst(out, (&(struct InterfaceController_pvt) {
         .allocator = allocator,
         .ca = ca,
+        .ee = ee,
         .rand = rand,
         .switchCore = switchCore,
-        .router = router,
-        .rumorMill = rumorMill,
         .logger = logger,
         .eventBase = eventBase,
         .switchPinger = switchPinger,
@@ -1051,6 +1056,9 @@ struct InterfaceController* InterfaceController_new(struct CryptoAuth* ca,
 
     out->icis = ArrayList_OfIfaces_new(allocator);
 
+    out->eventEmitterIf.send = incomingFromEventEmitterIf;
+    EventEmitter_regCore(ee, &out->eventEmitterIf, Event_Pathfinder_PEERS);
+
     // Add the beaconing password.
     Random_bytes(rand, out->beacon.password, Headers_Beacon_PASSWORD_LEN);
     String strPass = { .bytes=(char*)out->beacon.password, .len=Headers_Beacon_PASSWORD_LEN };

+ 22 - 37
net/ControlHandler.c

@@ -27,13 +27,14 @@ struct ControlHandler_pvt
     struct Allocator* alloc;
     struct Router* router;
     struct Address* myAddr;
+    struct Interface_Two eventIface;
     Identity
 };
 
 /**
  * Expects [ Ctrl ][ Error ][ cause SwitchHeader ][ cause handle ][ cause etc.... ]
  */
-#define handleError_MIN_SIZE (Control_HEADER_SIZE + Control_Error_MIN_SIZE + SwitchHeader_SIZE + 4)
+#define handleError_MIN_SIZE (Control_Header_SIZE + Control_Error_MIN_SIZE + SwitchHeader_SIZE + 4)
 static int handleError(struct ControlHandler_pvt* ch,
                        struct Message* msg,
                        uint64_t label,
@@ -43,32 +44,15 @@ static int handleError(struct ControlHandler_pvt* ch,
         Log_info(ch->log, "DROP runt error packet from [%s]", labelStr);
         return 0;
     }
-    struct Control* ctrl = (struct Control*) msg->bytes;
-    struct Control_Error* err = &ctrl->content.error;
-    struct SwitchHeader* causeHeader = &err->cause;
-    uint64_t causeLabel = Endian_bigEndianToHost64(causeHeader->label_be);
-    uint32_t errorType = Endian_bigEndianToHost32(err->errorType_be);
-
-    if (!NumberCompress_isOneHop(label)) {
-        Router_brokenLink(ch->router, label, causeLabel);
-    } else if (errorType == Error_UNDELIVERABLE) {
-        // this is our own InterfaceController complaining
-        // because the node isn't responding to pings.
-        return 0;
-    }
-
-    Log_debug(ch->log, "error packet from [%s] [%s]%s",
-              labelStr,
-              Error_strerror(errorType),
-              ((err->causeHandle == 0xffffffff) ? " caused by ctrl" : ""));
-
-    return 0;
+    Message_shift(msg, SwitchHeader_SIZE + 4, NULL);
+    Message_push32(msg, Event_Core_SWITCH_ERR, NULL);
+    return Interface_sent(&ch->eventIf, msg);
 }
 
 /**
  * Expects [ SwitchHeader ][ Ctrl ][ (key)Ping ][ data etc.... ]
  */
-#define handlePing_MIN_SIZE (Control_HEADER_SIZE + Control_Ping_MIN_SIZE)
+#define handlePing_MIN_SIZE (Control_Header_SIZE + Control_Ping_MIN_SIZE)
 static int handlePing(struct ControlHandler_pvt* ch,
                       struct Message* msg,
                       uint64_t label,
@@ -81,7 +65,7 @@ static int handlePing(struct ControlHandler_pvt* ch,
     }
 
     struct Control* ctrl = (struct Control*) msg->bytes;
-    Message_shift(msg, -Control_HEADER_SIZE, NULL);
+    Message_shift(msg, -Control_Header_SIZE, NULL);
 
     // Ping and keyPing share version location
     struct Control_Ping* ping = (struct Control_Ping*) msg->bytes;
@@ -109,7 +93,7 @@ static int handlePing(struct ControlHandler_pvt* ch,
 
         struct Control_KeyPing* keyPing = (struct Control_KeyPing*) msg->bytes;
         keyPing->magic = Control_KeyPong_MAGIC;
-        ctrl->type_be = Control_KEYPONG_be;
+        ctrl->header.type_be = Control_KEYPONG_be;
         Bits_memcpyConst(keyPing->key, ch->myAddr->key, 32);
 
     } else if (messageType_be == Control_PING_be) {
@@ -119,7 +103,7 @@ static int handlePing(struct ControlHandler_pvt* ch,
             return 0;
         }
         ping->magic = Control_Pong_MAGIC;
-        ctrl->type_be = Control_PONG_be;
+        ctrl->header.type_be = Control_PONG_be;
 
     } else {
         Assert_failure("2+2=5");
@@ -127,10 +111,10 @@ static int handlePing(struct ControlHandler_pvt* ch,
 
     ping->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
 
-    Message_shift(msg, Control_HEADER_SIZE, NULL);
+    Message_shift(msg, Control_Header_SIZE, NULL);
 
-    ctrl->checksum_be = 0;
-    ctrl->checksum_be = Checksum_engine(msg->bytes, msg->length);
+    ctrl->header.checksum_be = 0;
+    ctrl->header.checksum_be = Checksum_engine(msg->bytes, msg->length);
 
     Message_push32(msg, 0xffffffff, NULL);
 
@@ -166,7 +150,7 @@ static int incomingFromCore(struct Interface_Two* coreIf, struct Message* msg)
     AddrTools_printPath(labelStr, label);
     Log_debug(ch->log, "ctrl packet from [%s]", labelStr);
 
-    if (msg->length < SwitchHeader_SIZE + 4 + Control_HEADER_SIZE) {
+    if (msg->length < 4 + Control_Header_SIZE) {
         Log_info(ch->log, "DROP runt ctrl packet from [%s]", labelStr);
         return Error_NONE;
     }
@@ -180,15 +164,15 @@ static int incomingFromCore(struct Interface_Two* coreIf, struct Message* msg)
 
     struct Control* ctrl = (struct Control*) msg->bytes;
 
-    if (ctrl->type_be == Control_ERROR_be) {
+    if (ctrl->header.type_be == Control_ERROR_be) {
         return handleError(ch, msg, label, labelStr);
 
-    } else if (ctrl->type_be == Control_KEYPING_be
-            || ctrl->type_be == Control_PING_be) {
-        return handlePing(ch, msg, label, labelStr, ctrl->type_be);
+    } else if (ctrl->header.type_be == Control_KEYPING_be
+            || ctrl->header.type_be == Control_PING_be) {
+        return handlePing(ch, msg, label, labelStr, ctrl->header.type_be);
 
-    } else if (ctrl->type_be == Control_KEYPONG_be
-            || ctrl->type_be == Control_PONG_be) {
+    } else if (ctrl->header.type_be == Control_KEYPONG_be
+            || ctrl->header.type_be == Control_PONG_be) {
             Log_debug(ch->log, "got switch pong from [%s]", labelStr);
             // Shift back over the header
             Message_shift(msg, 4 + SwitchHeader_SIZE, NULL);
@@ -197,7 +181,7 @@ static int incomingFromCore(struct Interface_Two* coreIf, struct Message* msg)
     }
 
     Log_info(ch->log, "DROP control packet of unknown type from [%s], type [%d]",
-             labelStr, Endian_bigEndianToHost16(ctrl->type_be));
+             labelStr, Endian_bigEndianToHost16(ctrl->header.type_be));
 
     return 0;
 }
@@ -212,7 +196,7 @@ static int incomingFromSwitchPinger(struct Interface_Two* switchPingerIf, struct
 
 struct ControlHandler* ControlHandler_new(struct Allocator* alloc,
                                           struct Log* logger,
-                                          struct Router* router,
+                                          struct EventEmitter* ee,
                                           struct Address* myAddr)
 {
     struct ControlHandler_pvt* ch = Allocator_calloc(alloc, sizeof(struct ControlHandler_pvt), 1);
@@ -222,6 +206,7 @@ struct ControlHandler* ControlHandler_new(struct Allocator* alloc,
     ch->myAddr = myAddr;
     ch->pub.coreIf.send = incomingFromCore;
     ch->pub.switchPingerIf.send = incomingFromSwitchPinger;
+    EventEmitter_regCore(ee, Event_Core_INVALID, &ch->eventIface)
     Identity_set(ch);
     return &ch->pub;
 }

+ 2 - 2
net/ControlHandler.h

@@ -18,7 +18,7 @@
 #include "interface/Interface.h"
 #include "memory/Allocator.h"
 #include "util/log/Log.h"
-#include "dht/dhtcore/Router.h"
+#include "net/EventEmitter.h"
 #include "util/Linker.h"
 Linker_require("net/ControlHandler.c")
 
@@ -36,7 +36,7 @@ struct ControlHandler
 
 struct ControlHandler* ControlHandler_new(struct Allocator* alloc,
                                           struct Log* logger,
-                                          struct Router* router,
+                                          struct EventEmitter* ee,
                                           struct Address* myAddr);
 
 #endif

+ 275 - 21
net/Event.h

@@ -15,46 +15,300 @@
 #ifndef Event_H
 #define Event_H
 
-enum Event
+#include "util/Assert.h"
+#include "wire/RouteHeader.h"
+#include "wire/DataHeader.h"
+#include "wire/SwitchHeader.h"
+#include "wire/Control.h"
+
+struct Event_Ip6
 {
-    Event_SEARCH,
+    uint8_t ipv6[16];
+};
+#define Event_Ip6_SIZE 16
+Assert_compileTime(sizeof(struct Event_Ip6) == Event_Ip6_SIZE);
+
+struct Event_Node
+{
+    uint8_t ip6[16];
+
+    uint8_t publicKey[32];
+
+    uint64_t path_be;
 
+    /** Quality of path represented by switch label (0:best ffffffff:worst) */
+    uint32_t metric_be;
+
+    uint32_t version_be;
+};
+#define Event_Node_SIZE 64
+Assert_compileTime(sizeof(struct Event_Node) == Event_Node_SIZE);
+
+struct Event_Msg
+{
+    struct RouteHeader route;
+    struct DataHeader data;
+    // ...content follows...
+};
+#pragma GCC poison Event_Msg_SIZE // Contains more data...
+
+struct Event_Ping
+{
+    uint64_t cookie;
+};
+#define Event_Ping_SIZE 8
+Assert_compileTime(sizeof(struct Event_Ping) == Event_Ping_SIZE);
+
+struct Event_Pathfinder_Connect
+{
     /**
-     * Acknoledges the search has begun and a SEARCH_END is guaranteed to be sent later.
-     * contains a SearchHeader.
+     * See Event_Pathfinder_Superiority for more information about this field.
+     * It is recommended that you pass zero at first and then pass a higher number once your table
+     * has populated. Use an Event_Pathfinder_SUPERIORITY event to alter it.
      */
-    Event_SEARCH_BEGIN,
+    uint32_t superiority_be;
 
-    /** End of search, contains a SearchHeader. */
-    Event_SEARCH_END,
+    /** Protocol version of the pathfinder. */
+    uint32_t version_be;
 
-    Event_DISCOVERY,
+    /** Description of the pathfinder in ASCII text. */
+    uint8_t userAgent[64];
+};
+#define Event_Pathfinder_Connect_SIZE 72
+Assert_compileTime(sizeof(struct Event_Pathfinder_Connect) == Event_Pathfinder_Connect_SIZE);
 
-    /** Must be the last entry, never emitted. */
-    Event_INVALID
+/**
+ * Sending a Event_Pathfinder_SUPERIORITY event will cause Event_Core_PATHFINDER event to be sent
+ * again for your pathfinder with the updated superiority. Superiority is a way for multiple
+ * connected pathfinders to operate together by ones which have lower superiority switching
+ * to an "idle" mode, relying on the responses to the DHT pings and searches sent by the higher
+ * superiority pathfinder.
+ */
+struct Event_Pathfinder_Superiority
+{
+    uint32_t superiority_be;
 };
+#define Event_Pathfinder_Superiority_SIZE 4
+Assert_compileTime(
+    sizeof(struct Event_Pathfinder_Superiority) == Event_Pathfinder_Superiority_SIZE);
 
-struct Event_SearchHeader
+enum Event_Pathfinder
 {
-    enum Event event_be;
+    /**
+     * Must be emitted before any other messages.
+     * (Received by: EventEmitter.c)
+     */
+    Event_Pathfinder_CONNECT,
 
-    uint8_t ipv6[16];
+    /**
+     * See Event_Pathfinder_Superiority for more information about this event.
+     * (Received by: EventEmitter.c)
+     */
+    Event_Pathfinder_SUPERIORITY,
+
+    /**
+     * Emit to indicate the discovery of a node or a new best path to the node.
+     * To reduce traffic load, first request all sessions and then only emit for nodes for which
+     * there are active sessions.
+     * (Received by: SessionManager.c)
+     */
+    Event_Pathfinder_NODE,
+
+    /**
+     * Send a DHT message to another node.
+     * TODO
+     */
+    Event_Pathfinder_SENDMSG,
+
+    /**
+     * Event_Pathfinder_PING will elicit an Event_Core_PONG
+     * (Received by: EventEmitter.c)
+     */
+    Event_Pathfinder_PING,
+
+    /**
+     * Event_Pathfinder_PONG must be sent if core sends a Event_Core_PING
+     * (Received by: EventEmitter.c)
+     */
+    Event_Pathfinder_PONG,
+
+    // The following events have no content.
+
+    /**
+     * Get all sessions.
+     * TODO
+     */
+    Event_Pathfinder_SESSIONS,
+
+    /**
+     * Get all peers.
+     * (Received by: InterfaceController.c)
+     */
+    Event_Pathfinder_PEERS,
+
+    /**
+     * Get all registered pathfinders
+     * (Received by: EventEmitter.c)
+     */
+    Event_Pathfinder_PATHFINDERS,
+
+    Event_Pathfinder_INVALID
+};
+
+static const int Event_Pathfinder_SIZES[] = {
+    [Event_Pathfinder_CONNECT]      = 4 + Event_Pathfinder_Connect_SIZE,
+    [Event_Pathfinder_SUPERIORITY]  = 4 + Event_Pathfinder_Superiority_SIZE,
+    [Event_Pathfinder_NODE]         = 4 + Event_Node_SIZE,
+    [Event_Pathfinder_SENDMSG]      = -(4 + (int)sizeof(struct Event_Msg)),
+    [Event_Pathfinder_PING]         = 4 + Event_Ping_SIZE,
+    [Event_Pathfinder_PONG]         = 4 + Event_Ping_SIZE,
+    [Event_Pathfinder_SESSIONS]     = 4,
+    [Event_Pathfinder_PEERS]        = 4,
+    [Event_Pathfinder_PATHFINDERS]  = 4
 };
 
-struct Event_DiscoveryHeader
+struct Event_FromPathfinder
 {
-    enum Event event_be;
+    enum Event_Pathfinder event_be;
 
-    uint8_t ip6[16];
+    /* Number of the Pathfinder which sent this event, added by EventEmitter.c */
+    uint8_t target_be;
 
-    uint8_t publicKey[32];
+    union {
+        struct Event_Pathfinder_Connect connect;
+        struct Event_Pathfinder_Superiority superiority;
+        struct Event_Node node;
+        struct Event_Msg sendmsg;
+        struct Event_Ping ping;
+        struct Event_Ping pong;
+        uint8_t bytes[1];
+    } content;
+};
 
-    uint64_t path_be;
+//// ------------------------- Core Events ------------------------- ////
 
-    /** Quality of path represented by switch label (0:best ffffffff:worst) */
-    uint32_t metric_be;
+enum Event_Core
+{
+    /**
+     * Emitted when a pathfinder connects or if Event_Pathfinder_PATHFINDERS is sent.
+     * This message is guaranteed to be sent before any other message and the first instance
+     * refers to "you" the pathfinder which just connected.
+     * (emitted by: EventEmitter.c)
+     */
+    Event_Core_PATHFINDER,
 
-    uint32_t version_be;
+    /**
+     * Emitted when a pathfinder disconnects from the core
+     * (emitted by: EventEmitter.c)
+     */
+    Event_Core_PATHFINDER_GONE,
+
+    /**
+     * Emitted if the core wants the pathfinder to begin searching for a node.
+     * (emitted by: SessionManager.c)
+     */
+    Event_Core_SEARCH_REQ,
+
+    /**
+     * Emitted if a switch error is received, no matter what type of packet causes it.
+     * (emitted by: ControlHandler.c)
+     */
+    Event_Core_SWITCH_ERR,
+
+    /**
+     * Emitted when a peer connects (becomes state ESTABLISHED) or
+     * emitted for every peer if Event_Pathfinder_PEERS is sent.
+     * (emitted by: InterfaceController.c)
+     */
+    Event_Core_PEER,
+
+    /**
+     * Emitted when a peer disconnects (or becomes state UNRESPONSIVE)
+     * (emitted by: InterfaceController.c)
+     */
+    Event_Core_PEER_GONE,
+
+    /**
+     * Emitted if a new session begins,
+     * emitted for every active session of Event_Pathfinder_SESSIONS is sent.
+     * TODO
+     */ 
+    Event_Core_SESSION,
+
+    /** Emitted when a session ends. TODO */
+    Event_Core_SESSION_ENDED,
+
+    /** Emitted when SessionManager sees an incoming packet with a new path. TODO */
+    Event_Core_DISCOVERED_PATH,
+
+    /** Emitted for each incoming DHT message. TODO */
+    Event_Core_MSG,
+
+    /**
+     * Emitted from time to time in order to verify the pathfinder is alive.
+     * (emitted by: EventEmitter.c)
+     */
+    Event_Core_PING,
+
+    /**
+     * Will be emitted if the pathfinder emits an Event_Pathfinder_PING.
+     * (emitted by: EventEmitter.c)
+     */
+    Event_Core_PONG,
+
+    Event_Core_INVALID
+};
+
+struct Event_Core_Pathfinder
+{
+    /** See struct Event_Pathfinder_Superiority for more information */
+    uint32_t superiority_be;
+
+    /** The number of this pathfinder. */
+    uint32_t pathfinderId_be;
+
+    /** Description of the pathfinder in ASCII text. */
+    uint8_t userAgent[64];
+};
+#define Event_Core_Pathfinder_SIZE 72
+Assert_compileTime(sizeof(struct Event_Core_Pathfinder) == Event_Core_Pathfinder_SIZE);
+
+struct Event_Core_SwitchErr
+{
+    struct SwitchHeader sh;
+    uint32_t ffffffff;
+    struct Control_Header ctrlHeader;
+    struct Control_Error ctrlErr;
+};
+#pragma GCC poison Event_Core_SwitchErr_SIZE
+Assert_compileTime(sizeof(struct Event_Core_SwitchErr) ==
+    (SwitchHeader_SIZE + 4 + Control_Header_SIZE + Control_Error_MIN_SIZE));
+
+#define Event_FromCore_MAGIC 0x1337babe
+struct Event_FromCore
+{
+    enum Event_Core event_be;
+
+    /* Number of the Pathfinder to send this event to, 0xffffffff sends to all. */
+    uint8_t target_be;
+
+    union {
+        struct Event_Core_Pathfinder pathfinder;
+        struct Event_Core_Pathfinder pathfinderGone;
+        struct Event_Ip6 searchReq;
+        struct Event_Core_SwitchErr switchErr;
+        struct Event_Node peer;
+        struct Event_Node peerGone;
+        struct Event_Node session;
+        struct Event_Node sessionEnded;
+        struct Event_Node discoveredPath;
+        struct Event_Msg msg;
+        struct Event_Ping ping;
+        struct Event_Ping pong;
+        uint8_t bytes[4];
+    } content;
 };
+// Event_FromCore contains a union so it's size is not useful.
+#pragma GCC poison Event_FromCore_SIZE
 
 #endif

+ 215 - 16
net/EventEmitter.c

@@ -17,6 +17,7 @@
 #include "net/Event.h"
 #include "net/EventEmitter.h"
 #include "util/Identity.h"
+#include "util/log/Log.h"
 
 #include <stdbool.h>
 
@@ -24,20 +25,54 @@
 #define ArrayList_NAME Ifaces
 #include "util/ArrayList.h"
 
+#define PING_MAGIC 0x01234567
+
+struct Pathfinder
+{
+    struct EventEmitter_pvt* ee;
+    struct Interface_Two iface;
+    struct Allocator* alloc;
+    uint8_t userAgent[64];
+
+    uint32_t superiority;
+
+    uint32_t version;
+
+    #define Pathfinder_state_DISCONNECTED 0
+    #define Pathfinder_state_CONNECTED    1
+    #define Pathfinder_state_ERROR        2
+    uint16_t state;
+    uint16_t pathfinderId;
+
+    /**
+     * Number of bytes sent since the last ping, each ping contains this number at the time
+     * of pinging, then the number is incremented as more messages are sent, if it goes over
+     * 65535, the state is set to error.
+     */
+    uint32_t bytesSinceLastPing;
+
+    Identity
+};
+#define ArrayList_TYPE struct Pathfinder*
+#define ArrayList_NAME Pathfinders
+#include "util/ArrayList.h"
+
 struct EventEmitter_pvt
 {
     struct EventEmitter pub;
     struct Interface_Two trickIf;
     struct Allocator* alloc;
-    struct ArrayList_Ifaces* listTable[Event_INVALID];
+    struct Log* log;
+    struct ArrayList_Ifaces* listTable[Event_Pathfinder_INVALID];
+    struct ArrayList_Pathfinders* pathfinders;
     Identity
 };
 
-static struct ArrayList_Ifaces* getHandlers(struct EventEmitter_pvt* ee, enum Event ev, bool create)
+static struct ArrayList_Ifaces* getHandlers(struct EventEmitter_pvt* ee,
+                                            enum Event_Pathfinder ev,
+                                            bool create)
 {
-    // enums are signed D:
-    ev &= 0xffff;
-    if (ev >= Event_INVALID) { return NULL; }
+    if (ev < 0 || ev >= Event_Pathfinder_INVALID) { return NULL; }
     struct ArrayList_Ifaces* out = ee->listTable[ev];
     if (!out) {
         out = ee->listTable[ev] = ArrayList_Ifaces_new(ee->alloc);
@@ -45,12 +80,154 @@ static struct ArrayList_Ifaces* getHandlers(struct EventEmitter_pvt* ee, enum Ev
     return out;
 }
 
-static int incoming(struct Interface_Two* trickIf, struct Message* msg)
+static void sendToPathfinder(struct Pathfinder* pf, struct Message* msg)
+{
+    if (!pf || pf->state != Pathfinder_state_CONNECTED) { return; }
+    Interface_send(&pf->iface, msg);
+    if (pf->bytesSinceLastPing < 8192 && pf->bytesSinceLastPing + msg->length >= 8192) {
+        struct Message* ping = Message_new(0, 512, msg->alloc);
+        Message_push32(ping, pf->bytesSinceLastPing, NULL);
+        Message_push32(ping, PING_MAGIC, NULL);
+        Interface_send(&pf->iface, ping);
+    }
+    pf->bytesSinceLastPing += messageClone->length;
+}
+
+static int incomingFromCore(struct Interface_Two* trickIf, struct Message* msg)
 {
     struct EventEmitter_pvt* ee = Identity_containerOf(trickIf, struct EventEmitter_pvt, trickIf);
-    Assert_true(msg->length >= 4);
+    Assert_true(msg->length >= 8);
     Assert_true(!((uintptr_t)msg->bytes % 4) && "alignment");
-    enum Event ev = Message_pop32(msg, NULL);
+    enum Event_Core ev = Message_pop32(msg, NULL);
+    uint32_t pathfinderNum = Message_pop32(msg, NULL);
+    Message_push32(msg, ev, NULL);
+    if (pathfinderNum != 0xffffffff) {
+        struct Pathfinder* pf = ArrayList_Pathfinders_get(ee->pathfinders, i)[0];
+        sendToPathfinder(pf, msg);
+    } else {
+        for (int i = 0; i < ee->pathfinders->length; i++) {
+            struct Pathfinder* pf = ArrayList_Pathfinders_get(ee->pathfinders, i)[0];
+            if (!pf || pf->state != Pathfinder_state_CONNECTED) { continue; }
+            struct Message* messageClone = msg;
+            if (ee->pathfinders->length > i+1) {
+                // if only one handler, no need to copy the message...
+                messageClone = Message_clone(msg, msg->alloc);
+            }
+            sendToPathfinder(pf, messageClone);
+        }
+    }
+    return 0;
+}
+
+static struct Message* pathfinderMsg(enum Event_Core ev,
+                                     struct Pathfinder* pf,
+                                     struct Allocator* alloc)
+{
+    struct Message* msg = Message_new(Event_Core_Pathfinder_SIZE, 512, alloc);
+    struct Event_Core_Pathfinder* pathfinder = (struct Event_Core_Pathfinder*) msg->bytes;
+    pathfinder->superiority_be = Endian_hostToBigEndian32(pf->superiority);
+    pathfinder->pathfinderId_be = Endian_hostToBigEndian32(pf->pathfinderId);
+    Bits_memcpyConst(pathfinder->userAgent, pf->userAgent, 64);
+    Message_push32(msg, ev, NULL);
+    return msg;
+}
+
+static int handleFromPathfinder(enum Event_Pathfinder ev,
+                                struct Message* msg,
+                                struct EventEmitter_pvt* ee,
+                                struct Pathfinder* pf)
+{
+    switch (ev) {
+        default: return false;
+
+        case Event_Pathfinder_CONNECT: {
+            struct Event_Pathfinder_Connect connect;
+            Message_shift(msg, -8, NULL);
+            Message_pop(msg, &connect, Event_Pathfinder_Connect_SIZE, NULL);
+            pf->superiority = Endian_bigEndianToHost32(connect.superiority_be);
+            pf->version = Endian_bigEndianToHost32(connect.version_be);
+            Bits_memcpyConst(pf->userAgent, connect.userAgent, 64);
+            pf->state = Pathfinder_state_CONNECTED;
+            // fallthrough
+        }
+        case Event_Pathfinder_SUPERIORITY: {
+            if (Event_Pathfinder_SUPERIORITY == ev) {
+                Message_shift(msg, -8, NULL);
+                pf->superiority = Message_pop32(msg, NULL);
+            }
+            struct Message* resp = pathfinderMsg(Event_Core_PATHFINDER, pf, msg->alloc);
+            incomingFromCore(&ee->trickIf, resp);
+            break;
+        }
+
+        case Event_Pathfinder_PING: {
+            Interface_send(&pf->iface, msg);
+            break;
+        }
+        case Event_Pathfinder_PONG: {
+            Message_shift(msg, -8, NULL);
+            uint32_t cookie = Message_pop32(msg, NULL);
+            uint32_t count = Message_pop32(msg, NULL);
+            if (cookie != PING_MAGIC || count > pf->bytesSinceLastPing) {
+                pf->state = Pathfinder_state_ERROR;
+                struct Message* resp = pathfinderMsg(Event_Core_PATHFINDER_GONE, pf, msg->alloc);
+                incomingFromCore(&ee->trickIf, resp);
+            } else {
+                pf->bytesSinceLastPing -= count;
+            }
+            break;
+        }
+        case Event_Pathfinder_PATHFINDERS: {
+            for (int i = 0; i < ee->pathfinders->length; i++) {
+                struct Pathfinder* xpf = ArrayList_Pathfinders_get(ee->pathfinders, i)[0];
+                if (!xpf || xpf->state != Pathfinder_state_CONNECTED) { continue; }
+                struct Allocator* alloc = Allocator_child(msg->alloc);
+                struct Message* resp = pathfinderMsg(Event_Core_PATHFINDER, pf, alloc);
+                sendToPathfinder(pf, resp);
+                Allocator_free(alloc);
+            }
+            break;
+        }
+    }
+    return true;
+}
+
+static int incomingFromPathfinder(struct Interface_Two* iface, struct Message* msg)
+{
+    struct Pathfinder* pf = Identity_containerOf(iface, struct Pathfinder, iface);
+    struct EventEmitter_pvt* ee = Identity_check((struct EventEmitter_pvt*) pf->ee);
+    if (msg->length < 4) {
+        Log_debug(ee->log, "DROPPF runt");
+        return 0;
+    }
+    enum Event_Pathfinder ev = Message_pop32(msg, NULL);
+    Message_push32(msg, pf->pathfinderId, NULL);
+    Message_push32(msg, ev, NULL);
+    if (ev < 0 || ev >= Event_Pathfinder_INVALID) {
+        Log_debug(ee->log, "DROPPF invalid type [%d]", ev);
+        return 0;
+    }
+    if (Event_Pathfinder_SIZES[ev] < 0) {
+        if (-(Event_Pathfinder_SIZES[ev]) > msg->length) {
+            Log_debug(ee->log, "DROPPF runt");
+            return 0;
+        }
+    } else if (Event_Pathfinder_SIZES[ev] != msg->length) {
+        Log_debug(ee->log, "DROPPF incorrect length for type [%d] expected [%d] got [%d]",
+                  ev, Event_Pathfinder_SIZES[ev], msg->length);
+        return 0;
+    }
+
+    if (pf->state == Pathfinder_state_DISCONNECTED && ev != Event_Pathfinder_CONNECT) {
+        Log_debug(ee->log, "DROPPF disconnected and event != CONNECT event:[%d]", ev);
+        return 0;
+    } else if (pf->state == Pathfinder_state_CONNECTED) {
+        Log_debug(ee->log, "DROPPF error state");
+        return 0;
+    }
+
+    if (handleFromPathfinder(ev, msg, ee, pf)) { return 0; }
+
     struct ArrayList_Ifaces* handlers = getHandlers(ee, ev, false);
     if (!handlers) { return 0; }
     for (int i = 0; i < handlers->length; i++) {
@@ -59,30 +236,52 @@ static int incoming(struct Interface_Two* trickIf, struct Message* msg)
             // if only one handler, no need to copy the message...
             messageClone = Message_clone(msg, msg->alloc);
         }
-        struct Interface_Two** iface = ArrayList_Ifaces_get(handlers, i);
+        struct Interface_Two* iface = ArrayList_Ifaces_get(handlers, i)[0];
         // We have to call this manually because we don't have an interface handy which is
         // actually plumbed with this one.
-        Assert_true(iface[0]);
-        Assert_true(iface[0]->send);
-        iface[0]->send(iface[0], messageClone);
+        Assert_true(iface);
+        Assert_true(iface->send);
+        iface->send(iface, messageClone);
     }
     return 0;
 }
 
-void EventEmitter_regIface(struct EventEmitter* emitter, struct Interface_Two* iface, enum Event ev)
+void EventEmitter_regCore(struct EventEmitter* eventEmitter,
+                          struct Interface_Two* iface,
+                          enum Event_Pathfinder ev)
 {
-    struct EventEmitter_pvt* ee = Identity_check((struct EventEmitter_pvt*) emitter);
+    struct EventEmitter_pvt* ee = Identity_check((struct EventEmitter_pvt*) eventEmitter);
     iface->connectedIf = &ee->trickIf;
     struct ArrayList_Ifaces* l = getHandlers(ee, ev, true);
     if (!l) { return; }
     ArrayList_Ifaces_add(l, &iface);
 }
 
-struct EventEmitter* EventEmitter_new(struct Allocator* alloc)
+void EventEmitter_regPathfinderIface(struct EventEmitter* emitter, struct Interface_Two* iface)
+{
+    struct EventEmitter_pvt* ee = Identity_check((struct EventEmitter_pvt*) emitter);
+    struct Allocator* alloc = Allocator_child(ee->alloc);
+    struct Pathfinder* pf = Allocator_calloc(alloc, sizeof(struct Pathfinder), 1);
+    pf->ee = ee;
+    pf->iface.send = incomingFromPathfinder;
+    pf->alloc = alloc;
+    Interface_plumb(&pf->iface, iface);
+    Identity_set(pf);
+    int i = 0;
+    for (; i < ee->pathfinders->length; i++) {
+        struct Pathfinder* xpf = ArrayList_Pathfinders_get(ee->pathfinders, i)[0];
+        if (!xpf) { break; }
+    }
+    pf->pathfinderId = ArrayList_Pathfinders_put(ee->pathfinders, i, &pf);
+}
+
+struct EventEmitter* EventEmitter_new(struct Allocator* alloc, struct Log* log)
 {
     struct EventEmitter_pvt* ee = Allocator_calloc(alloc, sizeof(struct EventEmitter_pvt), 1);
+    ee->log = log;
     ee->alloc = alloc;
-    ee->trickIf.send = incoming;
+    ee->trickIf.send = incomingFromCore;
+    ee->pathfinders = ArrayList_Pathfinders_new(ee->alloc);
     Identity_set(ee);
     return &ee->pub;
 }

+ 8 - 3
net/EventEmitter.h

@@ -18,6 +18,7 @@
 #include "interface/Interface.h"
 #include "memory/Allocator.h"
 #include "net/Event.h"
+#include "util/log/Log.h"
 #include "util/Linker.h"
 Linker_require("net/EventEmitter.c")
 
@@ -29,10 +30,14 @@ struct EventEmitter
 /**
  * Register an interface to listen for and fire events.
  * The same interface may be registered multiple times.
- * If you only intend to fire events, just register with Event_INVALID.
+ * If you only intend to fire events, just register with Event_Pathfinder_INVALID.
  */
-void EventEmitter_regIface(struct EventEmitter* ee, struct Interface_Two* iface, enum Event ev);
+void EventEmitter_regCore(struct EventEmitter* ee,
+                          struct Interface_Two* iface,
+                          enum Event_Pathfinder ev);
 
-struct EventEmitter* EventEmitter_new(struct Allocator* alloc);
+void EventEmitter_regPathfinderIface(struct EventEmitter* ee, struct Interface_Two* iface);
+
+struct EventEmitter* EventEmitter_new(struct Allocator* alloc, struct Log* log);
 
 #endif

+ 70 - 88
net/SessionManager.c

@@ -22,13 +22,13 @@
 #include "util/events/Time.h"
 #include "util/Defined.h"
 #include "wire/RouteHeader.h"
+#include "util/events/Timeout.h"
 
 struct BufferedMessage
 {
     struct Message* msg;
     struct Allocator* alloc;
     uint32_t timeSent;
-    bool confirmed;
 };
 
 struct Ip6 {
@@ -67,7 +67,7 @@ struct SessionManager_pvt
         Log_debug(logger, "ver[%u] send[%d] recv[%u] ip[%s] path[%s] " message,        \
                   session->version,                                                    \
                   session->sendHandle,                                                 \
-                  Endian_hostToBigEndian32(session->receiveHandle_be),                 \
+                  session->receiveHandle,                                              \
                   ip,                                                                  \
                   path,                                                                \
                   __VA_ARGS__);                                                        \
@@ -118,6 +118,23 @@ static uint8_t incomingFromSwitchPostCryptoAuth(struct Message* msg, struct Inte
     uint8_t* pubKey = CryptoAuth_getHerPublicKey(session->internal);
     Bits_memcpyConst(header->publicKey, pubKey, 32);
 
+    uint64_t path = Endian_bigEndianToHost64(sh->label_be);
+    if (!session->sendSwitchLabel) {
+        session->sendSwitchLabel = path;
+    }
+    if (path != session->recvSwitchLabel) {
+        session->recvSwitchLabel = path;
+        struct Message* eventMsg = Message_new(Event_Node_SIZE, 512, msg->alloc);
+        struct Event_Node* node = (struct Event_Node*) eventMsg->bytes;
+        Bits_memcpyConst(node->ip6, session->ip6, 16);
+        Bits_memcpyConst(node->publicKey, pubKey, 32);
+        node->path_be = sh->label_be;
+        node->metric_be = 0xffffffff;
+        node->version_be = header->version_be;
+        Message_push32(eventMsg, Event_Core_SEARCH_REQ, NULL);
+        Interface_send(&bw->eventIf, eventMsg);
+    }
+
     Interface_send(&bw->pub.insideIf, msg);
     // Never return errors here because they can cause unencrypted stuff to be returned as an error.
     return 0;
@@ -196,18 +213,13 @@ static int incomingFromSwitchIf(struct Interface_Two* iface, struct Message* msg
     return 0;
 }
 
-/**
- * If an unsent buffer sits around for more than 120 seconds, it must be removed because
- * the search has somehow gone missing. If the search was not confirmed to have begun,
- * do not keep for more than 1 second.
- */
 static void checkTimedOutBuffers(void* vSessionManager)
 {
     struct SessionManager_pvt* bw = Identity_check((struct SessionManager_pvt*) vSessionManager);
     for (int i = 0; i < (int)bw->bufMap.count; i++) {
         struct BufferedMessage* buffered = bw->bufMap.values[i];
         uint64_t lag = Time_currentTimeSeconds(bw->eventBase) - buffered->timeSent;
-        if ((buffered->confirmed || lag < 1) && lag < 120) { continue; }
+        if (lag < 10) { continue; }
         Map_BufferedMessages_remove(i, &bw->bufMap);
         Allocator_free(buffered->alloc);
         i--;
@@ -224,8 +236,10 @@ static int needsLookup(struct SessionManager_pvt* bw, struct Message* msg)
     }
     int index = Map_BufferedMessages_indexForKey((struct Ip6*)header->ip6, &bw->bufMap);
     if (index > -1) {
-        Log_debug(bw->log, "DROP message which needs lookup because one is in progress");
-        return 0;
+        struct BufferedMessage* buffered = bw->bufMap.values[index];
+        Map_BufferedMessages_remove(index, &bw->bufMap);
+        Allocator_free(buffered->alloc);
+        Log_debug(bw->log, "DROP message which needs lookup because new one received");
     }
     if ((int)bw->bufMap.count >= bw->pub.maxBufferedMessages) {
         checkTimedOutBuffers(bw);
@@ -247,7 +261,7 @@ static int needsLookup(struct SessionManager_pvt* bw, struct Message* msg)
     struct Allocator* eventAlloc = Allocator_child(lookupAlloc);
     struct Message* eventMsg = Message_new(0, 512, eventAlloc);
     Message_push(eventMsg, header->ip6, 16, NULL);
-    Message_push32(eventMsg, Event_SEARCH, NULL);
+    Message_push32(eventMsg, Event_Core_SEARCH_REQ, NULL);
     Interface_send(&bw->eventIf, eventMsg);
     Allocator_free(eventAlloc);
 
@@ -296,7 +310,7 @@ static int readyToSend(struct SessionManager_pvt* bw,
     CryptoAuth_resetIfTimeout(sess->internal);
     if (CryptoAuth_getState(sess->internal) < CryptoAuth_HANDSHAKE3) {
         // Put the handle into the message so that it's authenticated.
-        Message_push(msg, &sess->receiveHandle_be, 4, NULL);
+        Message_push32(msg, sess->receiveHandle, NULL);
 
         // Copy back the SwitchHeader so it is not clobbered.
         Message_shift(msg, (CryptoHeader_SIZE + SwitchHeader_SIZE), NULL);
@@ -336,8 +350,8 @@ static int incomingFromInsideIf(struct Interface_Two* iface, struct Message* msg
 
     if (header->sh.label_be) {
         // fallthrough
-    } else if (sess->knownSwitchLabel) {
-        header->sh.label_be = Endian_hostToBigEndian64(sess->knownSwitchLabel);
+    } else if (sess->sendSwitchLabel) {
+        header->sh.label_be = Endian_hostToBigEndian64(sess->sendSwitchLabel);
     } else {
         return needsLookup(bw, msg);
     }
@@ -345,6 +359,7 @@ static int incomingFromInsideIf(struct Interface_Two* iface, struct Message* msg
     return readyToSend(bw, sess, msg);
 }
 
+/* too good to toss!
 static uint32_t getEffectiveMetric(uint64_t nowMilliseconds,
                                    uint32_t metricHalflifeMilliseconds,
                                    uint32_t metric,
@@ -372,81 +387,48 @@ static uint32_t getEffectiveMetric(uint64_t nowMilliseconds,
 
     return UINT32_MAX - out;
 }
+*/
 
-static int incomingFromEventIf(struct Interface_Two* iface, struct Message* msg)
+static Interface_Ret sessions(struct Interface_Two* iface, struct Message* msg)
 {
-    struct SessionManager_pvt* bw = Identity_containerOf(iface, struct SessionManager_pvt, eventIf);
-    enum Event ev = Message_pop32(msg, NULL);
-
-    struct Ip6 ip6;
-    Message_pop(msg, &ip6, 16, NULL);
-    int index = Map_BufferedMessages_indexForKey(&ip6, &bw->bufMap);
-
-    if (ev == Event_DISCOVERY) {
-        struct SessionTable_Session* sess;
-        if (index == -1) {
-            sess = SessionTable_sessionForIp6(ip6.bytes, bw->pub.sessionTable);
-            // If we discovered a node we're not interested in ...
-            if (!sess) { return 0; }
-            Message_pop(msg, NULL, 32, NULL);
-        } else {
-            uint8_t publicKey[32];
-            Message_pop(msg, publicKey, 32, NULL);
-            sess = SessionTable_getSession(ip6.bytes, publicKey, bw->pub.sessionTable);
-        }
+    
+}
 
-        uint64_t path = Message_pop64(msg, NULL);
-        uint32_t metric = Message_pop32(msg, NULL);
-        sess->version = Message_pop32(msg, NULL);
+static Interface_Ret incomingFromEventIf(struct Interface_Two* iface, struct Message* msg)
+{
+    struct SessionManager_pvt* bw = Identity_containerOf(iface, struct SessionManager_pvt, eventIf);
+    enum Event_Pathfinder ev = Message_pop32(msg, NULL);
+    uint32_t sourcePf = Message_pop32(msg, NULL);
+    if (ev == Event_Pathfinder_SESSIONS) {
         Assert_true(!msg->length);
-
-        uint64_t now = Time_currentTimeMilliseconds(bw->eventBase);
-
-        if (!sess->knownSwitchLabel) {
-            sess->knownSwitchLabel = path;
-            sess->metric = metric;
-            sess->timeDiscovered = now;
-        } else {
-            uint32_t effectiveMetric = getEffectiveMetric(now,
-                                                          bw->pub.metricHalflifeMilliseconds,
-                                                          sess->metric,
-                                                          sess->timeDiscovered);
-            if (metric < effectiveMetric) {
-                sess->knownSwitchLabel = path;
-                sess->metric = metric;
-                sess->timeDiscovered = now;
-            }
-        }
-
-        // Send what's on the buffer...
-        if (index > -1) {
-            struct BufferedMessage* bm = bw->bufMap.values[index];
-            readyToSend(bw, sess, bm->msg);
-            Map_BufferedMessages_remove(index, &bw->bufMap);
-            Allocator_free(bm->alloc);
-        }
-        return 0;
+        return sessions(bw, sourcePf);
     }
+    Assert_true(ev == Event_Pathfinder_NODE);
 
-    // It has to be a SEARCH_BEGIN or SEARCH_END so it must be exhaused.
+    struct Event_Node node;
+    Message_pop(msg, &node, Event_Node_SIZE, NULL);
     Assert_true(!msg->length);
+    int index = Map_BufferedMessages_indexForKey((struct Ip6*)node.ip6, &bw->bufMap);
+    struct SessionTable_Session* sess;
+    if (index == -1) {
+        sess = SessionTable_sessionForIp6(node.ip6, bw->pub.sessionTable);
+        // If we discovered a node we're not interested in ...
+        if (!sess) { return Interface_RET; }
+    } else {
+        sess = SessionTable_getSession(node.ip6, node.publicKey, bw->pub.sessionTable);
+    }
 
-    if (index == -1) { return 0; }
-    struct BufferedMessage* bm = bw->bufMap.values[index];
-    if (ev == Event_SEARCH_BEGIN) {
-        bm->confirmed = true;
-        return 0;
-    } else if (ev == Event_SEARCH_END) {
-        if (Defined(Log_DEBUG)) {
-            uint8_t ipStr[40];
-            AddrTools_printIp(ipStr, ip6.bytes);
-            Log_debug(bw->log, "DROP buffered packet to [%s] because search found nothing", ipStr);
-        }
+    sess->sendSwitchLabel = Endian_bigEndianToHost64(node.path_be);
+    sess->version = Endian_bigEndianToHost64(node.version_be);
+
+    // Send what's on the buffer...
+    if (index > -1) {
+        struct BufferedMessage* bm = bw->bufMap.values[index];
+        readyToSend(bw, sess, bm->msg);
         Map_BufferedMessages_remove(index, &bw->bufMap);
         Allocator_free(bm->alloc);
-        return 0;
     }
-    Assert_failure("2+2=5");
+    return Interface_RET;
 }
 
 struct SessionManager* SessionManager_new(struct Allocator* alloc,
@@ -460,7 +442,6 @@ struct SessionManager* SessionManager_new(struct Allocator* alloc,
     bw->alloc = alloc;
     bw->pub.switchIf.send = incomingFromSwitchIf;
     bw->pub.insideIf.send = incomingFromInsideIf;
-    bw->eventIf.send = incomingFromEventIf;
     bw->bufMap.allocator = alloc;
     bw->log = log;
     bw->ca = cryptoAuth;
@@ -469,17 +450,18 @@ struct SessionManager* SessionManager_new(struct Allocator* alloc,
     bw->pub.metricHalflifeMilliseconds = SessionManager_METRIC_HALFLIFE_MILLISECONDS_DEFAULT;
     bw->pub.maxBufferedMessages = SessionManager_MAX_BUFFERED_MESSAGES_DEFAULT;
 
-    EventEmitter_regIface(ee, &bw->eventIf, Event_DISCOVERY);
-    EventEmitter_regIface(ee, &bw->eventIf, Event_SEARCH_BEGIN);
-    EventEmitter_regIface(ee, &bw->eventIf, Event_SEARCH_END);
+    bw->eventIf.send = incomingFromEventIf;
+    EventEmitter_regCore(ee, &bw->eventIf, Event_Pathfinder_NODE);
 
     bw->pub.sessionTable = SessionTable_new(incomingFromSwitchPostCryptoAuth,
-                                                readyToSendPostCryptoAuth,
-                                                bw,
-                                                eventBase,
-                                                cryptoAuth,
-                                                rand,
-                                                alloc);
+                                            readyToSendPostCryptoAuth,
+                                            bw,
+                                            eventBase,
+                                            cryptoAuth,
+                                            rand,
+                                            alloc);
+
+    Timeout_setInterval(checkTimedOutBuffers, bw, 10000, eventBase, alloc);
 
     Identity_set(bw);
 

+ 0 - 1
net/SessionManager.h

@@ -26,7 +26,6 @@
 Linker_require("net/SessionManager.c")
 
 /**
- * Called SessionManager because I can't think of what this should be called.
  * Purpose of this module is to take packets from "the inside" which contain ipv6 address and
  * skeleton switch header and find an appropriate CryptoAuth session for them or begin one.
  * If a key for this node cannot be found then the packet will be blocked and a search will be

+ 2 - 3
net/SessionManager_admin.c

@@ -101,15 +101,14 @@ static void sessionStats(Dict* args,
     struct Address addr;
     uint8_t* key = CryptoAuth_getHerPublicKey(session->internal);
     Bits_memcpyConst(addr.key, key, 32);
-    addr.path = session->knownSwitchLabel;
+    addr.path = session->sendSwitchLabel;
     addr.protocolVersion = session->version;
 
     Dict_putString(r, String_CONST("addr"), Address_toString(&addr, alloc), alloc);
 
     Dict_putString(r, String_CONST("publicKey"), Key_stringify(key, alloc), alloc);
     Dict_putInt(r, String_CONST("version"), session->version, alloc);
-    Dict_putInt(r, String_CONST("handle"),
-                Endian_bigEndianToHost32(session->receiveHandle_be), alloc);
+    Dict_putInt(r, String_CONST("handle"), session->receiveHandle, alloc);
     Dict_putInt(r, String_CONST("sendHandle"), session->sendHandle, alloc);
 
     Dict_putInt(r, String_CONST("timeOfLastIn"), session->timeOfLastIn, alloc);

+ 2 - 4
net/SessionTable.c

@@ -209,16 +209,14 @@ struct SessionTable_Session* SessionTable_getSession(uint8_t* lookupKey,
 
     int ifaceIndex = Map_OfSessionsByIp6_put((struct Ip6*)lookupKey, &ss, &sm->ifaceMap);
 
-    ss->pub.receiveHandle_be =
-        Endian_hostToBigEndian32(sm->ifaceMap.handles[ifaceIndex] + sm->first);
+    ss->pub.receiveHandle = sm->ifaceMap.handles[ifaceIndex] + sm->first;
 
     check(sm, ifaceIndex);
 
     return &Identity_check(sm->ifaceMap.values[ifaceIndex])->pub;
 }
 
-struct SessionTable_Session* SessionTable_sessionForHandle(uint32_t handle,
-                                                               struct SessionTable* sm)
+struct SessionTable_Session* SessionTable_sessionForHandle(uint32_t handle, struct SessionTable* sm)
 {
     int index = Map_OfSessionsByIp6_indexForHandle(handle - sm->first, &sm->ifaceMap);
     if (index < 0) { return NULL; }

+ 6 - 9
net/SessionTable.h

@@ -45,8 +45,8 @@ struct SessionTable_Session
     /** The CryptoAuth state as of the last message. See: CryptoAuth_getState() */
     int cryptoAuthState;
 
-    /** The handle which will be used to lookup this session on our side, big endian. */
-    uint32_t receiveHandle_be;
+    /** The handle which will be used to lookup this session on our side. */
+    uint32_t receiveHandle;
 
     /** The handle which we are expected to send to identify ourselves */
     uint32_t sendHandle;
@@ -57,14 +57,11 @@ struct SessionTable_Session
     /** The IPv6 address of the other node. */
     uint8_t ip6[16];
 
-    /** Some label which is known to go to this node... */
-    uint64_t knownSwitchLabel;
+    /** The best known switch label for reaching this node. */
+    uint64_t sendSwitchLabel;
 
-    /** Quality of switch label. */
-    uint32_t metric;
-
-    /** Milliseconds since the epoch when this switch label was discovered. */
-    uint32_t timeDiscovered;
+    /** The switch label which this node uses for reaching us. */
+    uint64_t recvSwitchLabel;
 };
 
 struct SessionTable_HandleList

+ 16 - 16
net/SwitchPinger.c

@@ -95,8 +95,8 @@ static int messageFromControlHandler(struct Interface_Two* iface, struct Message
     Assert_true(handle == 0xffffffff);
 
     struct Control* ctrl = (struct Control*) msg->bytes;
-    if (ctrl->type_be == Control_PONG_be) {
-        Message_shift(msg, -Control_HEADER_SIZE, NULL);
+    if (ctrl->header.type_be == Control_PONG_be) {
+        Message_shift(msg, -Control_Header_SIZE, NULL);
         ctx->error = Error_NONE;
         if (msg->length >= Control_Pong_MIN_SIZE) {
             struct Control_Ping* pongHeader = (struct Control_Ping*) msg->bytes;
@@ -111,8 +111,8 @@ static int messageFromControlHandler(struct Interface_Two* iface, struct Message
             return Error_INVALID;
         }
 
-    } else if (ctrl->type_be == Control_KEYPONG_be) {
-        Message_shift(msg, -Control_HEADER_SIZE, NULL);
+    } else if (ctrl->header.type_be == Control_KEYPONG_be) {
+        Message_shift(msg, -Control_Header_SIZE, NULL);
         ctx->error = Error_NONE;
         if (msg->length >= Control_KeyPong_HEADER_SIZE && msg->length <= Control_KeyPong_MAX_SIZE) {
             struct Control_KeyPing* pongHeader = (struct Control_KeyPing*) msg->bytes;
@@ -131,10 +131,10 @@ static int messageFromControlHandler(struct Interface_Two* iface, struct Message
             return Error_INVALID;
         }
 
-    } else if (ctrl->type_be == Control_ERROR_be) {
-        Message_shift(msg, -Control_HEADER_SIZE, NULL);
+    } else if (ctrl->header.type_be == Control_ERROR_be) {
+        Message_shift(msg, -Control_Header_SIZE, NULL);
         Assert_true((uint8_t*)&ctrl->content.error.errorType_be == msg->bytes);
-        if (msg->length < (Control_Error_HEADER_SIZE + SwitchHeader_SIZE + Control_HEADER_SIZE)) {
+        if (msg->length < (Control_Error_HEADER_SIZE + SwitchHeader_SIZE + Control_Header_SIZE)) {
             Log_debug(ctx->logger, "runt error packet");
             return Error_NONE;
         }
@@ -148,13 +148,13 @@ static int messageFromControlHandler(struct Interface_Two* iface, struct Message
 
         Log_debug(ctx->logger, "error [%s] was caused by our [%s]",
                   Error_strerror(ctx->error),
-                  Control_typeString(origCtrl->type_be));
+                  Control_typeString(origCtrl->header.type_be));
 
         int shift;
-        if (origCtrl->type_be == Control_PING_be) {
-            shift = -(Control_HEADER_SIZE + Control_Ping_HEADER_SIZE);
-        } else if (origCtrl->type_be == Control_KEYPING_be) {
-            shift = -(Control_HEADER_SIZE + Control_KeyPing_HEADER_SIZE);
+        if (origCtrl->header.type_be == Control_PING_be) {
+            shift = -(Control_Header_SIZE + Control_Ping_HEADER_SIZE);
+        } else if (origCtrl->header.type_be == Control_KEYPING_be) {
+            shift = -(Control_Header_SIZE + Control_KeyPing_HEADER_SIZE);
         } else {
             Assert_failure("problem in Ducttape.c");
         }
@@ -234,11 +234,11 @@ static void sendPing(String* data, void* sendPingContext)
         pingHeader->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
     }
 
-    Message_shift(msg, Control_HEADER_SIZE, NULL);
+    Message_shift(msg, Control_Header_SIZE, NULL);
     struct Control* ctrl = (struct Control*) msg->bytes;
-    ctrl->checksum_be = 0;
-    ctrl->type_be = (p->pub.keyPing) ? Control_KEYPING_be : Control_PING_be;
-    ctrl->checksum_be = Checksum_engine(msg->bytes, msg->length);
+    ctrl->header.checksum_be = 0;
+    ctrl->header.type_be = (p->pub.keyPing) ? Control_KEYPING_be : Control_PING_be;
+    ctrl->header.checksum_be = Checksum_engine(msg->bytes, msg->length);
 
     #ifdef Version_7_COMPAT
         if (0) {

+ 5 - 5
switch/SwitchCore.c

@@ -99,7 +99,7 @@ static inline void sendError7(struct SwitchInterface* iface,
 
     // Shift back so we can add another header.
     Message_shift(cause,
-                  SwitchHeader_SIZE + Control_HEADER_SIZE + Control_Error_HEADER_SIZE,
+                  SwitchHeader_SIZE + Control_Header_SIZE + Control_Error_HEADER_SIZE,
                   NULL);
     struct ErrorPacket7* err = (struct ErrorPacket7*) cause->bytes;
 
@@ -150,7 +150,7 @@ static inline void sendError8(struct SwitchInterface* iface,
 
     // Shift back so we can add another header.
     Message_shift(cause,
-                  SwitchHeader_SIZE + 4 + Control_HEADER_SIZE + Control_Error_HEADER_SIZE,
+                  SwitchHeader_SIZE + 4 + Control_Header_SIZE + Control_Error_HEADER_SIZE,
                   NULL);
     struct ErrorPacket8* err = (struct ErrorPacket8*) cause->bytes;
 
@@ -161,11 +161,11 @@ static inline void sendError8(struct SwitchInterface* iface,
     SwitchHeader_setCongestion(header, 0);
 
     err->handle = 0xffffffff;
-    err->ctrl.type_be = Control_ERROR_be;
+    err->ctrl.header.type_be = Control_ERROR_be;
     err->ctrl.content.error.errorType_be = Endian_hostToBigEndian32(code);
-    err->ctrl.checksum_be = 0;
+    err->ctrl.header.checksum_be = 0;
 
-    err->ctrl.checksum_be =
+    err->ctrl.header.checksum_be =
         Checksum_engine((uint8_t*) &err->ctrl, cause->length - SwitchHeader_SIZE - 4);
 
     sendMessage(iface, cause, logger);

+ 13 - 4
test/TestFramework.c

@@ -126,6 +126,10 @@ struct TestFramework* TestFramework_setUp(char* privateKey,
     struct SwitchCore* switchCore = SwitchCore_new(logger, allocator, base);
     struct CryptoAuth* ca = CryptoAuth_new(allocator, (uint8_t*)privateKey, base, logger, rand);
 
+
+// allocator, myAddress, logger, base, publicKey, rand
+    do {
+
     struct DHTModuleRegistry* registry = DHTModuleRegistry_new(allocator);
     ReplyModule_register(registry, allocator);
 
@@ -148,13 +152,18 @@ struct TestFramework* TestFramework_setUp(char* privateKey,
 
     SerializationModule_register(registry, logger, allocator);
 
+    struct Router* router = Router_new(routerModule, nodeStore, searchRunner, allocator);
+
+    } while (0);
+
     /*struct IpTunnel* ipTun = */IpTunnel_new(logger, base, allocator, rand, NULL);
 
-    struct Router* router = Router_new(routerModule, nodeStore, searchRunner, allocator);
 
-    struct EventEmitter* eventEmitter = EventEmitter_new(allocator);
 
-    struct SessionManager* sessionManager = SessionManager_new(allocator, base, ca, rand, logger, eventEmitter);
+    struct EventEmitter* eventEmitter = EventEmitter_new(allocator, logger);
+
+    struct SessionManager* sessionManager =
+        SessionManager_new(allocator, base, ca, rand, logger, eventEmitter);
     struct SwitchAdapter* switchAdapter = SwitchAdapter_new(allocator, logger);
     Interface_plumb(&switchAdapter->sessionManagerIf, &sessionManager->switchIf);
 
@@ -171,7 +180,7 @@ struct TestFramework* TestFramework_setUp(char* privateKey,
     Interface_plumb(&dhtCore->coreIf, &upper->dhtIf);
 
     struct ControlHandler* controlHandler =
-        ControlHandler_new(allocator, logger, router, myAddress);
+        ControlHandler_new(allocator, logger, eventEmitter, myAddress);
     Interface_plumb(&controlHandler->coreIf, &switchAdapter->controlIf);
 
     struct SwitchPinger* sp = SwitchPinger_new(base, rand, logger, myAddress, allocator);

+ 15 - 10
wire/Control.h

@@ -121,6 +121,19 @@ static inline char* Control_typeString(uint16_t type_be)
     }
 }
 
+struct Control_Header
+{
+    /**
+     * This should be the one's complement checksum
+     * of the control packet with 0'd checksum field.
+     */
+    uint16_t checksum_be;
+
+    /** The type of control message, eg: Control_ERROR. */
+    uint16_t type_be;
+};
+#define Control_Header_SIZE 4
+Assert_compileTime(sizeof(struct Control_Header) == Control_Header_SIZE);
 
 /**
  * A return message which is treated specially by switches.
@@ -135,17 +148,9 @@ static inline char* Control_typeString(uint16_t type_be)
  *  8 |                                                               |
  *
  */
-#define Control_HEADER_SIZE 4
 struct Control
 {
-    /**
-     * This should be the one's complement checksum
-     * of the control packet with 0'd checksum field.
-     */
-    uint16_t checksum_be;
-
-    /** The type of control message, eg: Control_ERROR. */
-    uint16_t type_be;
+    struct Control_Header header;
 
     union {
         struct Control_Error error;
@@ -159,6 +164,6 @@ struct Control
     } content;
 };
 // Control_KeyPing is the largest structure and thus defines the length of the "content" union.
-Assert_compileTime(sizeof(struct Control) == Control_HEADER_SIZE + Control_KeyPing_HEADER_SIZE);
+Assert_compileTime(sizeof(struct Control) == Control_Header_SIZE + Control_KeyPing_HEADER_SIZE);
 
 #endif