SupernodeHunter.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "crypto/AddressCalc.h"
  16. #include "dht/dhtcore/ReplySerializer.h"
  17. #include "subnode/SupernodeHunter.h"
  18. #include "subnode/AddrSet.h"
  19. #include "util/Identity.h"
  20. #include "util/events/Timeout.h"
  21. #include "util/AddrTools.h"
  22. #include "util/events/Time.h"
  23. #include "switch/LabelSplicer.h"
  24. #include <inttypes.h>
  /** Interval, in milliseconds, at which probePeerCycle() is scheduled by SupernodeHunter_new(). */
  #define CYCLE_MS 3000
  /**
   * Private state behind a struct SupernodeHunter.
   *
   * The public struct is the first member: the public functions in this file cast the
   * struct SupernodeHunter* they are given back to this type and validate the cast
   * with Identity_check().
   */
  struct SupernodeHunter_pvt
  {
      struct SupernodeHunter pub;

      /** Nodes which are authorized to be our supernode. */
      struct AddrSet* authorizedSnodes;

      /** Our peers. DO NOT TOUCH: this set is modified from SubnodePathfinder. */
      struct AddrSet* myPeerAddrs;

      /**
       * Supernodes which timed out when pinged by adoptSupernode(); proposals matching an
       * entry are ignored in peerResponseOK(). Flushed when a snode path is accepted or
       * when a full round over the peers produced no candidate.
       */
      struct AddrSet* blacklist;

      // Number of the next peer to ping in the peers AddrSet
      int nextPeer;

      // Will be set to the best known supernode possibility
      struct Address snodeCandidate;

      /** False until updateSnodePath2() has received a route; reset on timeout/unreachable. */
      bool snodePathUpdated;

      struct Allocator* alloc;

      struct Log* log;

      struct MsgCore* msgCore;

      EventBase_t* base;

      struct SwitchPinger* sp;

      struct Address* myAddress;

      /** The 16 bytes of myAddress->ip6 as a binary String, built in SupernodeHunter_new(). */
      String* selfAddrStr;

      struct ReachabilityCollector* rc;

      Identity
  };
  /** Per-request context attached to a MsgCore_Promise as its userData. */
  struct Query
  {
      /** The hunter which issued the request. */
      struct SupernodeHunter_pvt* snp;

      // If this is a findNode request, this is the search target, if it's a getPeers it's null.
      // NOTE(review): never assigned in this file, so it stays NULL (calloc) - confirm it is unused.
      struct Address* searchTar;

      /** Time_currentTimeMilliseconds() at the moment the request was created. */
      int64_t sendTime;

      // NOTE(review): never assigned in this file, so it stays false (calloc) - confirm it is unused.
      bool isGetRoute;

      Identity
  };
  58. int SupernodeHunter_addSnode(struct SupernodeHunter* snh, struct Address* snodeAddr)
  59. {
  60. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
  61. int length0 = snp->authorizedSnodes->length;
  62. AddrSet_add(snp->authorizedSnodes, snodeAddr, AddrSet_Match_ADDRESS_ONLY);
  63. if (snp->authorizedSnodes->length == length0) {
  64. return SupernodeHunter_addSnode_EXISTS;
  65. }
  66. return 0;
  67. }
  68. int SupernodeHunter_listSnodes(struct SupernodeHunter* snh,
  69. struct Address*** outP,
  70. struct Allocator* alloc)
  71. {
  72. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
  73. struct Address** out = Allocator_calloc(alloc, sizeof(char*), snp->authorizedSnodes->length);
  74. for (int i = 0; i < snp->authorizedSnodes->length; i++) {
  75. out[i] = AddrSet_get(snp->authorizedSnodes, i);
  76. }
  77. *outP = out;
  78. return snp->authorizedSnodes->length;
  79. }
  80. int SupernodeHunter_removeSnode(struct SupernodeHunter* snh, struct Address* toRemove)
  81. {
  82. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
  83. int length0 = snp->authorizedSnodes->length;
  84. AddrSet_remove(snp->authorizedSnodes, toRemove, AddrSet_Match_ADDRESS_ONLY);
  85. if (snp->authorizedSnodes->length == length0) {
  86. return SupernodeHunter_removeSnode_NONEXISTANT;
  87. }
  88. return 0;
  89. }
  90. static struct Address* getPeerByNpn(struct SupernodeHunter_pvt* snp, int npn)
  91. {
  92. npn = npn % snp->myPeerAddrs->length;
  93. int i = npn;
  94. do {
  95. struct Address* peer = AddrSet_get(snp->myPeerAddrs, i);
  96. if (peer && peer->protocolVersion > 19) { return peer; }
  97. i = (i + 1) % snp->myPeerAddrs->length;
  98. } while (i != npn);
  99. return NULL;
  100. }
  101. static void adoptSupernode2(Dict* msg, struct Address* src, struct MsgCore_Promise* prom)
  102. {
  103. struct Query* q = Identity_check((struct Query*) prom->userData);
  104. struct SupernodeHunter_pvt* snp = Identity_check(q->snp);
  105. if (!src) {
  106. Log_debug(snp->log, "timeout sending to %s",
  107. Address_toString(prom->target, prom->alloc)->bytes);
  108. // If we're in this state and it doesn't work, we're going to drop the snode and
  109. // go back to the beginning because while there's a possibility of a lost packet,
  110. // it's a bigger possibility that we don't have a working path and we'd better
  111. // try another one.
  112. AddrSet_add(snp->blacklist, prom->target, AddrSet_Match_BOTH);
  113. Bits_memset(&snp->snodeCandidate, 0, Address_SIZE);
  114. snp->snodePathUpdated = false;
  115. return;
  116. }
  117. Log_debug(snp->log, "Reply from %s", Address_toString(src, prom->alloc)->bytes);
  118. int64_t* snodeRecvTime = Dict_getIntC(msg, "recvTime");
  119. if (!snodeRecvTime) {
  120. Log_info(snp->log, "getRoute reply with no timeStamp, bad snode");
  121. return;
  122. }
  123. Log_debug(snp->log, "\n\nSupernode location confirmed [%s]\n\n",
  124. Address_toString(src, prom->alloc)->bytes);
  125. if (snp->pub.snodeIsReachable) {
  126. // If while we were searching, the outside code declared that indeed the snode
  127. // is reachable, we will not try to change their snode.
  128. } else if (snp->pub.onSnodeChange) {
  129. Bits_memcpy(&snp->pub.snodeAddr, src, Address_SIZE);
  130. snp->pub.snodeIsReachable = (
  131. AddrSet_indexOf(snp->authorizedSnodes, src, AddrSet_Match_ADDRESS_ONLY) != -1
  132. ) ? 2 : 1;
  133. snp->pub.onSnodeChange(&snp->pub, q->sendTime, *snodeRecvTime);
  134. } else {
  135. Log_warn(snp->log, "onSnodeChange is not set");
  136. }
  137. }
  138. static void adoptSupernode(struct SupernodeHunter_pvt* snp, struct Address* candidate)
  139. {
  140. struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
  141. struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
  142. Identity_set(q);
  143. q->snp = snp;
  144. q->sendTime = Time_currentTimeMilliseconds();
  145. Dict* msg = qp->msg = Dict_new(qp->alloc);
  146. qp->cb = adoptSupernode2;
  147. qp->userData = q;
  148. qp->target = Address_clone(candidate, qp->alloc);
  149. // NOTE: we don't immediately request a path because the RS doesn't know about us
  150. // quite yet, so it will tell us it doesn't know a path, so we need to ping it
  151. // and take it on faith until we get some announcements announced.
  152. Log_debug(snp->log, "Pinging snode [%s]", Address_toString(qp->target, qp->alloc)->bytes);
  153. Dict_putStringCC(msg, "sq", "pn", qp->alloc);
  154. Assert_true(AddressCalc_validAddress(candidate->ip6.bytes));
  155. return;
  156. }
  157. static void updateSnodePath2(Dict* msg, struct Address* src, struct MsgCore_Promise* prom)
  158. {
  159. struct Query* q = Identity_check((struct Query*) prom->userData);
  160. struct SupernodeHunter_pvt* snp = Identity_check(q->snp);
  161. if (!src) {
  162. String* addrStr = Address_toString(prom->target, prom->alloc);
  163. Log_debug(snp->log, "timeout sending to %s", addrStr->bytes);
  164. return;
  165. }
  166. int64_t* snodeRecvTime = Dict_getIntC(msg, "recvTime");
  167. if (!snodeRecvTime) {
  168. Log_info(snp->log, "getRoute reply with no timeStamp, bad snode");
  169. return;
  170. }
  171. struct Address_List* al = ReplySerializer_parse(src, msg, snp->log, false, prom->alloc);
  172. if (!al || al->length == 0) {
  173. Log_debug(snp->log, "Requesting route to snode [%s], it doesn't know one",
  174. Address_toString(prom->target, prom->alloc)->bytes);
  175. return;
  176. }
  177. Log_debug(snp->log, "Supernode path updated with [%s]",
  178. Address_toString(&al->elems[0], prom->alloc)->bytes);
  179. snp->snodePathUpdated = true;
  180. if (!Bits_memcmp(&snp->pub.snodeAddr, &al->elems[0], Address_SIZE)) {
  181. Log_debug(snp->log, "Requestes route to snode [%s], the one we have is fine",
  182. Address_toString(prom->target, prom->alloc)->bytes);
  183. return;
  184. }
  185. Bits_memcpy(&snp->pub.snodeAddr, &al->elems[0], Address_SIZE);
  186. Bits_memcpy(&snp->snodeCandidate, &al->elems[0], Address_SIZE);
  187. AddrSet_flush(snp->blacklist);
  188. if (snp->pub.onSnodeChange) {
  189. snp->pub.snodeIsReachable = (
  190. AddrSet_indexOf(snp->authorizedSnodes, src, AddrSet_Match_ADDRESS_ONLY) != -1
  191. ) ? 2 : 1;
  192. snp->pub.onSnodeChange(&snp->pub, q->sendTime, *snodeRecvTime);
  193. }
  194. }
  195. static void updateSnodePath(struct SupernodeHunter_pvt* snp)
  196. {
  197. struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
  198. struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
  199. Identity_set(q);
  200. q->snp = snp;
  201. q->sendTime = Time_currentTimeMilliseconds();
  202. Dict* msg = qp->msg = Dict_new(qp->alloc);
  203. qp->cb = updateSnodePath2;
  204. qp->userData = q;
  205. qp->target = Address_clone(&snp->pub.snodeAddr, qp->alloc);
  206. Log_debug(snp->log, "Update snode [%s] path", Address_toString(qp->target, qp->alloc)->bytes);
  207. Dict_putStringCC(msg, "sq", "gr", qp->alloc);
  208. String* src = String_newBinary(snp->myAddress->ip6.bytes, 16, qp->alloc);
  209. Dict_putStringC(msg, "src", src, qp->alloc);
  210. String* target = String_newBinary(snp->pub.snodeAddr.ip6.bytes, 16, qp->alloc);
  211. Dict_putStringC(msg, "tar", target, qp->alloc);
  212. }
  /**
   * Placeholder: currently does nothing with either argument.
   *
   * peerResponseOK() calls this for a proposed snode which was not adopted. The disabled
   * draft kept below refers to names (onReply, candidate) which do not exist in this file,
   * so it cannot simply be uncommented.
   */
  static void queryForAuthorized(struct SupernodeHunter_pvt* snp, struct Address* snode)
  {
      /*
      struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
      struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
      Identity_set(q);
      q->snp = snp;
      q->sendTime = Time_currentTimeMilliseconds();
      Dict* msg = qp->msg = Dict_new(qp->alloc);
      qp->cb = onReply;
      qp->userData = q;
      qp->target = candidate;
      Log_debug(snp->log, "Pinging snode [%s]", Address_toString(qp->target, qp->alloc)->bytes);
      Dict_putStringCC(msg, "sq", "gr", qp->alloc);
      */
  }
  229. static void peerResponseOK(struct SwitchPinger_Response* resp, struct SupernodeHunter_pvt* snp)
  230. {
  231. ReachabilityCollector_lagSample(snp->rc, resp->label, resp->milliseconds);
  232. struct Address snode = {0};
  233. Bits_memcpy(&snode, &resp->snode, sizeof(struct Address));
  234. if (!snode.path) {
  235. uint8_t label[20];
  236. AddrTools_printPath(label, resp->label);
  237. Log_debug(snp->log, "Peer [%s] reports no supernode", label);
  238. return;
  239. }
  240. uint64_t path = LabelSplicer_splice(snode.path, resp->label);
  241. if (path == UINT64_MAX) {
  242. Log_debug(snp->log, "Supernode path could not be spliced");
  243. return;
  244. }
  245. snode.path = path;
  246. struct Address peerAddr = { .path = resp->label };
  247. int i = AddrSet_indexOf(snp->myPeerAddrs, &peerAddr, AddrSet_Match_LABEL_ONLY);
  248. if (i == -1) {
  249. Log_info(snp->log, "We got a snode reply from a node which is not in peer list");
  250. return;
  251. }
  252. struct Address* peer = AddrSet_get(snp->myPeerAddrs, i);
  253. struct Address* firstPeer = getPeerByNpn(snp, 0);
  254. if (!firstPeer) {
  255. Log_info(snp->log, "All peers have gone away while packet was outstanding");
  256. return;
  257. }
  258. // 1.
  259. // If we have looped around and queried all of our peers returning to the first and we have
  260. // still not found an snode in our authorized snodes list, we should simply accept this one.
  261. if (!snp->pub.snodeIsReachable &&
  262. snp->myPeerAddrs->length > 1 &&
  263. snp->nextPeer >= snp->myPeerAddrs->length &&
  264. Address_isSameIp(firstPeer, peer))
  265. {
  266. if (!snp->snodeCandidate.path) {
  267. Log_info(snp->log, "No snode candidate found [%s]",
  268. Address_toStringKey(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
  269. snp->nextPeer = 0;
  270. AddrSet_flush(snp->blacklist);
  271. return;
  272. }
  273. Log_debug(snp->log, "Peer [%s] has proposed we use supernode [%s] we will accept it",
  274. Address_toString(peer, resp->ping->pingAlloc)->bytes,
  275. Address_toString(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
  276. adoptSupernode(snp, &snp->snodeCandidate);
  277. return;
  278. }
  279. // 2.
  280. // If this snode is one of our authorized snodes OR if we have none defined, accept this one.
  281. if (AddrSet_indexOf(snp->blacklist, &snode, AddrSet_Match_BOTH) > -1) {
  282. Log_debug(snp->log, "Peer [%s] [%" PRIx64 "] has proposed supernode [%s] "
  283. "but it is blacklisted, continue",
  284. Address_toString(peer, resp->ping->pingAlloc)->bytes,
  285. resp->label,
  286. Address_toString(&snode, resp->ping->pingAlloc)->bytes);
  287. } else if (!snp->authorizedSnodes->length ||
  288. AddrSet_indexOf(snp->authorizedSnodes, &snode, AddrSet_Match_ADDRESS_ONLY) > -1)
  289. {
  290. Address_getPrefix(&snode);
  291. Log_debug(snp->log, "Peer [%s] has proposed supernode [%s] and %s so we will use it",
  292. Address_toString(peer, resp->ping->pingAlloc)->bytes,
  293. Address_toString(&snode, resp->ping->pingAlloc)->bytes,
  294. (snp->authorizedSnodes->length) ? "it is authorized" : "we have none authorized");
  295. adoptSupernode(snp, &snode);
  296. return;
  297. } else if (!snp->snodeCandidate.path) {
  298. Log_debug(snp->log, "Peer [%s] has proposed supernode [%s], we're not using it yet "
  299. "but we will store it as a candidate.",
  300. Address_toString(peer, resp->ping->pingAlloc)->bytes,
  301. Address_toString(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
  302. Bits_memcpy(&snp->snodeCandidate, &snode, sizeof(struct Address));
  303. Address_getPrefix(&snp->snodeCandidate);
  304. }
  305. // 3.
  306. // If this snode is not one of our authorized snodes, query it for all of our authorized snodes.
  307. queryForAuthorized(snp, &snode);
  308. }
  309. static void peerResponse(struct SwitchPinger_Response* resp, void* userData)
  310. {
  311. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) userData);
  312. char* err = "";
  313. switch (resp->res) {
  314. case SwitchPinger_Result_OK: peerResponseOK(resp, snp); return;
  315. case SwitchPinger_Result_LABEL_MISMATCH: err = "LABEL_MISMATCH"; break;
  316. case SwitchPinger_Result_WRONG_DATA: err = "WRONG_DATA"; break;
  317. case SwitchPinger_Result_ERROR_RESPONSE: err = "ERROR_RESPONSE"; break;
  318. case SwitchPinger_Result_LOOP_ROUTE: err = "LOOP_ROUTE"; break;
  319. case SwitchPinger_Result_TIMEOUT: err = "TIMEOUT"; break;
  320. default: err = "unknown error"; break;
  321. }
  322. Log_debug(snp->log, "Error sending snp query to peer [%" PRIx64 "] [%s]",
  323. resp->label, err);
  324. }
  325. static void probePeerCycle(void* vsn)
  326. {
  327. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) vsn);
  328. if (snp->pub.snodeIsReachable && !snp->snodePathUpdated) {
  329. updateSnodePath(snp);
  330. }
  331. if (snp->pub.snodeIsReachable > 1) { return; }
  332. if (snp->pub.snodeIsReachable && !snp->authorizedSnodes->length) { return; }
  333. if (!snp->myPeerAddrs->length) { return; }
  334. //Log_debug(snp->log, "probePeerCycle()");
  335. if (AddrSet_indexOf(snp->authorizedSnodes, snp->myAddress, AddrSet_Match_ADDRESS_ONLY) != -1) {
  336. Log_info(snp->log, "Self is specified as supernode, pinging...");
  337. adoptSupernode(snp, snp->myAddress);
  338. return;
  339. }
  340. struct Address* peer = getPeerByNpn(snp, snp->nextPeer);
  341. if (!peer) {
  342. Log_info(snp->log, "No peer found who is version >= 20");
  343. return;
  344. }
  345. struct SwitchPinger_Ping* p =
  346. SwitchPinger_newPing(peer->path, String_CONST(""), 3000, peerResponse, snp->alloc, snp->sp);
  347. Assert_true(p);
  348. Log_debug(snp->log, "Querying peer [%s] [%d] total [%d], blacklist size [%d]",
  349. Address_toString(peer, p->pingAlloc)->bytes,
  350. snp->nextPeer,
  351. snp->myPeerAddrs->length,
  352. snp->blacklist->length);
  353. snp->nextPeer++;
  354. p->type = SwitchPinger_Type_GETSNODE;
  355. if (snp->pub.snodeIsReachable) {
  356. Bits_memcpy(&p->snode, &snp->pub.snodeAddr, sizeof(struct Address));
  357. }
  358. p->onResponseContext = snp;
  359. }
  360. static void onSnodeUnreachable(struct SupernodeHunter* snh,
  361. int64_t sendTime,
  362. int64_t snodeRecvTime)
  363. {
  364. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
  365. Log_debug(snp->log, "Supernode unreachable.");
  366. snp->snodePathUpdated = false;
  367. // Snode unreachable, we need also reset peer snode candidate
  368. Bits_memset(&snp->snodeCandidate, 0, Address_SIZE);
  369. }
  370. struct SupernodeHunter* SupernodeHunter_new(struct Allocator* allocator,
  371. struct Log* log,
  372. EventBase_t* base,
  373. struct SwitchPinger* sp,
  374. struct AddrSet* peers,
  375. struct MsgCore* msgCore,
  376. struct Address* myAddress,
  377. struct ReachabilityCollector* rc)
  378. {
  379. struct Allocator* alloc = Allocator_child(allocator);
  380. struct SupernodeHunter_pvt* out =
  381. Allocator_calloc(alloc, sizeof(struct SupernodeHunter_pvt), 1);
  382. Identity_set(out);
  383. out->authorizedSnodes = AddrSet_new(alloc);
  384. out->blacklist = AddrSet_new(alloc);
  385. out->myPeerAddrs = peers;
  386. out->base = base;
  387. //out->rand = rand;
  388. //out->nodes = AddrSet_new(alloc);
  389. //out->timeSnodeCalled = Time_currentTimeMilliseconds();
  390. //out->snodeCandidates = AddrSet_new(alloc);
  391. out->log = log;
  392. out->alloc = alloc;
  393. out->msgCore = msgCore;
  394. out->myAddress = myAddress;
  395. out->rc = rc;
  396. out->selfAddrStr = String_newBinary(myAddress->ip6.bytes, 16, alloc);
  397. out->sp = sp;
  398. out->snodePathUpdated = false;
  399. out->pub.onSnodeUnreachable = onSnodeUnreachable;
  400. Timeout_setInterval(probePeerCycle, out, CYCLE_MS, base, alloc);
  401. return &out->pub;
  402. }