/* vim: set expandtab ts=4 sw=4: */
/*
 * You may redistribute this program and/or modify it under the terms of
 * the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "crypto/random/Random.h"
#include "dht/Address.h"
#include "dht/dhtcore/Janitor.h"
#include "dht/dhtcore/Node.h"
#include "dht/dhtcore/NodeList.h"
#include "dht/dhtcore/RumorMill.h"
#include "dht/dhtcore/RouterModule.h"
#include "dht/dhtcore/SearchRunner.h"
#include "dht/dhtcore/ReplySerializer.h"
#include "benc/Object.h"
#include "memory/Allocator.h"
#include "util/AddrTools.h"
#include "util/AverageRoller.h"
#include "util/Bits.h"
#include "util/events/EventBase.h"
#include "util/Hex.h"
#include "util/Identity.h"
#include "util/events/Timeout.h"
#include "util/events/Time.h"
#include "util/platform/libc/string.h"

#include <stdint.h>
#include <stdbool.h>

/** The maximum number of concurrent searches which the Janitor will initiate. */
#define MAX_SEARCHES 10
/**
 * The goal of this is to run searches in the local area of this node.
 * It searches for hashes every localMaintainenceSearchPeriod milliseconds.
 * It runs searches by picking hashes at random; if a hash is chosen and there is a
 * non-zero-reach node which services that space, it stops. This way it will run many
 * searches early on, but as the number of known nodes increases, it begins to taper off.
 */
struct Janitor
{
    struct RouterModule* routerModule;
    struct NodeStore* nodeStore;
    struct SearchRunner* searchRunner;

    // Externally accessible RumorMill.
    // Used for direct peers and search results that are closer than the responder.
    struct RumorMill* rumorMill;

    // High priority RumorMill.
    // Used when the response could help split non-one-hop links.
    struct RumorMill* splitMill;

    // Low priority RumorMill.
    // Used to explore physically nearby nodes. By far the most used mill.
    struct RumorMill* idleMill;

    struct Timeout* timeout;
    struct Log* logger;

    uint64_t globalMaintainenceMilliseconds;
    uint64_t timeOfNextGlobalMaintainence;
    uint64_t localMaintainenceMilliseconds;

    struct Allocator* allocator;

    uint64_t timeOfNextSearchRepeat;
    uint64_t searchRepeatMilliseconds;

    struct EventBase* eventBase;
    struct Random* rand;

    /** Number of concurrent searches taking place. */
    int searches;

    // Used to keep dht healthy
    uint8_t keyspaceMaintainenceCounter;
    uint8_t keyspaceHoleDepthCounter;

    Identity
};
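/** Per-search state carried for the lifetime of one Janitor-initiated search. */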
struct Janitor_Search
{
    struct Janitor* janitor;

    struct Address best;

    uint8_t target[16];

    struct Allocator* alloc;

    Identity
};
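/**
 * Called for each reply to a Janitor-initiated search; the final call (with from == NULL)
 * marks the end of the search, decrements the search count and frees the search allocator.
 */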
static void responseCallback(struct RouterModule_Promise* promise,
                             uint32_t lagMilliseconds,
                             struct Address* from,
                             Dict* result)
{
    struct Janitor_Search* search = Identity_check((struct Janitor_Search*)promise->userData);
    if (from) {
        Bits_memcpyConst(&search->best, from, sizeof(struct Address));
        return;
    }

    search->janitor->searches--;

    if (!search->best.path) {
        Log_debug(search->janitor->logger, "Search completed with no nodes found");
    }
    Allocator_free(search->alloc);
}
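/**
 * Begin a search for the given 16-byte target address, subject to a limit of
 * MAX_SEARCHES concurrent Janitor-initiated searches.
 */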
static void search(uint8_t target[16], struct Janitor* janitor)
{
    if (janitor->searches >= MAX_SEARCHES) {
        Log_debug(janitor->logger,
                  "Skipping search because [%d] are already in progress", MAX_SEARCHES);
        return;
    }
    #ifdef Log_DEBUG
        uint8_t targetStr[40];
        AddrTools_printIp(targetStr, target);
        Log_debug(janitor->logger, "Beginning search for [%s]", targetStr);
    #endif

    struct Allocator* searchAlloc = Allocator_child(janitor->allocator);
    struct RouterModule_Promise* rp =
        SearchRunner_search(target, janitor->searchRunner, searchAlloc);

    if (!rp) {
        Log_debug(janitor->logger, "SearchRunner_search() returned NULL, probably full.");
        Allocator_free(searchAlloc);
        return;
    }

    janitor->searches++;

    struct Janitor_Search* search = Allocator_clone(rp->alloc, (&(struct Janitor_Search) {
        .janitor = janitor,
        .alloc = searchAlloc,
    }));
    Identity_set(search);
    Bits_memcpyConst(search->target, target, 16);

    rp->callback = responseCallback;
    rp->userData = search;
}
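/** Start a search for the target address unless a search for it is already running. */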
static void searchNoDupe(uint8_t target[Address_SEARCH_TARGET_SIZE], struct Janitor* janitor)
{
    // See if we're already searching for this address.
    struct Allocator* searchListAlloc = Allocator_child(janitor->allocator);
    struct SearchRunner_SearchData* searchData;
    for (int i = 0; i < SearchRunner_DEFAULT_MAX_CONCURRENT_SEARCHES; i++) {
        searchData = SearchRunner_showActiveSearch(janitor->searchRunner,
                                                   i,
                                                   searchListAlloc);
        if (!searchData) { continue; }
        if (!Bits_memcmp(searchData->target, target, Address_SEARCH_TARGET_SIZE)) {
            // Already have a search going for this address, so nothing to do.
            Allocator_free(searchListAlloc);
            return;
        }
    }
    Allocator_free(searchListAlloc);

    // There's no search running for this address, so we start one.
    search(target, janitor);
    #ifdef Log_DEBUG
        uint8_t addrStr[40];
        AddrTools_printIp(addrStr, target);
        Log_debug(janitor->logger, "No active search for [%s], starting one.", addrStr);
    #endif
}
/**
 * For a Distributed Hash Table to work, each node must know a valid next hop for a search,
 * if such a thing exists.
 *
 * In a Kademlia DHT, this can be done by organizing nodes into k-buckets. These are collections
 * of k nodes for which the first n bits of your node IDs match. Among other things, k-buckets
 * allow a node to identify holes in its lookup table and fill them. If the nth bucket is empty,
 * it means your node knows of no valid next hop for any key matching the first n bits of your
 * address and differing in bit n+1.
 *
 * Without going to the trouble of organizing nodes into buckets, this function iterates
 * bitwise over keyspace to identify the same kind of routing holes.
 * It then dispatches a search for the first (largest) such hole in keyspace that it finds.
 */
static void plugLargestKeyspaceHole(struct Janitor* janitor, bool force)
{
    struct Address addr = *janitor->nodeStore->selfAddress;

    int byte = 0;
    int bit = 0;
    uint8_t holeDepth = 0;

    for (uint8_t i = 0; i < 128 ; i++) {
        // Bitwise walk across keyspace
        if (63 < i && i < 72) {
            // We want to leave the 0xfc alone
            continue;
        }

        // Figure out which bit of the address to flip for this step in keyspace.
        // This looks ugly because of the rot64 done in distance calculations.
        if (i < 64) { byte = 8 + (i/8); }
        else        { byte = (i/8) - 8; }
        bit = (i % 8);

        // Flip that bit.
        addr.ip6.bytes[byte] = addr.ip6.bytes[byte] ^ (0x80 >> bit);

        // See if we know a valid next hop.
        struct Node_Two* n = RouterModule_lookup(addr.ip6.bytes, janitor->routerModule);

        if (n) {
            // We do know a valid next hop, so flip the bit back and continue.
            addr.ip6.bytes[byte] = addr.ip6.bytes[byte] ^ (0x80 >> bit);
            continue;
        }

        // We found a hole! Exit loop and let the search trigger.
        holeDepth = i;
        break;
    }

    // Search for a node that satisfies the address requirements to fill the hole.
    if (holeDepth != janitor->keyspaceHoleDepthCounter || force) {
        Log_debug(janitor->logger, "Setting keyspaceHoleDepthCounter [%u]", holeDepth);
        janitor->keyspaceHoleDepthCounter = holeDepth;
        searchNoDupe(addr.ip6.bytes, janitor);
    }
}
// Counterpart to plugLargestKeyspaceHole, used to refresh reach of known routes with a search.
// This also finds redundant routes for that area of keyspace, which helps the DHT some.
static void keyspaceMaintainence(struct Janitor* janitor)
{
    struct Address addr = *janitor->nodeStore->selfAddress;
    int byte = 0;
    int bit = 0;

    // Restart cycle if we've already finished it.
    if (janitor->keyspaceMaintainenceCounter > 127) {
        janitor->keyspaceMaintainenceCounter = 0;
    }

    for (; janitor->keyspaceMaintainenceCounter < 128;
           janitor->keyspaceMaintainenceCounter++) {
        // Just to make referring to this thing quicker
        int i = janitor->keyspaceMaintainenceCounter;

        if (63 < i && i < 72) {
            // We want to leave the 0xfc alone
            continue;
        }

        // Figure out which bit of the address to flip for this step in keyspace.
        // This looks ugly because of the rot64 done in distance calculations.
        if (i < 64) { byte = 8 + (i/8); }
        else        { byte = (i/8) - 8; }
        bit = (i % 8);

        // Flip that bit.
        addr.ip6.bytes[byte] = addr.ip6.bytes[byte] ^ (0x80 >> bit);

        // See if we know a valid next hop.
        struct Node_Two* n = RouterModule_lookup(addr.ip6.bytes, janitor->routerModule);

        if (n) {
            // Start the next search 1 step further into keyspace.
            janitor->keyspaceMaintainenceCounter = i+1;
            break;
        }

        // Clean up address and move further into keyspace.
        addr.ip6.bytes[byte] = addr.ip6.bytes[byte] ^ (0x80 >> bit);
        continue;
    }

    // Search for a node that satisfies the address requirements to fill the hole.
    // Should end up self-searching in the event that we're all the way through keyspace.
    searchNoDupe(addr.ip6.bytes, janitor);
}
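/**
 * Handle a getPeers reply: feed newly discovered peers into the split or idle RumorMill,
 * and if the responder's reported peers contradict the links we hold for it (a renumbered
 * switch), unlink those links so they can be re-learned.
 */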
static void peersResponseCallback(struct RouterModule_Promise* promise,
                                  uint32_t lagMilliseconds,
                                  struct Address* from,
                                  Dict* result)
{
    struct Janitor* janitor = Identity_check((struct Janitor*)promise->userData);
    if (!from) { return; }
    struct Address_List* addresses =
        ReplySerializer_parse(from, result, janitor->logger, promise->alloc);

    struct Node_Two* parent = NodeStore_nodeForAddr(janitor->nodeStore, from->ip6.bytes);
    if (!parent) { return; }

    // Figure out if this node has any split-able links.
    bool hasSplitableLinks = false;
    struct Node_Link* link = NodeStore_nextLink(parent, NULL);
    while (link) {
        if (!Node_isOneHopLink(link)) {
            hasSplitableLinks = true;
            break;
        }
        link = NodeStore_nextLink(parent, link);
    }

    int loopCount = 0;
    for (int i = 0; addresses && i < addresses->length; i++) {
        struct Node_Link* nl = NodeStore_linkForPath(janitor->nodeStore, addresses->elems[i].path);
        if (!nl) {
            addresses->elems[i].path = NodeStore_optimizePath(janitor->nodeStore,
                                                              addresses->elems[i].path);
            if (hasSplitableLinks) {
                RumorMill_addNode(janitor->splitMill, &addresses->elems[i]);
            } else {
                RumorMill_addNode(janitor->idleMill, &addresses->elems[i]);
            }
        } else if (!Address_isSameIp(&addresses->elems[i], &nl->child->address)) {
            // they're telling us about themselves, how helpful...
            if (nl->child == parent) { continue; }
            if (nl->parent != parent) {
                #ifdef Log_INFO
                    uint8_t newAddr[60];
                    Address_print(newAddr, from);
                    uint8_t labelStr[20];
                    AddrTools_printPath(labelStr, nl->cannonicalLabel);
                    Log_info(janitor->logger, "Apparently [%s] reported [%s] as its peer",
                             newAddr, labelStr);
                #endif
                continue;
            }
            #ifdef Log_INFO
                uint8_t newAddr[60];
                Address_print(newAddr, from);
                Log_info(janitor->logger, "Apparently [%s] has renumbered its switch", newAddr);
            #endif
            link = NodeStore_nextLink(parent, NULL);
            while (link) {
                struct Node_Link* nextLink = NodeStore_nextLink(parent, link);
                NodeStore_unlinkNodes(janitor->nodeStore, link);
                link = nextLink;
                // restart from the beginning...
                i = 0;
                Assert_true(!loopCount);
            }
            Assert_true(!NodeStore_nextLink(parent, NULL));
            loopCount++;
        }
    }
}
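/**
 * Walk the first hops along the path to the given node and, on finding a hop whose child
 * has a non-one-hop link (or whose parent has zero reach), send a single getPeers request
 * so that link can be split; at most one request is sent per call.
 */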
static void checkPeers(struct Janitor* janitor, struct Node_Two* n)
{
    // Let's check for non-one-hop links at each node along the path between us and this node.
    uint64_t path = n->address.path;

    struct Node_Link* link = NULL;

    for (;;) {
        link = NodeStore_firstHopInPath(janitor->nodeStore, path, &path, link);
        if (!link) { return; }
        if (link->parent == janitor->nodeStore->selfNode) { continue; }

        struct Node_Link* l = NULL;
        do {
            l = NodeStore_nextLink(link->child, l);
            if (l && (!Node_isOneHopLink(l) || Node_getReach(link->parent) == 0)) {
                struct RouterModule_Promise* rp =
                    RouterModule_getPeers(&link->parent->address, l->cannonicalLabel, 0,
                                          janitor->routerModule, janitor->allocator);
                rp->callback = peersResponseCallback;
                rp->userData = janitor;
                // Only send max 1 getPeers req per second.
                return;
            }
        } while (l);
    }
}
// Iterate over all nodes in the table. Try to split any split-able links.
static void splitLinks(struct Janitor* janitor)
{
    uint32_t index = 0;
    struct Node_Two* node = NodeStore_dumpTable(janitor->nodeStore, index);
    while (node) {
        struct Node_Link* bestParent = Node_getBestParent(node);
        if (bestParent) {
            struct Node_Link* link = NodeStore_nextLink(node, NULL);
            while (link) {
                if (!Node_isOneHopLink(link)) {
                    RumorMill_addNode(janitor->splitMill, &node->address);
                    break;
                }
                link = NodeStore_nextLink(node, link);
            }
        }
        index++;
        node = NodeStore_dumpTable(janitor->nodeStore, index);
    }
}
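/**
 * The periodic maintenance cycle: reschedule itself with a randomized timeout, ping one node
 * drawn from the highest-priority non-empty RumorMill, probe a random address and plug
 * keyspace holes, and periodically run global keyspace maintenance and link splitting.
 */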
static void maintanenceCycle(void* vcontext)
{
    struct Janitor* const janitor = Identity_check((struct Janitor*) vcontext);

    uint64_t now = Time_currentTimeMilliseconds(janitor->eventBase);

    uint64_t nextTimeout = (janitor->localMaintainenceMilliseconds / 2);
    nextTimeout += Random_uint32(janitor->rand) % (nextTimeout * 2);
    Timeout_resetTimeout(janitor->timeout, nextTimeout);

    if (janitor->nodeStore->nodeCount == 0 && janitor->rumorMill->count == 0) {
        if (now > janitor->timeOfNextGlobalMaintainence) {
            Log_warn(janitor->logger,
                     "No nodes in routing table, check network connection and configuration.");
            janitor->timeOfNextGlobalMaintainence += janitor->globalMaintainenceMilliseconds;
        }
        return;
    }

    struct Address addr = { .protocolVersion = 0 };

    if (RumorMill_getNode(janitor->splitMill, &addr)) {
        // ping a link-splitting node from the high-priority ping queue
        addr.path = NodeStore_optimizePath(janitor->nodeStore, addr.path);
        if (NodeStore_optimizePath_INVALID != addr.path) {
            struct RouterModule_Promise* rp =
                RouterModule_getPeers(&addr,
                                      Random_uint32(janitor->rand),
                                      0,
                                      janitor->routerModule,
                                      janitor->allocator);
            rp->callback = peersResponseCallback;
            rp->userData = janitor;

            #ifdef Log_DEBUG
                uint8_t addrStr[60];
                Address_print(addrStr, &addr);
                Log_debug(janitor->logger, "Pinging possible node [%s] from "
                                           "priority RumorMill", addrStr);
            #endif
        }
    } else if (RumorMill_getNode(janitor->rumorMill, &addr)) {
        // ping a node from the normal-priority ping queue
        addr.path = NodeStore_optimizePath(janitor->nodeStore, addr.path);
        if (NodeStore_optimizePath_INVALID != addr.path) {
            struct RouterModule_Promise* rp =
                RouterModule_getPeers(&addr,
                                      Random_uint32(janitor->rand),
                                      0,
                                      janitor->routerModule,
                                      janitor->allocator);
            rp->callback = peersResponseCallback;
            rp->userData = janitor;

            #ifdef Log_DEBUG
                uint8_t addrStr[60];
                Address_print(addrStr, &addr);
                Log_debug(janitor->logger, "Pinging possible node [%s] from "
                                           "normal RumorMill", addrStr);
            #endif
        }
    } else if (RumorMill_getNode(janitor->idleMill, &addr)) {
        // ping a node from the low-priority ping queue
        addr.path = NodeStore_optimizePath(janitor->nodeStore, addr.path);
        if (NodeStore_optimizePath_INVALID != addr.path) {
            struct RouterModule_Promise* rp =
                RouterModule_getPeers(&addr,
                                      Random_uint32(janitor->rand),
                                      0,
                                      janitor->routerModule,
                                      janitor->allocator);
            rp->callback = peersResponseCallback;
            rp->userData = janitor;

            #ifdef Log_DEBUG
                uint8_t addrStr[60];
                Address_print(addrStr, &addr);
                Log_debug(janitor->logger, "Pinging possible node [%s] from "
                                           "idle RumorMill", addrStr);
            #endif
        }
    }

    // random search
    Random_bytes(janitor->rand, addr.ip6.bytes, 16);
    // Make this a valid address.
    addr.ip6.bytes[0] = 0xfc;

    struct Node_Two* n = RouterModule_lookup(addr.ip6.bytes, janitor->routerModule);

    // If the best next node doesn't exist or has 0 reach, run a local maintenance search.
    if (n == NULL || Node_getReach(n) == 0) {
        //search(addr.ip6.bytes, janitor);
        plugLargestKeyspaceHole(janitor, true);
        return;

    } else {
        checkPeers(janitor, n);
    }

    plugLargestKeyspaceHole(janitor, false);

    Log_debug(janitor->logger,
              "Global Mean Response Time: %u nodes [%d] links [%d]",
              RouterModule_globalMeanResponseTime(janitor->routerModule),
              janitor->nodeStore->nodeCount,
              janitor->nodeStore->linkCount);

    if (now > janitor->timeOfNextGlobalMaintainence) {
        //search(addr.ip6.bytes, janitor);
        plugLargestKeyspaceHole(janitor, true);
        keyspaceMaintainence(janitor);
        splitLinks(janitor);
        janitor->timeOfNextGlobalMaintainence += janitor->globalMaintainenceMilliseconds;
    }
}
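/**
 * Allocate a Janitor in a child of the given allocator, create its split and idle
 * RumorMills, and schedule the first maintenance cycle.
 */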
struct Janitor* Janitor_new(uint64_t localMaintainenceMilliseconds,
                            uint64_t globalMaintainenceMilliseconds,
                            struct RouterModule* routerModule,
                            struct NodeStore* nodeStore,
                            struct SearchRunner* searchRunner,
                            struct RumorMill* rumorMill,
                            struct Log* logger,
                            struct Allocator* allocator,
                            struct EventBase* eventBase,
                            struct Random* rand)
{
    struct Allocator* alloc = Allocator_child(allocator);
    struct Janitor* janitor = Allocator_clone(alloc, (&(struct Janitor) {
        .eventBase = eventBase,
        .routerModule = routerModule,
        .nodeStore = nodeStore,
        .searchRunner = searchRunner,
        .rumorMill = rumorMill,
        .logger = logger,
        .globalMaintainenceMilliseconds = globalMaintainenceMilliseconds,
        .localMaintainenceMilliseconds = localMaintainenceMilliseconds,
        .keyspaceMaintainenceCounter = 0,
        .keyspaceHoleDepthCounter = 0,
        .allocator = alloc,
        .rand = rand
    }));
    Identity_set(janitor);

    janitor->splitMill = RumorMill_new(janitor->allocator, janitor->nodeStore->selfAddress, 16);
    janitor->idleMill = RumorMill_new(janitor->allocator, janitor->nodeStore->selfAddress, 64);

    janitor->timeOfNextGlobalMaintainence = Time_currentTimeMilliseconds(eventBase);

    janitor->timeout = Timeout_setTimeout(maintanenceCycle,
                                          janitor,
                                          localMaintainenceMilliseconds,
                                          eventBase,
                                          alloc);

    return janitor;
}