123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468 |
- /* vim: set expandtab ts=4 sw=4: */
- /*
- * You may redistribute this program and/or modify it under the terms of
- * the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- */
- #include "crypto/AddressCalc.h"
- #include "crypto/Key.h"
- #include "dht/dhtcore/ReplySerializer.h"
- #include "subnode/SupernodeHunter.h"
- #include "subnode/AddrSet.h"
- #include "util/Identity.h"
- #include "util/platform/Sockaddr.h"
- #include "util/events/Timeout.h"
- #include "util/AddrTools.h"
- #include "util/events/Time.h"
- #include "switch/LabelSplicer.h"
- #define CYCLE_MS 3000
- struct SupernodeHunter_pvt
- {
- struct SupernodeHunter pub;
- /** Nodes which are authorized to be our supernode. */
- struct AddrSet* authorizedSnodes;
- /** Our peers, DO NOT TOUCH, changed from in SubnodePathfinder. */
- struct AddrSet* myPeerAddrs;
- struct AddrSet* blacklist;
- // Number of the next peer to ping in the peers AddrSet
- int nextPeer;
- // Will be set to the best known supernode possibility
- struct Address snodeCandidate;
- bool snodePathUpdated;
- struct Allocator* alloc;
- struct Log* log;
- struct MsgCore* msgCore;
- struct EventBase* base;
- struct SwitchPinger* sp;
- struct Address* myAddress;
- String* selfAddrStr;
- struct ReachabilityCollector* rc;
- Identity
- };
- struct Query
- {
- struct SupernodeHunter_pvt* snp;
- // If this is a findNode request, this is the search target, if it's a getPeers it's null.
- struct Address* searchTar;
- int64_t sendTime;
- bool isGetRoute;
- Identity
- };
- int SupernodeHunter_addSnode(struct SupernodeHunter* snh, struct Address* snodeAddr)
- {
- struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
- int length0 = snp->authorizedSnodes->length;
- AddrSet_add(snp->authorizedSnodes, snodeAddr, AddrSet_Match_ADDRESS_ONLY);
- if (snp->authorizedSnodes->length == length0) {
- return SupernodeHunter_addSnode_EXISTS;
- }
- return 0;
- }
- int SupernodeHunter_listSnodes(struct SupernodeHunter* snh,
- struct Address*** outP,
- struct Allocator* alloc)
- {
- struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
- struct Address** out = Allocator_calloc(alloc, sizeof(char*), snp->authorizedSnodes->length);
- for (int i = 0; i < snp->authorizedSnodes->length; i++) {
- out[i] = AddrSet_get(snp->authorizedSnodes, i);
- }
- *outP = out;
- return snp->authorizedSnodes->length;
- }
- int SupernodeHunter_removeSnode(struct SupernodeHunter* snh, struct Address* toRemove)
- {
- struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
- int length0 = snp->authorizedSnodes->length;
- AddrSet_remove(snp->authorizedSnodes, toRemove, AddrSet_Match_ADDRESS_ONLY);
- if (snp->authorizedSnodes->length == length0) {
- return SupernodeHunter_removeSnode_NONEXISTANT;
- }
- return 0;
- }
- static struct Address* getPeerByNpn(struct SupernodeHunter_pvt* snp, int npn)
- {
- npn = npn % snp->myPeerAddrs->length;
- int i = npn;
- do {
- struct Address* peer = AddrSet_get(snp->myPeerAddrs, i);
- if (peer && peer->protocolVersion > 19) { return peer; }
- i = (i + 1) % snp->myPeerAddrs->length;
- } while (i != npn);
- return NULL;
- }
- static void adoptSupernode2(Dict* msg, struct Address* src, struct MsgCore_Promise* prom)
- {
- struct Query* q = Identity_check((struct Query*) prom->userData);
- struct SupernodeHunter_pvt* snp = Identity_check(q->snp);
- if (!src) {
- Log_debug(snp->log, "timeout sending to %s",
- Address_toString(prom->target, prom->alloc)->bytes);
- // If we're in this state and it doesn't work, we're going to drop the snode and
- // go back to the beginning because while there's a possibility of a lost packet,
- // it's a bigger possibility that we don't have a working path and we'd better
- // try another one.
- AddrSet_add(snp->blacklist, prom->target, AddrSet_Match_BOTH);
- Bits_memset(&snp->snodeCandidate, 0, Address_SIZE);
- snp->snodePathUpdated = false;
- return;
- }
- Log_debug(snp->log, "Reply from %s", Address_toString(src, prom->alloc)->bytes);
- int64_t* snodeRecvTime = Dict_getIntC(msg, "recvTime");
- if (!snodeRecvTime) {
- Log_info(snp->log, "getRoute reply with no timeStamp, bad snode");
- return;
- }
- Log_debug(snp->log, "\n\nSupernode location confirmed [%s]\n\n",
- Address_toString(src, prom->alloc)->bytes);
- if (snp->pub.snodeIsReachable) {
- // If while we were searching, the outside code declared that indeed the snode
- // is reachable, we will not try to change their snode.
- } else if (snp->pub.onSnodeChange) {
- Bits_memcpy(&snp->pub.snodeAddr, src, Address_SIZE);
- snp->pub.snodeIsReachable = (
- AddrSet_indexOf(snp->authorizedSnodes, src, AddrSet_Match_ADDRESS_ONLY) != -1
- ) ? 2 : 1;
- snp->pub.onSnodeChange(&snp->pub, q->sendTime, *snodeRecvTime);
- } else {
- Log_warn(snp->log, "onSnodeChange is not set");
- }
- }
- static void adoptSupernode(struct SupernodeHunter_pvt* snp, struct Address* candidate)
- {
- struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
- struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
- Identity_set(q);
- q->snp = snp;
- q->sendTime = Time_currentTimeMilliseconds(snp->base);
- Dict* msg = qp->msg = Dict_new(qp->alloc);
- qp->cb = adoptSupernode2;
- qp->userData = q;
- qp->target = Address_clone(candidate, qp->alloc);
- // NOTE: we don't immediately request a path because the RS doesn't know about us
- // quite yet, so it will tell us it doesn't know a path, so we need to ping it
- // and take it on faith until we get some announcements announced.
- Log_debug(snp->log, "Pinging snode [%s]", Address_toString(qp->target, qp->alloc)->bytes);
- Dict_putStringCC(msg, "sq", "pn", qp->alloc);
- Assert_true(AddressCalc_validAddress(candidate->ip6.bytes));
- return;
- }
- static void updateSnodePath2(Dict* msg, struct Address* src, struct MsgCore_Promise* prom)
- {
- struct Query* q = Identity_check((struct Query*) prom->userData);
- struct SupernodeHunter_pvt* snp = Identity_check(q->snp);
- if (!src) {
- String* addrStr = Address_toString(prom->target, prom->alloc);
- Log_debug(snp->log, "timeout sending to %s", addrStr->bytes);
- return;
- }
- int64_t* snodeRecvTime = Dict_getIntC(msg, "recvTime");
- if (!snodeRecvTime) {
- Log_info(snp->log, "getRoute reply with no timeStamp, bad snode");
- return;
- }
- struct Address_List* al = ReplySerializer_parse(src, msg, snp->log, false, prom->alloc);
- if (!al || al->length == 0) {
- Log_debug(snp->log, "Requesting route to snode [%s], it doesn't know one",
- Address_toString(prom->target, prom->alloc)->bytes);
- return;
- }
- Log_debug(snp->log, "Supernode path updated with [%s]",
- Address_toString(&al->elems[0], prom->alloc)->bytes);
- snp->snodePathUpdated = true;
- if (!Bits_memcmp(&snp->pub.snodeAddr, &al->elems[0], Address_SIZE)) {
- Log_debug(snp->log, "Requestes route to snode [%s], the one we have is fine",
- Address_toString(prom->target, prom->alloc)->bytes);
- return;
- }
- Bits_memcpy(&snp->pub.snodeAddr, &al->elems[0], Address_SIZE);
- Bits_memcpy(&snp->snodeCandidate, &al->elems[0], Address_SIZE);
- AddrSet_flush(snp->blacklist);
- if (snp->pub.onSnodeChange) {
- snp->pub.snodeIsReachable = (
- AddrSet_indexOf(snp->authorizedSnodes, src, AddrSet_Match_ADDRESS_ONLY) != -1
- ) ? 2 : 1;
- snp->pub.onSnodeChange(&snp->pub, q->sendTime, *snodeRecvTime);
- }
- }
- static void updateSnodePath(struct SupernodeHunter_pvt* snp)
- {
- struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
- struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
- Identity_set(q);
- q->snp = snp;
- q->sendTime = Time_currentTimeMilliseconds(snp->base);
- Dict* msg = qp->msg = Dict_new(qp->alloc);
- qp->cb = updateSnodePath2;
- qp->userData = q;
- qp->target = Address_clone(&snp->pub.snodeAddr, qp->alloc);;
- Log_debug(snp->log, "Update snode [%s] path", Address_toString(qp->target, qp->alloc)->bytes);
- Dict_putStringCC(msg, "sq", "gr", qp->alloc);
- String* src = String_newBinary(snp->myAddress->ip6.bytes, 16, qp->alloc);
- Dict_putStringC(msg, "src", src, qp->alloc);
- String* target = String_newBinary(snp->pub.snodeAddr.ip6.bytes, 16, qp->alloc);
- Dict_putStringC(msg, "tar", target, qp->alloc);
- }
- static void queryForAuthorized(struct SupernodeHunter_pvt* snp, struct Address* snode)
- {
- /*
- struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
- struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
- Identity_set(q);
- q->snp = snp;
- q->sendTime = Time_currentTimeMilliseconds(snp->base);
- Dict* msg = qp->msg = Dict_new(qp->alloc);
- qp->cb = onReply;
- qp->userData = q;
- qp->target = candidate;
- Log_debug(snp->log, "Pinging snode [%s]", Address_toString(qp->target, qp->alloc)->bytes);
- Dict_putStringCC(msg, "sq", "gr", qp->alloc);
- */
- }
- static void peerResponseOK(struct SwitchPinger_Response* resp, struct SupernodeHunter_pvt* snp)
- {
- ReachabilityCollector_lagSample(snp->rc, resp->label, resp->milliseconds);
- struct Address snode;
- Bits_memcpy(&snode, &resp->snode, sizeof(struct Address));
- if (!snode.path) {
- uint8_t label[20];
- AddrTools_printPath(label, resp->label);
- Log_debug(snp->log, "Peer [%s] reports no supernode", label);
- return;
- }
- uint64_t path = LabelSplicer_splice(snode.path, resp->label);
- if (path == UINT64_MAX) {
- Log_debug(snp->log, "Supernode path could not be spliced");
- return;
- }
- snode.path = path;
- struct Address peerAddr = { .path = resp->label };
- int i = AddrSet_indexOf(snp->myPeerAddrs, &peerAddr, AddrSet_Match_LABEL_ONLY);
- if (i == -1) {
- Log_info(snp->log, "We got a snode reply from a node which is not in peer list");
- return;
- }
- struct Address* peer = AddrSet_get(snp->myPeerAddrs, i);
- struct Address* firstPeer = getPeerByNpn(snp, 0);
- if (!firstPeer) {
- Log_info(snp->log, "All peers have gone away while packet was outstanding");
- return;
- }
- // 1.
- // If we have looped around and queried all of our peers returning to the first and we have
- // still not found an snode in our authorized snodes list, we should simply accept this one.
- if (!snp->pub.snodeIsReachable &&
- snp->myPeerAddrs->length > 1 &&
- snp->nextPeer >= snp->myPeerAddrs->length &&
- Address_isSameIp(firstPeer, peer))
- {
- if (!snp->snodeCandidate.path) {
- Log_info(snp->log, "No snode candidate found [%s]",
- Address_toStringKey(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
- snp->nextPeer = 0;
- AddrSet_flush(snp->blacklist);
- return;
- }
- Log_debug(snp->log, "Peer [%s] has proposed we use supernode [%s] we will accept it",
- Address_toString(peer, resp->ping->pingAlloc)->bytes,
- Address_toString(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
- adoptSupernode(snp, &snp->snodeCandidate);
- return;
- }
- // 2.
- // If this snode is one of our authorized snodes OR if we have none defined, accept this one.
- if (AddrSet_indexOf(snp->blacklist, &snode, AddrSet_Match_BOTH) > -1) {
- Log_debug(snp->log, "Peer [%s] [%" PRIx64 "] has proposed supernode [%s] "
- "but it is blacklisted, continue",
- Address_toString(peer, resp->ping->pingAlloc)->bytes,
- resp->label,
- Address_toString(&snode, resp->ping->pingAlloc)->bytes);
- } else if (!snp->authorizedSnodes->length ||
- AddrSet_indexOf(snp->authorizedSnodes, &snode, AddrSet_Match_ADDRESS_ONLY) > -1)
- {
- Address_getPrefix(&snode);
- Log_debug(snp->log, "Peer [%s] has proposed supernode [%s] and %s so we will use it",
- Address_toString(peer, resp->ping->pingAlloc)->bytes,
- Address_toString(&snode, resp->ping->pingAlloc)->bytes,
- (snp->authorizedSnodes->length) ? "it is authorized" : "we have none authorized");
- adoptSupernode(snp, &snode);
- return;
- } else if (!snp->snodeCandidate.path) {
- Log_debug(snp->log, "Peer [%s] has proposed supernode [%s], we're not using it yet "
- "but we will store it as a candidate.",
- Address_toString(peer, resp->ping->pingAlloc)->bytes,
- Address_toString(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
- Bits_memcpy(&snp->snodeCandidate, &snode, sizeof(struct Address));
- Address_getPrefix(&snp->snodeCandidate);
- }
- // 3.
- // If this snode is not one of our authorized snodes, query it for all of our authorized snodes.
- queryForAuthorized(snp, &snode);
- }
- static void peerResponse(struct SwitchPinger_Response* resp, void* userData)
- {
- struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) userData);
- char* err = "";
- switch (resp->res) {
- case SwitchPinger_Result_OK: peerResponseOK(resp, snp); return;
- case SwitchPinger_Result_LABEL_MISMATCH: err = "LABEL_MISMATCH"; break;
- case SwitchPinger_Result_WRONG_DATA: err = "WRONG_DATA"; break;
- case SwitchPinger_Result_ERROR_RESPONSE: err = "ERROR_RESPONSE"; break;
- case SwitchPinger_Result_LOOP_ROUTE: err = "LOOP_ROUTE"; break;
- case SwitchPinger_Result_TIMEOUT: err = "TIMEOUT"; break;
- default: err = "unknown error"; break;
- }
- Log_debug(snp->log, "Error sending snp query to peer [%" PRIx64 "] [%s]",
- resp->label, err);
- }
- static void probePeerCycle(void* vsn)
- {
- struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) vsn);
- if (snp->pub.snodeIsReachable && !snp->snodePathUpdated) {
- updateSnodePath(snp);
- }
- if (snp->pub.snodeIsReachable > 1) { return; }
- if (snp->pub.snodeIsReachable && !snp->authorizedSnodes->length) { return; }
- if (!snp->myPeerAddrs->length) { return; }
- //Log_debug(snp->log, "probePeerCycle()");
- if (AddrSet_indexOf(snp->authorizedSnodes, snp->myAddress, AddrSet_Match_ADDRESS_ONLY) != -1) {
- Log_info(snp->log, "Self is specified as supernode, pinging...");
- adoptSupernode(snp, snp->myAddress);
- return;
- }
- struct Address* peer = getPeerByNpn(snp, snp->nextPeer);
- if (!peer) {
- Log_info(snp->log, "No peer found who is version >= 20");
- return;
- }
- struct SwitchPinger_Ping* p =
- SwitchPinger_newPing(peer->path, String_CONST(""), 3000, peerResponse, snp->alloc, snp->sp);
- Assert_true(p);
- Log_debug(snp->log, "Querying peer [%s] [%d] total [%d], blacklist size [%d]",
- Address_toString(peer, p->pingAlloc)->bytes,
- snp->nextPeer,
- snp->myPeerAddrs->length,
- snp->blacklist->length);
- snp->nextPeer++;
- p->type = SwitchPinger_Type_GETSNODE;
- if (snp->pub.snodeIsReachable) {
- Bits_memcpy(&p->snode, &snp->pub.snodeAddr, sizeof(struct Address));
- }
- p->onResponseContext = snp;
- }
- static void onSnodeUnreachable(struct SupernodeHunter* snh,
- int64_t sendTime,
- int64_t snodeRecvTime)
- {
- struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
- Log_debug(snp->log, "Supernode unreachable.");
- snp->snodePathUpdated = false;
- // Snode unreachable, we need also reset peer snode candidate
- Bits_memset(&snp->snodeCandidate, 0, Address_SIZE);
- }
- struct SupernodeHunter* SupernodeHunter_new(struct Allocator* allocator,
- struct Log* log,
- struct EventBase* base,
- struct SwitchPinger* sp,
- struct AddrSet* peers,
- struct MsgCore* msgCore,
- struct Address* myAddress,
- struct ReachabilityCollector* rc)
- {
- struct Allocator* alloc = Allocator_child(allocator);
- struct SupernodeHunter_pvt* out =
- Allocator_calloc(alloc, sizeof(struct SupernodeHunter_pvt), 1);
- Identity_set(out);
- out->authorizedSnodes = AddrSet_new(alloc);
- out->blacklist = AddrSet_new(alloc);
- out->myPeerAddrs = peers;
- out->base = base;
- //out->rand = rand;
- //out->nodes = AddrSet_new(alloc);
- //out->timeSnodeCalled = Time_currentTimeMilliseconds(base);
- //out->snodeCandidates = AddrSet_new(alloc);
- out->log = log;
- out->alloc = alloc;
- out->msgCore = msgCore;
- out->myAddress = myAddress;
- out->rc = rc;
- out->selfAddrStr = String_newBinary(myAddress->ip6.bytes, 16, alloc);
- out->sp = sp;
- out->snodePathUpdated = false;
- out->pub.onSnodeUnreachable = onSnodeUnreachable;
- Timeout_setInterval(probePeerCycle, out, CYCLE_MS, base, alloc);
- return &out->pub;
- }
|