Преглед изворни кода

Merge branch 'update_snode_path' of git://github.com/madafoo/cjdns into crashey

Caleb James DeLisle пре 7 година
родитељ
комит
4f2a61427d

+ 3 - 0
subnode/ReachabilityAnnouncer.c

@@ -475,6 +475,9 @@ static void onReplyTimeout(struct ReachabilityAnnouncer_pvt* rap, struct Address
     Allocator_free(mow->alloc);
     if (!Bits_memcmp(snodeAddr, &rap->snode, Address_SIZE)) {
         rap->snh->snodeIsReachable = false;
+        if (rap->snh->onSnodeUnreachable) {
+            rap->snh->onSnodeUnreachable(rap->snh, 0, 0);
+        }
     }
 }
 

+ 3 - 0
subnode/SubnodePathfinder.c

@@ -133,6 +133,9 @@ static Iface_DEFUN switchErr(struct Message* msg, struct SubnodePathfinder_pvt*
         Log_debug(pf->log, "switch err from active snode [%s] type [%s][%d]",
             pathStr, Error_strerror(err), err);
         pf->pub.snh->snodeIsReachable = false;
+        if (pf->pub.snh->onSnodeUnreachable) {
+            pf->pub.snh->onSnodeUnreachable(pf->pub.snh, 0, 0);
+        }
     }
 
     return NULL;

+ 72 - 0
subnode/SupernodeHunter.c

@@ -41,6 +41,8 @@ struct SupernodeHunter_pvt
     // Will be set to the best known supernode possibility
     struct Address snodeCandidate;
 
+    bool snodePathUpdated;
+
     struct Allocator* alloc;
 
     struct Log* log;
@@ -170,6 +172,59 @@ static void adoptSupernode(struct SupernodeHunter_pvt* snp, struct Address* cand
     return;
 }
 
+static void updateSnodePath2(Dict* msg, struct Address* src, struct MsgCore_Promise* prom)
+{
+    struct Query* q = Identity_check((struct Query*) prom->userData);
+    struct SupernodeHunter_pvt* snp = Identity_check(q->snp);
+
+    if (!src) {
+        String* addrStr = Address_toString(prom->target, prom->alloc);
+        Log_debug(snp->log, "timeout sending to %s", addrStr->bytes);
+        return;
+    }
+    int64_t* snodeRecvTime = Dict_getIntC(msg, "recvTime");
+    if (!snodeRecvTime) {
+        Log_info(snp->log, "getRoute reply with no timeStamp, bad snode");
+        return;
+    }
+    struct Address_List* al = ReplySerializer_parse(src, msg, snp->log, false, prom->alloc);
+    if (!al || al->length == 0) { return; }
+    Log_debug(snp->log, "Supernode path updated with[%s]",
+                        Address_toString(&al->elems[0], prom->alloc)->bytes);
+
+    snp->snodePathUpdated = true;
+    if (!Bits_memcmp(&snp->pub.snodeAddr, &al->elems[0], Address_SIZE)) {
+        return;
+    }
+    Bits_memcpy(&snp->pub.snodeAddr, &al->elems[0], Address_SIZE);
+    Bits_memcpy(&snp->snodeCandidate, &al->elems[0], Address_SIZE);
+    if (snp->pub.onSnodeChange) {
+        snp->pub.snodeIsReachable = (AddrSet_indexOf(snp->authorizedSnodes, src) != -1) ? 2 : 1;
+        snp->pub.onSnodeChange(&snp->pub, q->sendTime, *snodeRecvTime);
+    }
+}
+
+static void updateSnodePath(struct SupernodeHunter_pvt* snp)
+{
+    struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
+    struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
+    Identity_set(q);
+    q->snp = snp;
+    q->sendTime = Time_currentTimeMilliseconds(snp->base);
+
+    Dict* msg = qp->msg = Dict_new(qp->alloc);
+    qp->cb = updateSnodePath2;
+    qp->userData = q;
+    qp->target = Address_clone(&snp->pub.snodeAddr, qp->alloc);;
+
+    Log_debug(snp->log, "Update snode [%s] path", Address_toString(qp->target, qp->alloc)->bytes);
+    Dict_putStringCC(msg, "sq", "gr", qp->alloc);
+    String* src = String_newBinary(snp->myAddress->ip6.bytes, 16, qp->alloc);
+    Dict_putStringC(msg, "src", src, qp->alloc);
+    String* target = String_newBinary(snp->pub.snodeAddr.ip6.bytes, 16, qp->alloc);
+    Dict_putStringC(msg, "tar", target, qp->alloc);
+}
+
 static void queryForAuthorized(struct SupernodeHunter_pvt* snp, struct Address* snode)
 {
     /*
@@ -264,6 +319,10 @@ static void probePeerCycle(void* vsn)
 {
     struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) vsn);
 
+    if (snp->pub.snodeIsReachable && !snp->snodePathUpdated) {
+        updateSnodePath(snp);
+    }
+
     if (snp->pub.snodeIsReachable > 1) { return; }
     if (snp->pub.snodeIsReachable && !snp->authorizedSnodes->length) { return; }
     if (!snp->peers->length) { return; }
@@ -295,6 +354,17 @@ static void probePeerCycle(void* vsn)
     p->onResponseContext = snp;
 }
 
+static void onSnodeUnreachable(struct SupernodeHunter* snh,
+                               int64_t sendTime,
+                               int64_t snodeRecvTime)
+{
+    struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
+    Log_debug(snp->log, "Supernode unreachable.");
+    snp->snodePathUpdated = false;
+    // Snode unreachable, we need also reset peer snode candidate
+    Bits_memset(&snp->snodeCandidate, 0, Address_SIZE);
+}
+
 struct SupernodeHunter* SupernodeHunter_new(struct Allocator* allocator,
                                             struct Log* log,
                                             struct EventBase* base,
@@ -320,6 +390,8 @@ struct SupernodeHunter* SupernodeHunter_new(struct Allocator* allocator,
     out->myAddress = myAddress;
     out->selfAddrStr = String_newBinary(myAddress->ip6.bytes, 16, alloc);
     out->sp = sp;
+    out->snodePathUpdated = false;
+    out->pub.onSnodeUnreachable = onSnodeUnreachable;
     Timeout_setInterval(probePeerCycle, out, CYCLE_MS, base, alloc);
     return &out->pub;
 }

+ 1 - 0
subnode/SupernodeHunter.h

@@ -45,6 +45,7 @@ struct SupernodeHunter
     struct Address snodeAddr;
 
     SupernodeHunter_Callback onSnodeChange;
+    SupernodeHunter_Callback onSnodeUnreachable;
     void* userData;
 };