1
0

SupernodeHunter.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "crypto/AddressCalc.h"
  16. #include "crypto/Key.h"
  17. #include "dht/dhtcore/ReplySerializer.h"
  18. #include "subnode/SupernodeHunter.h"
  19. #include "subnode/AddrSet.h"
  20. #include "util/Identity.h"
  21. #include "util/platform/Sockaddr.h"
  22. #include "util/events/Timeout.h"
  23. #include "util/AddrTools.h"
  24. #include "util/events/Time.h"
  25. #include "switch/LabelSplicer.h"
  26. #include <inttypes.h>
  27. #define CYCLE_MS 3000
  28. struct SupernodeHunter_pvt
  29. {
  30. struct SupernodeHunter pub;
  31. /** Nodes which are authorized to be our supernode. */
  32. struct AddrSet* authorizedSnodes;
  33. /** Our peers, DO NOT TOUCH, changed from in SubnodePathfinder. */
  34. struct AddrSet* myPeerAddrs;
  35. struct AddrSet* blacklist;
  36. // Number of the next peer to ping in the peers AddrSet
  37. int nextPeer;
  38. // Will be set to the best known supernode possibility
  39. struct Address snodeCandidate;
  40. bool snodePathUpdated;
  41. struct Allocator* alloc;
  42. struct Log* log;
  43. struct MsgCore* msgCore;
  44. EventBase_t* base;
  45. struct SwitchPinger* sp;
  46. struct Address* myAddress;
  47. String* selfAddrStr;
  48. struct ReachabilityCollector* rc;
  49. Identity
  50. };
  51. struct Query
  52. {
  53. struct SupernodeHunter_pvt* snp;
  54. // If this is a findNode request, this is the search target, if it's a getPeers it's null.
  55. struct Address* searchTar;
  56. int64_t sendTime;
  57. bool isGetRoute;
  58. Identity
  59. };
  60. int SupernodeHunter_addSnode(struct SupernodeHunter* snh, struct Address* snodeAddr)
  61. {
  62. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
  63. int length0 = snp->authorizedSnodes->length;
  64. AddrSet_add(snp->authorizedSnodes, snodeAddr, AddrSet_Match_ADDRESS_ONLY);
  65. if (snp->authorizedSnodes->length == length0) {
  66. return SupernodeHunter_addSnode_EXISTS;
  67. }
  68. return 0;
  69. }
  70. int SupernodeHunter_listSnodes(struct SupernodeHunter* snh,
  71. struct Address*** outP,
  72. struct Allocator* alloc)
  73. {
  74. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
  75. struct Address** out = Allocator_calloc(alloc, sizeof(char*), snp->authorizedSnodes->length);
  76. for (int i = 0; i < snp->authorizedSnodes->length; i++) {
  77. out[i] = AddrSet_get(snp->authorizedSnodes, i);
  78. }
  79. *outP = out;
  80. return snp->authorizedSnodes->length;
  81. }
  82. int SupernodeHunter_removeSnode(struct SupernodeHunter* snh, struct Address* toRemove)
  83. {
  84. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
  85. int length0 = snp->authorizedSnodes->length;
  86. AddrSet_remove(snp->authorizedSnodes, toRemove, AddrSet_Match_ADDRESS_ONLY);
  87. if (snp->authorizedSnodes->length == length0) {
  88. return SupernodeHunter_removeSnode_NONEXISTANT;
  89. }
  90. return 0;
  91. }
  92. static struct Address* getPeerByNpn(struct SupernodeHunter_pvt* snp, int npn)
  93. {
  94. npn = npn % snp->myPeerAddrs->length;
  95. int i = npn;
  96. do {
  97. struct Address* peer = AddrSet_get(snp->myPeerAddrs, i);
  98. if (peer && peer->protocolVersion > 19) { return peer; }
  99. i = (i + 1) % snp->myPeerAddrs->length;
  100. } while (i != npn);
  101. return NULL;
  102. }
  103. static void adoptSupernode2(Dict* msg, struct Address* src, struct MsgCore_Promise* prom)
  104. {
  105. struct Query* q = Identity_check((struct Query*) prom->userData);
  106. struct SupernodeHunter_pvt* snp = Identity_check(q->snp);
  107. if (!src) {
  108. Log_debug(snp->log, "timeout sending to %s",
  109. Address_toString(prom->target, prom->alloc)->bytes);
  110. // If we're in this state and it doesn't work, we're going to drop the snode and
  111. // go back to the beginning because while there's a possibility of a lost packet,
  112. // it's a bigger possibility that we don't have a working path and we'd better
  113. // try another one.
  114. AddrSet_add(snp->blacklist, prom->target, AddrSet_Match_BOTH);
  115. Bits_memset(&snp->snodeCandidate, 0, Address_SIZE);
  116. snp->snodePathUpdated = false;
  117. return;
  118. }
  119. Log_debug(snp->log, "Reply from %s", Address_toString(src, prom->alloc)->bytes);
  120. int64_t* snodeRecvTime = Dict_getIntC(msg, "recvTime");
  121. if (!snodeRecvTime) {
  122. Log_info(snp->log, "getRoute reply with no timeStamp, bad snode");
  123. return;
  124. }
  125. Log_debug(snp->log, "\n\nSupernode location confirmed [%s]\n\n",
  126. Address_toString(src, prom->alloc)->bytes);
  127. if (snp->pub.snodeIsReachable) {
  128. // If while we were searching, the outside code declared that indeed the snode
  129. // is reachable, we will not try to change their snode.
  130. } else if (snp->pub.onSnodeChange) {
  131. Bits_memcpy(&snp->pub.snodeAddr, src, Address_SIZE);
  132. snp->pub.snodeIsReachable = (
  133. AddrSet_indexOf(snp->authorizedSnodes, src, AddrSet_Match_ADDRESS_ONLY) != -1
  134. ) ? 2 : 1;
  135. snp->pub.onSnodeChange(&snp->pub, q->sendTime, *snodeRecvTime);
  136. } else {
  137. Log_warn(snp->log, "onSnodeChange is not set");
  138. }
  139. }
  140. static void adoptSupernode(struct SupernodeHunter_pvt* snp, struct Address* candidate)
  141. {
  142. struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
  143. struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
  144. Identity_set(q);
  145. q->snp = snp;
  146. q->sendTime = Time_currentTimeMilliseconds();
  147. Dict* msg = qp->msg = Dict_new(qp->alloc);
  148. qp->cb = adoptSupernode2;
  149. qp->userData = q;
  150. qp->target = Address_clone(candidate, qp->alloc);
  151. // NOTE: we don't immediately request a path because the RS doesn't know about us
  152. // quite yet, so it will tell us it doesn't know a path, so we need to ping it
  153. // and take it on faith until we get some announcements announced.
  154. Log_debug(snp->log, "Pinging snode [%s]", Address_toString(qp->target, qp->alloc)->bytes);
  155. Dict_putStringCC(msg, "sq", "pn", qp->alloc);
  156. Assert_true(AddressCalc_validAddress(candidate->ip6.bytes));
  157. return;
  158. }
  159. static void updateSnodePath2(Dict* msg, struct Address* src, struct MsgCore_Promise* prom)
  160. {
  161. struct Query* q = Identity_check((struct Query*) prom->userData);
  162. struct SupernodeHunter_pvt* snp = Identity_check(q->snp);
  163. if (!src) {
  164. String* addrStr = Address_toString(prom->target, prom->alloc);
  165. Log_debug(snp->log, "timeout sending to %s", addrStr->bytes);
  166. return;
  167. }
  168. int64_t* snodeRecvTime = Dict_getIntC(msg, "recvTime");
  169. if (!snodeRecvTime) {
  170. Log_info(snp->log, "getRoute reply with no timeStamp, bad snode");
  171. return;
  172. }
  173. struct Address_List* al = ReplySerializer_parse(src, msg, snp->log, false, prom->alloc);
  174. if (!al || al->length == 0) {
  175. Log_debug(snp->log, "Requesting route to snode [%s], it doesn't know one",
  176. Address_toString(prom->target, prom->alloc)->bytes);
  177. return;
  178. }
  179. Log_debug(snp->log, "Supernode path updated with [%s]",
  180. Address_toString(&al->elems[0], prom->alloc)->bytes);
  181. snp->snodePathUpdated = true;
  182. if (!Bits_memcmp(&snp->pub.snodeAddr, &al->elems[0], Address_SIZE)) {
  183. Log_debug(snp->log, "Requestes route to snode [%s], the one we have is fine",
  184. Address_toString(prom->target, prom->alloc)->bytes);
  185. return;
  186. }
  187. Bits_memcpy(&snp->pub.snodeAddr, &al->elems[0], Address_SIZE);
  188. Bits_memcpy(&snp->snodeCandidate, &al->elems[0], Address_SIZE);
  189. AddrSet_flush(snp->blacklist);
  190. if (snp->pub.onSnodeChange) {
  191. snp->pub.snodeIsReachable = (
  192. AddrSet_indexOf(snp->authorizedSnodes, src, AddrSet_Match_ADDRESS_ONLY) != -1
  193. ) ? 2 : 1;
  194. snp->pub.onSnodeChange(&snp->pub, q->sendTime, *snodeRecvTime);
  195. }
  196. }
  197. static void updateSnodePath(struct SupernodeHunter_pvt* snp)
  198. {
  199. struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
  200. struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
  201. Identity_set(q);
  202. q->snp = snp;
  203. q->sendTime = Time_currentTimeMilliseconds();
  204. Dict* msg = qp->msg = Dict_new(qp->alloc);
  205. qp->cb = updateSnodePath2;
  206. qp->userData = q;
  207. qp->target = Address_clone(&snp->pub.snodeAddr, qp->alloc);
  208. Log_debug(snp->log, "Update snode [%s] path", Address_toString(qp->target, qp->alloc)->bytes);
  209. Dict_putStringCC(msg, "sq", "gr", qp->alloc);
  210. String* src = String_newBinary(snp->myAddress->ip6.bytes, 16, qp->alloc);
  211. Dict_putStringC(msg, "src", src, qp->alloc);
  212. String* target = String_newBinary(snp->pub.snodeAddr.ip6.bytes, 16, qp->alloc);
  213. Dict_putStringC(msg, "tar", target, qp->alloc);
  214. }
  215. static void queryForAuthorized(struct SupernodeHunter_pvt* snp, struct Address* snode)
  216. {
  217. /*
  218. struct MsgCore_Promise* qp = MsgCore_createQuery(snp->msgCore, 0, snp->alloc);
  219. struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
  220. Identity_set(q);
  221. q->snp = snp;
  222. q->sendTime = Time_currentTimeMilliseconds();
  223. Dict* msg = qp->msg = Dict_new(qp->alloc);
  224. qp->cb = onReply;
  225. qp->userData = q;
  226. qp->target = candidate;
  227. Log_debug(snp->log, "Pinging snode [%s]", Address_toString(qp->target, qp->alloc)->bytes);
  228. Dict_putStringCC(msg, "sq", "gr", qp->alloc);
  229. */
  230. }
  231. static void peerResponseOK(struct SwitchPinger_Response* resp, struct SupernodeHunter_pvt* snp)
  232. {
  233. ReachabilityCollector_lagSample(snp->rc, resp->label, resp->milliseconds);
  234. struct Address snode = {0};
  235. Bits_memcpy(&snode, &resp->snode, sizeof(struct Address));
  236. if (!snode.path) {
  237. uint8_t label[20];
  238. AddrTools_printPath(label, resp->label);
  239. Log_debug(snp->log, "Peer [%s] reports no supernode", label);
  240. return;
  241. }
  242. uint64_t path = LabelSplicer_splice(snode.path, resp->label);
  243. if (path == UINT64_MAX) {
  244. Log_debug(snp->log, "Supernode path could not be spliced");
  245. return;
  246. }
  247. snode.path = path;
  248. struct Address peerAddr = { .path = resp->label };
  249. int i = AddrSet_indexOf(snp->myPeerAddrs, &peerAddr, AddrSet_Match_LABEL_ONLY);
  250. if (i == -1) {
  251. Log_info(snp->log, "We got a snode reply from a node which is not in peer list");
  252. return;
  253. }
  254. struct Address* peer = AddrSet_get(snp->myPeerAddrs, i);
  255. struct Address* firstPeer = getPeerByNpn(snp, 0);
  256. if (!firstPeer) {
  257. Log_info(snp->log, "All peers have gone away while packet was outstanding");
  258. return;
  259. }
  260. // 1.
  261. // If we have looped around and queried all of our peers returning to the first and we have
  262. // still not found an snode in our authorized snodes list, we should simply accept this one.
  263. if (!snp->pub.snodeIsReachable &&
  264. snp->myPeerAddrs->length > 1 &&
  265. snp->nextPeer >= snp->myPeerAddrs->length &&
  266. Address_isSameIp(firstPeer, peer))
  267. {
  268. if (!snp->snodeCandidate.path) {
  269. Log_info(snp->log, "No snode candidate found [%s]",
  270. Address_toStringKey(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
  271. snp->nextPeer = 0;
  272. AddrSet_flush(snp->blacklist);
  273. return;
  274. }
  275. Log_debug(snp->log, "Peer [%s] has proposed we use supernode [%s] we will accept it",
  276. Address_toString(peer, resp->ping->pingAlloc)->bytes,
  277. Address_toString(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
  278. adoptSupernode(snp, &snp->snodeCandidate);
  279. return;
  280. }
  281. // 2.
  282. // If this snode is one of our authorized snodes OR if we have none defined, accept this one.
  283. if (AddrSet_indexOf(snp->blacklist, &snode, AddrSet_Match_BOTH) > -1) {
  284. Log_debug(snp->log, "Peer [%s] [%" PRIx64 "] has proposed supernode [%s] "
  285. "but it is blacklisted, continue",
  286. Address_toString(peer, resp->ping->pingAlloc)->bytes,
  287. resp->label,
  288. Address_toString(&snode, resp->ping->pingAlloc)->bytes);
  289. } else if (!snp->authorizedSnodes->length ||
  290. AddrSet_indexOf(snp->authorizedSnodes, &snode, AddrSet_Match_ADDRESS_ONLY) > -1)
  291. {
  292. Address_getPrefix(&snode);
  293. Log_debug(snp->log, "Peer [%s] has proposed supernode [%s] and %s so we will use it",
  294. Address_toString(peer, resp->ping->pingAlloc)->bytes,
  295. Address_toString(&snode, resp->ping->pingAlloc)->bytes,
  296. (snp->authorizedSnodes->length) ? "it is authorized" : "we have none authorized");
  297. adoptSupernode(snp, &snode);
  298. return;
  299. } else if (!snp->snodeCandidate.path) {
  300. Log_debug(snp->log, "Peer [%s] has proposed supernode [%s], we're not using it yet "
  301. "but we will store it as a candidate.",
  302. Address_toString(peer, resp->ping->pingAlloc)->bytes,
  303. Address_toString(&snp->snodeCandidate, resp->ping->pingAlloc)->bytes);
  304. Bits_memcpy(&snp->snodeCandidate, &snode, sizeof(struct Address));
  305. Address_getPrefix(&snp->snodeCandidate);
  306. }
  307. // 3.
  308. // If this snode is not one of our authorized snodes, query it for all of our authorized snodes.
  309. queryForAuthorized(snp, &snode);
  310. }
  311. static void peerResponse(struct SwitchPinger_Response* resp, void* userData)
  312. {
  313. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) userData);
  314. char* err = "";
  315. switch (resp->res) {
  316. case SwitchPinger_Result_OK: peerResponseOK(resp, snp); return;
  317. case SwitchPinger_Result_LABEL_MISMATCH: err = "LABEL_MISMATCH"; break;
  318. case SwitchPinger_Result_WRONG_DATA: err = "WRONG_DATA"; break;
  319. case SwitchPinger_Result_ERROR_RESPONSE: err = "ERROR_RESPONSE"; break;
  320. case SwitchPinger_Result_LOOP_ROUTE: err = "LOOP_ROUTE"; break;
  321. case SwitchPinger_Result_TIMEOUT: err = "TIMEOUT"; break;
  322. default: err = "unknown error"; break;
  323. }
  324. Log_debug(snp->log, "Error sending snp query to peer [%" PRIx64 "] [%s]",
  325. resp->label, err);
  326. }
  327. static void probePeerCycle(void* vsn)
  328. {
  329. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) vsn);
  330. if (snp->pub.snodeIsReachable && !snp->snodePathUpdated) {
  331. updateSnodePath(snp);
  332. }
  333. if (snp->pub.snodeIsReachable > 1) { return; }
  334. if (snp->pub.snodeIsReachable && !snp->authorizedSnodes->length) { return; }
  335. if (!snp->myPeerAddrs->length) { return; }
  336. //Log_debug(snp->log, "probePeerCycle()");
  337. if (AddrSet_indexOf(snp->authorizedSnodes, snp->myAddress, AddrSet_Match_ADDRESS_ONLY) != -1) {
  338. Log_info(snp->log, "Self is specified as supernode, pinging...");
  339. adoptSupernode(snp, snp->myAddress);
  340. return;
  341. }
  342. struct Address* peer = getPeerByNpn(snp, snp->nextPeer);
  343. if (!peer) {
  344. Log_info(snp->log, "No peer found who is version >= 20");
  345. return;
  346. }
  347. struct SwitchPinger_Ping* p =
  348. SwitchPinger_newPing(peer->path, String_CONST(""), 3000, peerResponse, snp->alloc, snp->sp);
  349. Assert_true(p);
  350. Log_debug(snp->log, "Querying peer [%s] [%d] total [%d], blacklist size [%d]",
  351. Address_toString(peer, p->pingAlloc)->bytes,
  352. snp->nextPeer,
  353. snp->myPeerAddrs->length,
  354. snp->blacklist->length);
  355. snp->nextPeer++;
  356. p->type = SwitchPinger_Type_GETSNODE;
  357. if (snp->pub.snodeIsReachable) {
  358. Bits_memcpy(&p->snode, &snp->pub.snodeAddr, sizeof(struct Address));
  359. }
  360. p->onResponseContext = snp;
  361. }
  362. static void onSnodeUnreachable(struct SupernodeHunter* snh,
  363. int64_t sendTime,
  364. int64_t snodeRecvTime)
  365. {
  366. struct SupernodeHunter_pvt* snp = Identity_check((struct SupernodeHunter_pvt*) snh);
  367. Log_debug(snp->log, "Supernode unreachable.");
  368. snp->snodePathUpdated = false;
  369. // Snode unreachable, we need also reset peer snode candidate
  370. Bits_memset(&snp->snodeCandidate, 0, Address_SIZE);
  371. }
  372. struct SupernodeHunter* SupernodeHunter_new(struct Allocator* allocator,
  373. struct Log* log,
  374. EventBase_t* base,
  375. struct SwitchPinger* sp,
  376. struct AddrSet* peers,
  377. struct MsgCore* msgCore,
  378. struct Address* myAddress,
  379. struct ReachabilityCollector* rc)
  380. {
  381. struct Allocator* alloc = Allocator_child(allocator);
  382. struct SupernodeHunter_pvt* out =
  383. Allocator_calloc(alloc, sizeof(struct SupernodeHunter_pvt), 1);
  384. Identity_set(out);
  385. out->authorizedSnodes = AddrSet_new(alloc);
  386. out->blacklist = AddrSet_new(alloc);
  387. out->myPeerAddrs = peers;
  388. out->base = base;
  389. //out->rand = rand;
  390. //out->nodes = AddrSet_new(alloc);
  391. //out->timeSnodeCalled = Time_currentTimeMilliseconds();
  392. //out->snodeCandidates = AddrSet_new(alloc);
  393. out->log = log;
  394. out->alloc = alloc;
  395. out->msgCore = msgCore;
  396. out->myAddress = myAddress;
  397. out->rc = rc;
  398. out->selfAddrStr = String_newBinary(myAddress->ip6.bytes, 16, alloc);
  399. out->sp = sp;
  400. out->snodePathUpdated = false;
  401. out->pub.onSnodeUnreachable = onSnodeUnreachable;
  402. Timeout_setInterval(probePeerCycle, out, CYCLE_MS, base, alloc);
  403. return &out->pub;
  404. }