SupernodeHunter.c 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "crypto/AddressCalc.h"
  16. #include "crypto/Key.h"
  17. #include "dht/dhtcore/ReplySerializer.h"
  18. #include "subnode/SupernodeHunter.h"
  19. #include "subnode/AddrSet.h"
  20. #include "util/Identity.h"
  21. #include "util/platform/Sockaddr.h"
  22. #include "util/events/Timeout.h"
  23. #include "util/AddrTools.h"
  24. #include "util/events/Time.h"
  25. #include "switch/LabelSplicer.h"
  26. #define CYCLE_MS 3000
  27. struct SupernodeHunter_pvt
  28. {
  29. struct SupernodeHunter pub;
  30. /** Nodes which are authorized to be our supernode. */
  31. struct AddrSet* authorizedSnodes;
  32. /** Our peers, DO NOT TOUCH, changed from in SubnodePathfinder. */
  33. struct AddrSet* myPeerAddrs;
  34. struct AddrSet* blacklist;
  35. // Number of the next peer to ping in the peers AddrSet
  36. int nextPeer;
  37. // Will be set to the best known supernode possibility
  38. struct Address snodeCandidate;
  39. bool snodePathUpdated;
  40. struct Allocator* alloc;
  41. struct Log* log;
  42. struct MsgCore* msgCore;
  43. struct EventBase* base;
  44. struct SwitchPinger* sp;
  45. struct Address* myAddress;
  46. String* selfAddrStr;
  47. struct ReachabilityCollector* rc;
  48. Identity
  49. };
  50. struct Query
  51. {
  52. struct SupernodeHunter_pvt* snp;
  53. // If this is a findNode request, this is the search target, if it's a getPeers it's null.
  54. struct Address* searchTar;
  55. int64_t sendTime;
  56. bool isGetRoute;
  57. Identity
  58. };
  59. int SupernodeHunter_addSnode(struct SupernodeHunter* snh, struct Address* snodeAddr)
  60. {
  61. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
  62. int length0 = snp->authorizedSnodes->length;
  63. AddrSet_add(snp->authorizedSnodes, snodeAddr, AddrSet_Match_ADDRESS_ONLY);
  64. if (snp->authorizedSnodes->length == length0) {
  65. return SupernodeHunter_addSnode_EXISTS;
  66. }
  67. return 0;
  68. }
  69. int SupernodeHunter_listSnodes(struct SupernodeHunter* snh,
  70. struct Address*** outP,
  71. struct Allocator* alloc)
  72. {
  73. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
  74. struct Address** out = Allocator_calloc(alloc, sizeof(char*), snp->authorizedSnodes->length);
  75. for (int i = 0; i < snp->authorizedSnodes->length; i++) {
  76. out[i] = AddrSet_get(snp->authorizedSnodes, i);
  77. }
  78. *outP = out;
  79. return snp->authorizedSnodes->length;
  80. }
  81. int SupernodeHunter_removeSnode(struct SupernodeHunter* snh, struct Address* toRemove)
  82. {
  83. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
  84. int length0 = snp->authorizedSnodes->length;
  85. AddrSet_remove(snp->authorizedSnodes, toRemove, AddrSet_Match_ADDRESS_ONLY);
  86. if (snp->authorizedSnodes->length == length0) {
  87. return SupernodeHunter_removeSnode_NONEXISTANT;
  88. }
  89. return 0;
  90. }
  91. static struct Address* getPeerByNpn(struct SupernodeHunter_pvt* snp, int npn)
  92. {
  93. npn = npn % snp->myPeerAddrs->length;
  94. int i = npn;
  95. do {
  96. struct Address* peer = AddrSet_get(snp->myPeerAddrs, i);
  97. if (peer && peer->protocolVersion > 19) { return peer; }
  98. i = (i + 1) % snp->myPeerAddrs->length;
  99. } while (i != npn);
  100. return NULL;
  101. }
  102. static void adoptSupernode2(Dict* msg, struct Address* src, struct MsgCore_Promise* prom)
  103. {
  104. struct Query* q = Identity_check((struct Query*) prom->userData);
  105. struct SupernodeHunter_pvt* snp = Identity_check(q->snp);
  106. if (!src) {
  107. Log_debug(snp->log, "timeout sending to %s",
  108. Address_toString(prom->target, prom->alloc)->bytes);
  109. // If we're in this state and it doesn't work, we're going to drop the snode and
  110. // go back to the beginning because while there's a possibility of a lost packet,
  111. // it's a bigger possibility that we don't have a working path and we'd better
  112. // try another one.
  113. AddrSet_add(snp->blacklist, prom->target, AddrSet_Match_BOTH);
  114. Bits_memset(&snp->snodeCandidate, 0, Address_SIZE);
  115. snp->snodePathUpdated = false;
  116. return;
  117. }
  118. Log_debug(snp->log, "Reply from %s", Address_toString(src, prom->alloc)->bytes);
  119. int64_t* snodeRecvTime = Dict_getIntC(msg, "recvTime");
  120. if (!snodeRecvTime) {
  121. Log_info(snp->log, "getRoute reply with no timeStamp, bad snode");
  122. return;
  123. }
  124. Log_debug(snp->log, "\n\nSupernode location confirmed [%s]\n\n",
  125. Address_toString(src, prom->alloc)->bytes);
  126. if (snp->pub.snodeIsReachable) {
  127. // If while we were searching, the outside code declared that indeed the snode
  128. // is reachable, we will not try to change their snode.
  129. } else if (snp->pub.onSnodeChange) {
  130. Bits_memcpy(&snp->pub.snodeAddr, src, Address_SIZE);
  131. snp->pub.snodeIsReachable = (
  132. AddrSet_indexOf(snp->authorizedSnodes, src, AddrSet_Match_ADDRESS_ONLY) != -1
  133. ) ? 2 : 1;
  134. snp->pub.onSnodeChange(&snp->pub, q->sendTime, *snodeRecvTime);
  135. } else {
  136. Log_warn(snp->log, "onSnodeChange is not set");
  137. }
  138. }
  139. static void adoptSupernode(struct SupernodeHunter_pvt* snp, struct Address* candidate)
  140. {
  141. struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
  142. struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
  143. Identity_set(q);
  144. q->snp = snp;
  145. q->sendTime = Time_currentTimeMilliseconds(snp->base);
  146. Dict* msg = qp->msg = Dict_new(qp->alloc);
  147. qp->cb = adoptSupernode2;
  148. qp->userData = q;
  149. qp->target = Address_clone(candidate, qp->alloc);
  150. // NOTE: we don't immediately request a path because the RS doesn't know about us
  151. // quite yet, so it will tell us it doesn't know a path, so we need to ping it
  152. // and take it on faith until we get some announcements announced.
  153. Log_debug(snp->log, "Pinging snode [%s]", Address_toString(qp->target, qp->alloc)->bytes);
  154. Dict_putStringCC(msg, "sq", "pn", qp->alloc);
  155. Assert_true(AddressCalc_validAddress(candidate->ip6.bytes));
  156. return;
  157. }
  158. static void updateSnodePath2(Dict* msg, struct Address* src, struct MsgCore_Promise* prom)
  159. {
  160. struct Query* q = Identity_check((struct Query*) prom->userData);
  161. struct SupernodeHunter_pvt* snp = Identity_check(q->snp);
  162. if (!src) {
  163. String* addrStr = Address_toString(prom->target, prom->alloc);
  164. Log_debug(snp->log, "timeout sending to %s", addrStr->bytes);
  165. return;
  166. }
  167. int64_t* snodeRecvTime = Dict_getIntC(msg, "recvTime");
  168. if (!snodeRecvTime) {
  169. Log_info(snp->log, "getRoute reply with no timeStamp, bad snode");
  170. return;
  171. }
  172. struct Address_List* al = ReplySerializer_parse(src, msg, snp->log, false, prom->alloc);
  173. if (!al || al->length == 0) {
  174. Log_debug(snp->log, "Requesting route to snode [%s], it doesn't know one",
  175. Address_toString(prom->target, prom->alloc)->bytes);
  176. return;
  177. }
  178. Log_debug(snp->log, "Supernode path updated with [%s]",
  179. Address_toString(&al->elems[0], prom->alloc)->bytes);
  180. snp->snodePathUpdated = true;
  181. if (!Bits_memcmp(&snp->pub.snodeAddr, &al->elems[0], Address_SIZE)) {
  182. Log_debug(snp->log, "Requestes route to snode [%s], the one we have is fine",
  183. Address_toString(prom->target, prom->alloc)->bytes);
  184. return;
  185. }
  186. Bits_memcpy(&snp->pub.snodeAddr, &al->elems[0], Address_SIZE);
  187. Bits_memcpy(&snp->snodeCandidate, &al->elems[0], Address_SIZE);
  188. AddrSet_flush(snp->blacklist);
  189. if (snp->pub.onSnodeChange) {
  190. snp->pub.snodeIsReachable = (
  191. AddrSet_indexOf(snp->authorizedSnodes, src, AddrSet_Match_ADDRESS_ONLY) != -1
  192. ) ? 2 : 1;
  193. snp->pub.onSnodeChange(&snp->pub, q->sendTime, *snodeRecvTime);
  194. }
  195. }
  196. static void updateSnodePath(struct SupernodeHunter_pvt* snp)
  197. {
  198. struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
  199. struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
  200. Identity_set(q);
  201. q->snp = snp;
  202. q->sendTime = Time_currentTimeMilliseconds(snp->base);
  203. Dict* msg = qp->msg = Dict_new(qp->alloc);
  204. qp->cb = updateSnodePath2;
  205. qp->userData = q;
  206. qp->target = Address_clone(&snp->pub.snodeAddr, qp->alloc);;
  207. Log_debug(snp->log, "Update snode [%s] path", Address_toString(qp->target, qp->alloc)->bytes);
  208. Dict_putStringCC(msg, "sq", "gr", qp->alloc);
  209. String* src = String_newBinary(snp->myAddress->ip6.bytes, 16, qp->alloc);
  210. Dict_putStringC(msg, "src", src, qp->alloc);
  211. String* target = String_newBinary(snp->pub.snodeAddr.ip6.bytes, 16, qp->alloc);
  212. Dict_putStringC(msg, "tar", target, qp->alloc);
  213. }
  214. static void queryForAuthorized(struct SupernodeHunter_pvt* snp, struct Address* snode)
  215. {
  216. /*
  217. struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
  218. struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
  219. Identity_set(q);
  220. q->snp = snp;
  221. q->sendTime = Time_currentTimeMilliseconds(snp->base);
  222. Dict* msg = qp->msg = Dict_new(qp->alloc);
  223. qp->cb = onReply;
  224. qp->userData = q;
  225. qp->target = candidate;
  226. Log_debug(snp->log, "Pinging snode [%s]", Address_toString(qp->target, qp->alloc)->bytes);
  227. Dict_putStringCC(msg, "sq", "gr", qp->alloc);
  228. */
  229. }
  230. static void peerResponseOK(struct SwitchPinger_Response* resp, struct SupernodeHunter_pvt* snp)
  231. {
  232. ReachabilityCollector_lagSample(snp->rc, resp->label, resp->milliseconds);
  233. struct Address snode;
  234. Bits_memcpy(&snode, &resp->snode, sizeof(struct Address));
  235. if (!snode.path) {
  236. uint8_t label[20];
  237. AddrTools_printPath(label, resp->label);
  238. Log_debug(snp->log, "Peer [%s] reports no supernode", label);
  239. return;
  240. }
  241. uint64_t path = LabelSplicer_splice(snode.path, resp->label);
  242. if (path == UINT64_MAX) {
  243. Log_debug(snp->log, "Supernode path could not be spliced");
  244. return;
  245. }
  246. snode.path = path;
  247. struct Address peerAddr = { .path = resp->label };
  248. int i = AddrSet_indexOf(snp->myPeerAddrs, &peerAddr, AddrSet_Match_LABEL_ONLY);
  249. if (i == -1) {
  250. Log_info(snp->log, "We got a snode reply from a node which is not in peer list");
  251. return;
  252. }
  253. struct Address* peer = AddrSet_get(snp->myPeerAddrs, i);
  254. struct Address* firstPeer = getPeerByNpn(snp, 0);
  255. if (!firstPeer) {
  256. Log_info(snp->log, "All peers have gone away while packet was outstanding");
  257. return;
  258. }
  259. // 1.
  260. // If we have looped around and queried all of our peers returning to the first and we have
  261. // still not found an snode in our authorized snodes list, we should simply accept this one.
  262. if (!snp->pub.snodeIsReachable &&
  263. snp->myPeerAddrs->length > 1 &&
  264. snp->nextPeer >= snp->myPeerAddrs->length &&
  265. Address_isSameIp(firstPeer, peer))
  266. {
  267. if (!snp->snodeCandidate.path) {
  268. Log_info(snp->log, "No snode candidate found [%s]",
  269. Address_toStringKey(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
  270. snp->nextPeer = 0;
  271. AddrSet_flush(snp->blacklist);
  272. return;
  273. }
  274. Log_debug(snp->log, "Peer [%s] has proposed we use supernode [%s] we will accept it",
  275. Address_toString(peer, resp->ping->pingAlloc)->bytes,
  276. Address_toString(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
  277. adoptSupernode(snp, &snp->snodeCandidate);
  278. return;
  279. }
  280. // 2.
  281. // If this snode is one of our authorized snodes OR if we have none defined, accept this one.
  282. if (AddrSet_indexOf(snp->blacklist, &snode, AddrSet_Match_BOTH) > -1) {
  283. Log_debug(snp->log, "Peer [%s] [%" PRIx64 "] has proposed supernode [%s] "
  284. "but it is blacklisted, continue",
  285. Address_toString(peer, resp->ping->pingAlloc)->bytes,
  286. resp->label,
  287. Address_toString(&snode, resp->ping->pingAlloc)->bytes);
  288. } else if (!snp->authorizedSnodes->length ||
  289. AddrSet_indexOf(snp->authorizedSnodes, &snode, AddrSet_Match_ADDRESS_ONLY) > -1)
  290. {
  291. Address_getPrefix(&snode);
  292. Log_debug(snp->log, "Peer [%s] has proposed supernode [%s] and %s so we will use it",
  293. Address_toString(peer, resp->ping->pingAlloc)->bytes,
  294. Address_toString(&snode, resp->ping->pingAlloc)->bytes,
  295. (snp->authorizedSnodes->length) ? "it is authorized" : "we have none authorized");
  296. adoptSupernode(snp, &snode);
  297. return;
  298. } else if (!snp->snodeCandidate.path) {
  299. Log_debug(snp->log, "Peer [%s] has proposed supernode [%s], we're not using it yet "
  300. "but we will store it as a candidate.",
  301. Address_toString(peer, resp->ping->pingAlloc)->bytes,
  302. Address_toString(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
  303. Bits_memcpy(&snp->snodeCandidate, &snode, sizeof(struct Address));
  304. Address_getPrefix(&snp->snodeCandidate);
  305. }
  306. // 3.
  307. // If this snode is not one of our authorized snodes, query it for all of our authorized snodes.
  308. queryForAuthorized(snp, &snode);
  309. }
  310. static void peerResponse(struct SwitchPinger_Response* resp, void* userData)
  311. {
  312. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) userData);
  313. char* err = "";
  314. switch (resp->res) {
  315. case SwitchPinger_Result_OK: peerResponseOK(resp, snp); return;
  316. case SwitchPinger_Result_LABEL_MISMATCH: err = "LABEL_MISMATCH"; break;
  317. case SwitchPinger_Result_WRONG_DATA: err = "WRONG_DATA"; break;
  318. case SwitchPinger_Result_ERROR_RESPONSE: err = "ERROR_RESPONSE"; break;
  319. case SwitchPinger_Result_LOOP_ROUTE: err = "LOOP_ROUTE"; break;
  320. case SwitchPinger_Result_TIMEOUT: err = "TIMEOUT"; break;
  321. default: err = "unknown error"; break;
  322. }
  323. Log_debug(snp->log, "Error sending snp query to peer [%" PRIx64 "] [%s]",
  324. resp->label, err);
  325. }
  326. static void probePeerCycle(void* vsn)
  327. {
  328. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) vsn);
  329. if (snp->pub.snodeIsReachable && !snp->snodePathUpdated) {
  330. updateSnodePath(snp);
  331. }
  332. if (snp->pub.snodeIsReachable > 1) { return; }
  333. if (snp->pub.snodeIsReachable && !snp->authorizedSnodes->length) { return; }
  334. if (!snp->myPeerAddrs->length) { return; }
  335. //Log_debug(snp->log, "probePeerCycle()");
  336. if (AddrSet_indexOf(snp->authorizedSnodes, snp->myAddress, AddrSet_Match_ADDRESS_ONLY) != -1) {
  337. Log_info(snp->log, "Self is specified as supernode, pinging...");
  338. adoptSupernode(snp, snp->myAddress);
  339. return;
  340. }
  341. struct Address* peer = getPeerByNpn(snp, snp->nextPeer);
  342. if (!peer) {
  343. Log_info(snp->log, "No peer found who is version >= 20");
  344. return;
  345. }
  346. struct SwitchPinger_Ping* p =
  347. SwitchPinger_newPing(peer->path, String_CONST(""), 3000, peerResponse, snp->alloc, snp->sp);
  348. Assert_true(p);
  349. Log_debug(snp->log, "Querying peer [%s] [%d] total [%d], blacklist size [%d]",
  350. Address_toString(peer, p->pingAlloc)->bytes,
  351. snp->nextPeer,
  352. snp->myPeerAddrs->length,
  353. snp->blacklist->length);
  354. snp->nextPeer++;
  355. p->type = SwitchPinger_Type_GETSNODE;
  356. if (snp->pub.snodeIsReachable) {
  357. Bits_memcpy(&p->snode, &snp->pub.snodeAddr, sizeof(struct Address));
  358. }
  359. p->onResponseContext = snp;
  360. }
  361. static void onSnodeUnreachable(struct SupernodeHunter* snh,
  362. int64_t sendTime,
  363. int64_t snodeRecvTime)
  364. {
  365. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
  366. Log_debug(snp->log, "Supernode unreachable.");
  367. snp->snodePathUpdated = false;
  368. // Snode unreachable, we need also reset peer snode candidate
  369. Bits_memset(&snp->snodeCandidate, 0, Address_SIZE);
  370. }
  371. struct SupernodeHunter* SupernodeHunter_new(struct Allocator* allocator,
  372. struct Log* log,
  373. struct EventBase* base,
  374. struct SwitchPinger* sp,
  375. struct AddrSet* peers,
  376. struct MsgCore* msgCore,
  377. struct Address* myAddress,
  378. struct ReachabilityCollector* rc)
  379. {
  380. struct Allocator* alloc = Allocator_child(allocator);
  381. struct SupernodeHunter_pvt* out =
  382. Allocator_calloc(alloc, sizeof(struct SupernodeHunter_pvt), 1);
  383. Identity_set(out);
  384. out->authorizedSnodes = AddrSet_new(alloc);
  385. out->blacklist = AddrSet_new(alloc);
  386. out->myPeerAddrs = peers;
  387. out->base = base;
  388. //out->rand = rand;
  389. //out->nodes = AddrSet_new(alloc);
  390. //out->timeSnodeCalled = Time_currentTimeMilliseconds(base);
  391. //out->snodeCandidates = AddrSet_new(alloc);
  392. out->log = log;
  393. out->alloc = alloc;
  394. out->msgCore = msgCore;
  395. out->myAddress = myAddress;
  396. out->rc = rc;
  397. out->selfAddrStr = String_newBinary(myAddress->ip6.bytes, 16, alloc);
  398. out->sp = sp;
  399. out->snodePathUpdated = false;
  400. out->pub.onSnodeUnreachable = onSnodeUnreachable;
  401. Timeout_setInterval(probePeerCycle, out, CYCLE_MS, base, alloc);
  402. return &out->pub;
  403. }