RouterModule.c 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "benc/String.h"
  16. #include "dht/Address.h"
  17. #include "dht/dhtcore/RouterModule.h"
  18. #include "dht/dhtcore/RouterModule_pvt.h"
  19. #include "dht/dhtcore/Node.h"
  20. #include "dht/dhtcore/NodeList.h"
  21. #include "dht/dhtcore/NodeStore.h"
  22. #include "dht/dhtcore/VersionList.h"
  23. #include "dht/CJDHTConstants.h"
  24. #include "dht/DHTMessage.h"
  25. #include "dht/DHTModule.h"
  26. #include "dht/DHTModuleRegistry.h"
  27. #include "util/log/Log.h"
  28. #include "memory/Allocator.h"
  29. #include "switch/LabelSplicer.h"
  30. #include "switch/NumberCompress.h"
  31. #include "util/events/EventBase.h"
  32. #include "util/AverageRoller.h"
  33. #include "util/Bits.h"
  34. #include "util/Hex.h"
  35. #include "util/Endian.h"
  36. #include "util/Pinger.h"
  37. #include "util/events/Time.h"
  38. #include "util/events/Timeout.h"
  39. #include "util/version/Version.h"
  40. #include "wire/Message.h"
  41. /*
  42. * The router module is the central part of the DHT engine.
  43. * Its job is to maintain a routing table which is updated by all incoming packets.
  44. * When it gets an incoming query, its job is to add nodes to the reply so that the asking node
  45. * can find other nodes which are closer to its target than us.
  46. *
  47. * This implementation does not split nodes explicitly into buckets nor does it explicitly try to
  48. * distinguish between "good" and "bad" nodes. Instead it tries to determine which node will help
  49. * get to the requested record the fastest. Instead of periodically pinging a random node in each
  50. * "bucket", this implementation periodically searches for a random[1] hash. When a node is sent a
  51. * query, the distance[2] between it and the first node is divided by the amount of time it
  52. * takes the node to respond, for each successful search, this number is added to an attribute of
  53. * the node called "reach".
  54. *
  55. * Visually representing a node as an area whose location is defined by the node id and its size is
  56. * defined by the node reach, you can see that there is a possibility for a record to be closer in
  57. * key space to node2 while it is still further inside of node1's reach thus node1 is a better
  58. * choice for the next node to ask.
  59. *
  60. * |<--------- Node 1 ---------->|
  61. *                      |<--- Node 2 ---->|
  62. *                         ^----- Desired record location.
  63. *
  64. * New nodes are inserted into the table but with a reach of 0. It is up to the search client to
  65. * send search requests to them so they can prove their validity and have their reach number
  66. * updated.
  67. *
  68. * Reach of a node is incremented by 2 every time the node responds to a query and incremented by 1
  69. * every time a node sends a query of its own. This has almost no effect except that it means a
  70. * node which has recently sent data will be preferred over one which has not.
  71. *
  72. * When a search is carried out, the next K returned nodes are not necessarily the closest known
  73. * nodes to the id of the record. The nodes returned will be the nodes with the lowest
  74. * distance:reach ratio. The distance:reach ratio is calculated by dividing the distance between
  75. * the node and the record by the node's reach number. Actually it is done by multiplying
  76. * UINT32_MAX minus the distance by the reach so that it does not need to use slower division.
  77. * See: NodeCollector.h
  78. *
  79. * Since information about a node becomes stale over time, all reach numbers are decreased by
  80. * the constant REACH_DECREASE_PER_SECOND times the number of seconds since the last cycle,
  81. * this operation is performed periodically every LOCAL_MAINTENANCE_SEARCH_MILLISECONDS unless
  82. * a local maintenance search is being run which is not often once the network is stable.
  83. *
  84. * TODO(cjd): ---
  85. * In order to have the nodes with least distance:reach ratio ready to handle any incoming search,
  86. * we precompute the borders where the "best next node" changes. This computation is best understood
  87. * by graphing the nodes with their location in keyspace on the X axis and their reach on the Y
  88. * axis. The border between two nodes, nodeA and nodeB is the location where a line drawn from the
  89. * X axis up to either node location would be the same angle.
  90. *
  91. * ^                                               ^
  92. * |     nodeA                                     |     nodeA
  93. * |       |\                                      |       |\__
  94. * |       | \                                     |       |   \__
  95. * |       |  \    nodeB                           |       |      \nodeB
  96. * |       |   \    /|                             |       |         \__
  97. * |       |    \  / |                             |       |         |  \__
  98. * |       |     \/  |                             |       |         |     \__
  99. * +--------------------------------------->       +--------------------------------------->
  100. *                ^-- border                                                  ^-- border2
  101. *
  102. * Everything to the left of the border and everything to the right of border2 is to be serviced by
  103. * nodeA. Everything between the two borders is serviced by nodeB. Border2 is found by
  104. * drawing a line from the point given for nodeA through the point given for nodeB and finding
  105. * the intersection of that line with the Y axis. border and border2 are shown on different graphs
  106. * only to limit clutter, they are the same nodeA and nodeB.
  107. *
  108. * When resolving a search, this implementation will lookup the location of the searched for record
  109. * and return the nodes which belong to the insides of the nearest K borders, this guarantees return
  110. * of the nodes whose distance:reach ratio is the lowest for that location.
  111. * ---
  112. *
  113. * This implementation must never respond to a search by sending any node whose id is not closer
  114. * to the target than its own. Such an event would lead to the possibility of "routing loops" and
  115. * must be prevented. Searches for which this node has the lowest distance:reach ratio will be
  116. * replied to with nodes which have 0 reach but are closer than this node or, if there are no such
  117. * nodes, no nodes at all.
  118. *
  119. * The search consumer in this routing module tries to minimize the amount of traffic sent when
  120. * doing a lookup. To achieve this, it sends a request only to the last node in the search response
  121. * packet, after the global mean response time has passed without it getting a response, it sends
  122. * requests to the second to last and so forth, working backward. Nodes which fail to respond in
  123. * time have their reach immediately set to zero.
  124. *
  125. * The global mean response time is the average amount of time it takes a node to respond to a
  126. * search query. It is a rolling average over the past 256 seconds.
  127. *
  128. * To maximize the quality of service offered by this node this implementation will repeat
  129. * searches which it handles every number of seconds given by the constant:
  130. * GLOBAL_MAINTENANCE_SEARCH_MILLISECONDS.
  131. * Any incoming search with a get_peers request is eligible to be repeated.
  132. *
  133. * [1] The implementation runs periodic searches for random hashes but unless the search target
  134. * is closer in keyspace to this node than it is to any node with non-zero reach, the search
  135. * is not performed. This means that the node will send out lots of searches when it doesn't
  136. * know many reliable nodes but it will taper off like a governor as it becomes more
  137. * integrated in the network. These searches are run every number of milliseconds given
  138. * by the constant LOCAL_MAINTENANCE_SEARCH_MILLISECONDS.
  139. *
  140. * [2] If a response "overshoots" the record requested then it is calculated as if it had undershot
  141. * by the same amount so as not to provide arbitrage advantage to nodes who return results which
  142. * are very far away yet very inaccurate. If it overshoots by more than the distance between the
  143. * node and the searched for location (this should never happen), it is considered to be 0.
  144. */
  145. /*--------------------Constants--------------------*/
  146. /** The number of seconds of time overwhich to calculate the global mean response time. */
  147. #define GMRT_SECONDS 256
  148. /**
  149. * The number to initialize the global mean response time averager with so that it will
  150. * return sane results early on, this number can be much higher than the expected average.
  151. */
  152. #define GMRT_INITAL_MILLISECONDS 5000
  153. /** The number of nodes which we will keep track of. */
  154. #define NODE_STORE_SIZE 8192
  155. /** The number of milliseconds between attempting local maintenance searches. */
  156. #define LOCAL_MAINTENANCE_SEARCH_MILLISECONDS 1000
  157. /**
  158. * The number of milliseconds to pass between global maintainence searches.
  159. * These are searches for random targets which are used to discover new nodes.
  160. */
  161. #define GLOBAL_MAINTENANCE_SEARCH_MILLISECONDS 30000
  162. #define SEARCH_REPEAT_MILLISECONDS 7500
  163. /** The number of times the GMRT before pings should be timed out. */
  164. #define PING_TIMEOUT_GMRT_MULTIPLIER 10
  165. /** The minimum amount of time before a ping should timeout. */
  166. #define PING_TIMEOUT_MINIMUM 3000
  167. #define PING_TIMEOUT_MAXIMUM 30000
  168. /** You are not expected to understand this. */
  169. #define LINK_STATE_MULTIPLIER 536870
  170. /** All searches will be killed after this amount of time nomatter how bad the GMRT is. */
  171. #define MAX_TIMEOUT 10000
  172. /** Never allow a search to be timed out in less than this number of milliseconds. */
  173. #define MIN_TIMEOUT 10
  174. /*--------------------Prototypes--------------------*/
  175. static int handleIncoming(struct DHTMessage* message, void* vcontext);
  176. static int handleOutgoing(struct DHTMessage* message, void* vcontext);
  177. /*--------------------Interface--------------------*/
  178. /**
  179. * Register a new RouterModule.
  180. *
  181. * @param registry the DHT module registry for signal handling.
  182. * @param allocator a means to allocate memory.
  183. * @param myAddress the address for this DHT node.
  184. * @param nodeStore the place to put the nodes
  185. * @return the RouterModule.
  186. */
  187. struct RouterModule* RouterModule_register(struct DHTModuleRegistry* registry,
  188. struct Allocator* allocator,
  189. const uint8_t myAddress[Address_KEY_SIZE],
  190. EventBase_t* eventBase,
  191. struct Log* logger,
  192. struct Random* rand,
  193. struct NodeStore* nodeStore)
  194. {
  195. struct RouterModule* const out = Allocator_calloc(allocator, sizeof(struct RouterModule), 1);
  196. struct DHTModule* dm = Allocator_clone(allocator, (&(struct DHTModule) {
  197. .name = "RouterModule",
  198. .context = out,
  199. .handleIncoming = handleIncoming,
  200. .handleOutgoing = handleOutgoing
  201. }));
  202. DHTModuleRegistry_register(dm, registry);
  203. Address_forKey(&out->address, myAddress);
  204. out->gmrtRoller = AverageRoller_new(GMRT_SECONDS, eventBase, allocator);
  205. AverageRoller_update(out->gmrtRoller, GMRT_INITAL_MILLISECONDS);
  206. out->nodeStore = nodeStore;
  207. out->registry = registry;
  208. out->eventBase = eventBase;
  209. out->logger = logger;
  210. out->allocator = allocator;
  211. out->rand = rand;
  212. out->pinger = Pinger_new(eventBase, rand, logger, allocator);
  213. Identity_set(out);
  214. return out;
  215. }
  216. /**
  217. * The amount of time to wait before skipping over the first node and trying another in a search.
  218. * Any node which can't beat this time will have its reach set to 0.
  219. *
  220. * @param module this module.
  221. * @return the timeout time.
  222. */
  223. uint64_t RouterModule_searchTimeoutMilliseconds(struct RouterModule* module)
  224. {
  225. uint64_t x = (((uint64_t) AverageRoller_getAverage(module->gmrtRoller)) * 4);
  226. x = x + (Random_uint32(module->rand) % (x | 1)) / 2;
  227. return (x > MAX_TIMEOUT) ? MAX_TIMEOUT : (x < MIN_TIMEOUT) ? MIN_TIMEOUT : x;
  228. }
  229. static inline int sendNodes(struct NodeList* nodeList,
  230. struct DHTMessage* message,
  231. struct RouterModule* module,
  232. uint32_t askerVersion)
  233. {
  234. struct DHTMessage* query = message->replyTo;
  235. int count = 0;
  236. for (int i = 0; i < (int)nodeList->size; i++) {
  237. count += !nodeList->nodes[i]->dnd;
  238. }
  239. String* nodes =
  240. String_newBinary(NULL, count * Address_SERIALIZED_SIZE, message->allocator);
  241. struct VersionList* versions = VersionList_new(count, message->allocator);
  242. int j = 0;
  243. for (int i = 0; i < (int)nodeList->size; i++) {
  244. if (nodeList->nodes[i]->dnd) { continue; }
  245. // We have to modify the reply in case this node uses a longer label discriminator
  246. // in our switch than its target address, the target address *must* have the same
  247. // length or longer.
  248. struct Address addr;
  249. Bits_memcpy(&addr, &nodeList->nodes[i]->address, sizeof(struct Address));
  250. addr.path = NumberCompress_getLabelFor(addr.path, query->address->path);
  251. Address_serialize(&nodes->bytes[j * Address_SERIALIZED_SIZE], &addr);
  252. versions->versions[j] = nodeList->nodes[i]->address.protocolVersion;
  253. Assert_ifParanoid(!Bits_isZero(&nodes->bytes[i * Address_SERIALIZED_SIZE],
  254. Address_SERIALIZED_SIZE));
  255. j++;
  256. }
  257. nodes->len = j * Address_SERIALIZED_SIZE;
  258. versions->length = j;
  259. if (j > 0) {
  260. Dict_putString(message->asDict, CJDHTConstants_NODES, nodes, message->allocator);
  261. String* versionsStr = VersionList_stringify(versions, message->allocator);
  262. Dict_putString(message->asDict,
  263. CJDHTConstants_NODE_PROTOCOLS,
  264. versionsStr,
  265. message->allocator);
  266. }
  267. return 0;
  268. }
  269. /**
  270. * Handle an incoming search query.
  271. * This is setup to handle the outgoing *response* to the query, it should
  272. * be called from handleOutgoing() and populate the response with nodes.
  273. *
  274. * @param message the empty response message to populate.
  275. * @param replyArgs the arguments dictionary in the response (to be populated).
  276. * @param module the routing module context.
  277. * @return 0 as long as the packet should not be stopped (at this point always 0).
  278. */
  279. static inline int handleQuery(struct DHTMessage* message,
  280. struct RouterModule* module)
  281. {
  282. struct DHTMessage* query = message->replyTo;
  283. int64_t* versionPtr = Dict_getInt(query->asDict, CJDHTConstants_PROTOCOL);
  284. uint32_t version = (versionPtr && *versionPtr <= UINT32_MAX) ? *versionPtr : 0;
  285. struct NodeList* nodeList = NULL;
  286. String* queryType = Dict_getString(query->asDict, CJDHTConstants_QUERY);
  287. if (String_equals(queryType, CJDHTConstants_QUERY_FN)) {
  288. Log_debug(module->logger, "FindNode Query");
  289. // get the target
  290. String* target = Dict_getString(query->asDict, CJDHTConstants_TARGET);
  291. if (target == NULL || target->len != Address_SEARCH_TARGET_SIZE) {
  292. return 0;
  293. }
  294. struct Address targetAddr = { .path = 0 };
  295. Bits_memcpy(targetAddr.ip6.bytes, target->bytes, Address_SEARCH_TARGET_SIZE);
  296. // send the closest nodes
  297. nodeList = NodeStore_getClosestNodes(module->nodeStore,
  298. &targetAddr,
  299. RouterModule_K,
  300. version,
  301. message->allocator);
  302. } else if (String_equals(queryType, CJDHTConstants_QUERY_GP)) {
  303. Log_debug(module->logger, "GetPeers Query");
  304. // get the target
  305. String* target = Dict_getString(query->asDict, CJDHTConstants_TARGET);
  306. if (target == NULL || target->len != 8) {
  307. return 0;
  308. }
  309. uint64_t targetPath;
  310. Bits_memcpy(&targetPath, target->bytes, 8);
  311. targetPath = Endian_bigEndianToHost64(targetPath);
  312. nodeList =
  313. NodeStore_getPeers(targetPath, RouterModule_K, message->allocator, module->nodeStore);
  314. } else if (String_equals(queryType, CJDHTConstants_QUERY_NH)) {
  315. Log_debug(module->logger, "HN Query");
  316. // get the target
  317. String* target = Dict_getString(query->asDict, CJDHTConstants_TARGET);
  318. if (target == NULL || target->len != Address_SEARCH_TARGET_SIZE) {
  319. return 0;
  320. }
  321. struct Node_Two* nn = NodeStore_getBest(module->nodeStore, target->bytes);
  322. nodeList = Allocator_calloc(message->allocator, sizeof(struct NodeList), 1);
  323. if (nn) {
  324. nodeList->size = 1;
  325. nodeList->nodes = Allocator_calloc(message->allocator, sizeof(char*), 1);
  326. nodeList->nodes[0] = nn;
  327. }
  328. }
  329. return (nodeList) ? sendNodes(nodeList, message, module, version) : 0;
  330. }
  331. /**
  332. * We handle 2 kinds of packets on the outgoing.
  333. * 1. our requests
  334. * 2. our replies to others' requests.
  335. * Everything is tagged with our address, replies to requests which are not ping requests
  336. * will also be given a list of nodes.
  337. */
  338. static int handleOutgoing(struct DHTMessage* message, void* vcontext)
  339. {
  340. struct RouterModule* module = Identity_check((struct RouterModule*) vcontext);
  341. Dict_putInt(message->asDict,
  342. CJDHTConstants_PROTOCOL,
  343. Version_CURRENT_PROTOCOL,
  344. message->allocator);
  345. if (message->replyTo != NULL) {
  346. return handleQuery(message, module);
  347. }
  348. return 0;
  349. }
  350. struct PingContext
  351. {
  352. struct RouterModule_Promise pub;
  353. /** nonNull if this ping is part of a search. */
  354. struct SearchContext* search;
  355. struct RouterModule* router;
  356. struct Address address;
  357. /** The internal ping structure */
  358. struct Pinger_Ping* pp;
  359. /** A template of the message to be sent. */
  360. Dict* messageDict;
  361. Identity
  362. };
  363. static void sendMsg(String* txid, void* vpingContext)
  364. {
  365. struct PingContext* pc = Identity_check((struct PingContext*) vpingContext);
  366. // "t":"1234"
  367. Dict_putString(pc->messageDict, CJDHTConstants_TXID, txid, pc->pp->pingAlloc);
  368. struct Allocator* temp = Allocator_child(pc->pp->pingAlloc);
  369. Message_t* msg = Message_new(0, DHTMessage_MAX_SIZE + 512, temp);
  370. struct DHTMessage* dmesg = Allocator_calloc(temp, sizeof(struct DHTMessage), 1);
  371. dmesg->binMessage = msg;
  372. dmesg->address = &pc->address;
  373. dmesg->asDict = pc->messageDict;
  374. dmesg->allocator = temp;
  375. DHTModuleRegistry_handleOutgoing(dmesg, pc->router->registry);
  376. }
  377. static void onTimeout(uint32_t milliseconds, struct PingContext* pctx)
  378. {
  379. struct Node_Two* n = NodeStore_closestNode(pctx->router->nodeStore, pctx->address.path);
  380. // Ping timeout -> decrease reach
  381. if (n && !Bits_memcmp(pctx->address.key, n->address.key, 32)) {
  382. NodeStore_pathTimeout(pctx->router->nodeStore, pctx->address.path);
  383. }
  384. if (pctx->pub.callback) {
  385. pctx->pub.callback(&pctx->pub, milliseconds, NULL, NULL);
  386. }
  387. }
  388. static uint64_t pingTimeoutMilliseconds(struct RouterModule* module)
  389. {
  390. uint64_t out = AverageRoller_getAverage(module->gmrtRoller) * PING_TIMEOUT_GMRT_MULTIPLIER;
  391. out = (out < PING_TIMEOUT_MINIMUM) ? PING_TIMEOUT_MINIMUM : out;
  392. return (out > PING_TIMEOUT_MAXIMUM) ? PING_TIMEOUT_MAXIMUM : out;
  393. }
  394. /**
  395. * The only type of message we handle on the incoming side is
  396. * a response to one of our queries.
  397. */
  398. static int handleIncoming(struct DHTMessage* message, void* vcontext)
  399. {
  400. String* txid = Dict_getString(message->asDict, CJDHTConstants_TXID);
  401. String* query = Dict_getString(message->asDict, CJDHTConstants_QUERY);
  402. if (query || !txid) {
  403. return 0;
  404. }
  405. struct RouterModule* module = Identity_check((struct RouterModule*) vcontext);
  406. // This is retrieved below by onResponseOrTimeout()
  407. module->currentMessage = message;
  408. Pinger_pongReceived(txid, module->pinger);
  409. module->currentMessage = NULL;
  410. return 0;
  411. }
  412. // ping or search response came in
  413. static void onResponseOrTimeout(String* data, uint32_t milliseconds, void* vping)
  414. {
  415. struct PingContext* pctx = Identity_check((struct PingContext*) vping);
  416. struct RouterModule* module = pctx->router;
  417. module->pingsInFlight--;
  418. if (data == NULL) {
  419. // This is how Pinger signals a timeout.
  420. onTimeout(milliseconds, pctx);
  421. return;
  422. }
  423. // Grab out the message which was put here in handleIncoming()
  424. struct DHTMessage* message = module->currentMessage;
  425. module->currentMessage = NULL;
  426. // This should never happen
  427. if (!Address_isSameIp(&pctx->address, message->address)) {
  428. #ifdef Log_WARN
  429. uint8_t expectedAddr[60];
  430. Address_print(expectedAddr, &pctx->address);
  431. uint8_t receivedAddr[60];
  432. Address_print(receivedAddr, message->address);
  433. Log_warn(module->logger,
  434. "Got return packet from different address than search was sent!\n"
  435. "Expected:%s\n"
  436. " Got:%s\n",
  437. expectedAddr,
  438. receivedAddr);
  439. #endif
  440. return;
  441. }
  442. // update the GMRT
  443. AverageRoller_update(pctx->router->gmrtRoller, milliseconds);
  444. Log_debug(pctx->router->logger,
  445. "Received response in %u milliseconds, gmrt now %u\n",
  446. milliseconds,
  447. AverageRoller_getAverage(pctx->router->gmrtRoller));
  448. // prevent division by zero
  449. if (milliseconds == 0) { milliseconds++; }
  450. struct Node_Two* node = NodeStore_closestNode(module->nodeStore, message->address->path);
  451. if (node && !Bits_memcmp(node->address.key, message->address->key, 32)) {
  452. int64_t* dnd = Dict_getIntC(message->asDict, "dnd");
  453. if (dnd && *dnd) {
  454. // do not disturb
  455. // We'll still keep it in the table but we won't tell anybody about it
  456. node->dnd = 1;
  457. } else {
  458. node->dnd = 0;
  459. }
  460. NodeStore_pathResponse(module->nodeStore, message->address->path, milliseconds);
  461. } else {
  462. NodeStore_discoverNode(module->nodeStore,
  463. message->address,
  464. message->encodingScheme,
  465. message->encIndex,
  466. milliseconds);
  467. }
  468. #ifdef Log_DEBUG
  469. String* versionBin = Dict_getString(message->asDict, CJDHTConstants_VERSION);
  470. if (versionBin && versionBin->len == 20) {
  471. uint8_t printedAddr[60];
  472. Address_print(printedAddr, message->address);
  473. uint8_t versionStr[41];
  474. Hex_encode(versionStr, 41, (uint8_t*) versionBin->bytes, 20);
  475. Log_debug(module->logger, "Got pong! [%s] ver[%s]\n", printedAddr, versionStr);
  476. }
  477. #endif
  478. if (pctx->pub.callback) {
  479. pctx->pub.callback(&pctx->pub, milliseconds, message->address, message->asDict);
  480. }
  481. }
  482. struct RouterModule_Promise* RouterModule_newMessage(struct Address* addr,
  483. uint32_t timeoutMilliseconds,
  484. struct RouterModule* module,
  485. struct Allocator* alloc)
  486. {
  487. Assert_ifParanoid(addr->path ==
  488. EncodingScheme_convertLabel(module->nodeStore->selfNode->encodingScheme,
  489. addr->path,
  490. EncodingScheme_convertLabel_convertTo_CANNONICAL));
  491. Assert_true(addr->protocolVersion);
  492. module->pingsInFlight++;
  493. if (timeoutMilliseconds == 0) {
  494. timeoutMilliseconds = pingTimeoutMilliseconds(module);
  495. }
  496. Log_debug(module->logger, "Sending ping with [%u] millisecond timeout, [%u] in flight now",
  497. timeoutMilliseconds, module->pingsInFlight);
  498. struct Pinger_Ping* pp = Pinger_newPing(NULL,
  499. onResponseOrTimeout,
  500. sendMsg,
  501. timeoutMilliseconds,
  502. alloc,
  503. module->pinger);
  504. struct PingContext* pctx = Allocator_clone(pp->pingAlloc, (&(struct PingContext) {
  505. .pub = {
  506. .alloc = pp->pingAlloc
  507. },
  508. .router = module,
  509. .pp = pp
  510. }));
  511. Identity_set(pctx);
  512. Bits_memcpy(&pctx->address, addr, sizeof(struct Address));
  513. pp->context = pctx;
  514. return &pctx->pub;
  515. }
  516. void RouterModule_sendMessage(struct RouterModule_Promise* promise, Dict* request)
  517. {
  518. struct PingContext* pctx = Identity_check((struct PingContext*)promise);
  519. pctx->messageDict = request;
  520. // actual send is triggered asynchronously
  521. }
  522. struct RouterModule_Promise* RouterModule_pingNode(struct Address* addr,
  523. uint32_t timeoutMilliseconds,
  524. struct RouterModule* module,
  525. struct Allocator* alloc)
  526. {
  527. struct RouterModule_Promise* promise =
  528. RouterModule_newMessage(addr, timeoutMilliseconds, module, alloc);
  529. Dict* d = Dict_new(promise->alloc);
  530. Dict_putString(d, CJDHTConstants_QUERY, CJDHTConstants_QUERY_PING, promise->alloc);
  531. RouterModule_sendMessage(promise, d);
  532. #ifdef Log_DEBUG
  533. uint8_t buff[60];
  534. Address_print(buff, addr);
  535. Log_debug(module->logger, "Sending ping [%u] to [%s]",
  536. ((struct PingContext*)promise)->pp->handle, buff);
  537. #endif
  538. Assert_true(addr->path != 0);
  539. return promise;
  540. }
  541. struct RouterModule_Promise* RouterModule_nextHop(struct Address* whoToAsk,
  542. uint8_t target[16],
  543. uint32_t timeoutMilliseconds,
  544. struct RouterModule* module,
  545. struct Allocator* alloc)
  546. {
  547. struct RouterModule_Promise* promise =
  548. RouterModule_newMessage(whoToAsk, timeoutMilliseconds, module, alloc);
  549. Dict* d = Dict_new(promise->alloc);
  550. Dict_putString(d, CJDHTConstants_QUERY, CJDHTConstants_QUERY_NH, promise->alloc);
  551. String* targetStr = String_newBinary(target, 16, promise->alloc);
  552. Dict_putString(d, CJDHTConstants_TARGET, targetStr, promise->alloc);
  553. RouterModule_sendMessage(promise, d);
  554. return promise;
  555. }
  556. struct RouterModule_Promise* RouterModule_findNode(struct Address* whoToAsk,
  557. uint8_t target[16],
  558. uint32_t timeoutMilliseconds,
  559. struct RouterModule* module,
  560. struct Allocator* alloc)
  561. {
  562. struct RouterModule_Promise* promise =
  563. RouterModule_newMessage(whoToAsk, timeoutMilliseconds, module, alloc);
  564. Dict* d = Dict_new(promise->alloc);
  565. Dict_putString(d, CJDHTConstants_QUERY, CJDHTConstants_QUERY_FN, promise->alloc);
  566. String* targetStr = String_newBinary(target, 16, promise->alloc);
  567. Dict_putString(d, CJDHTConstants_TARGET, targetStr, promise->alloc);
  568. RouterModule_sendMessage(promise, d);
  569. return promise;
  570. }
  571. struct RouterModule_Promise* RouterModule_getPeers(struct Address* addr,
  572. uint64_t nearbyLabel,
  573. uint32_t timeoutMilliseconds,
  574. struct RouterModule* module,
  575. struct Allocator* alloc)
  576. {
  577. struct RouterModule_Promise* promise =
  578. RouterModule_newMessage(addr, timeoutMilliseconds, module, alloc);
  579. Dict* d = Dict_new(promise->alloc);
  580. Dict_putString(d, CJDHTConstants_QUERY, CJDHTConstants_QUERY_GP, promise->alloc);
  581. uint64_t nearbyLabel_be = Endian_hostToBigEndian64(nearbyLabel);
  582. uint8_t nearbyLabelBytes[8];
  583. Bits_memcpy(nearbyLabelBytes, &nearbyLabel_be, 8);
  584. String* target = String_newBinary(nearbyLabelBytes, 8, promise->alloc);
  585. Dict_putString(d, CJDHTConstants_TARGET, target, promise->alloc);
  586. RouterModule_sendMessage(promise, d);
  587. return promise;
  588. }
  589. struct Node_Two* RouterModule_nodeForPath(uint64_t path, struct RouterModule* module)
  590. {
  591. struct Node_Link* link = NodeStore_linkForPath(module->nodeStore, path);
  592. if (!link) { return NULL; }
  593. return link->child;
  594. }
  595. /*void RouterModule_brokenPath(const uint64_t path, struct RouterModule* module)
  596. {
  597. NodeStore_brokenPath(path, module->nodeStore);
  598. }*/
  599. uint32_t RouterModule_globalMeanResponseTime(struct RouterModule* module)
  600. {
  601. return (uint32_t) AverageRoller_getAverage(module->gmrtRoller);
  602. }
  603. void RouterModule_peerIsReachable(uint64_t pathToPeer,
  604. uint64_t lagMilliseconds,
  605. struct RouterModule* module)
  606. {
  607. Assert_ifParanoid(EncodingScheme_isOneHop(module->nodeStore->selfNode->encodingScheme,
  608. pathToPeer));
  609. struct Node_Two* nn = RouterModule_nodeForPath(pathToPeer, module);
  610. for (struct Node_Link* peerLink = nn->reversePeers; peerLink; peerLink = peerLink->nextPeer) {
  611. if (peerLink->parent != module->nodeStore->selfNode) { continue; }
  612. if (peerLink->cannonicalLabel != pathToPeer) { continue; }
  613. struct Address address = { .path = 0 };
  614. Bits_memcpy(&address, &nn->address, sizeof(struct Address));
  615. address.path = pathToPeer;
  616. NodeStore_discoverNode(module->nodeStore,
  617. &address,
  618. nn->encodingScheme,
  619. peerLink->inverseLinkEncodingFormNumber,
  620. lagMilliseconds);
  621. return;
  622. }
  623. Assert_true(0);
  624. }