/* vim: set expandtab ts=4 sw=4: */
/*
 * You may redistribute this program and/or modify it under the terms of
 * the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "crypto/random/Random.h"
#include "dht/Address.h"
#include "dht/dhtcore/Janitor.h"
#include "dht/dhtcore/Node.h"
#include "dht/dhtcore/NodeList.h"
#include "dht/dhtcore/RumorMill.h"
#include "dht/dhtcore/RouterModule.h"
#include "dht/dhtcore/SearchRunner.h"
#include "dht/dhtcore/ReplySerializer.h"
#include "benc/Object.h"
#include "memory/Allocator.h"
#include "util/AddrTools.h"
#include "util/AverageRoller.h"
#include "util/Bits.h"
#include "util/events/EventBase.h"
#include "util/Hex.h"
#include "util/events/Timeout.h"
#include "util/events/Time.h"
#include "util/platform/libc/string.h"

#include <stdint.h>
#include <stdbool.h>
/**
 * The goal of this is to run searches in the local area of this node.
 *
 * It searches for hashes every localMaintainenceSearchPeriod milliseconds.
 * It runs searches by picking hashes at random; if a hash is chosen and there is a
 * non-zero-reach node which services that space, it stops. This way it will run many
 * searches early on, but as the number of known nodes increases, it begins to taper off.
 */
struct Janitor
{
    struct RouterModule* routerModule;
    struct NodeStore* nodeStore;
    struct SearchRunner* searchRunner;
    struct RumorMill* rumorMill;
    struct RumorMill* nodesOfInterest;
    struct Timeout* timeout;
    struct Log* logger;

    uint64_t globalMaintainenceMilliseconds;
    uint64_t timeOfNextGlobalMaintainence;
    uint64_t localMaintainenceMilliseconds;

    struct Allocator* allocator;

    uint64_t timeOfNextSearchRepeat;
    uint64_t searchRepeatMilliseconds;

    struct EventBase* eventBase;
    struct Random* rand;

    /** Number of concurrent searches taking place. */
    int searches;

    Identity
};
struct Janitor_Search
{
    struct Janitor* janitor;
    struct Address best;
    uint8_t target[16];
    struct Allocator* alloc;
    Identity
};
static void responseCallback(struct RouterModule_Promise* promise,
                             uint32_t lagMilliseconds,
                             struct Address* from,
                             Dict* result)
{
    struct Janitor_Search* search = Identity_check((struct Janitor_Search*)promise->userData);
    if (from) {
        // A node responded; remember it and wait for the search to finish.
        Bits_memcpyConst(&search->best, from, sizeof(struct Address));
        return;
    }

    // `from` is NULL once the search has completed.
    search->janitor->searches--;
    if (!search->best.path) {
        Log_debug(search->janitor->logger, "Search completed with no nodes found");
    }
    Allocator_free(search->alloc);
}
static void search(uint8_t target[16], struct Janitor* janitor)
{
    if (janitor->searches >= 20) {
        Log_debug(janitor->logger, "Skipping search because 20 are in progress");
        return;
    }

    #ifdef Log_DEBUG
        uint8_t targetStr[40];
        AddrTools_printIp(targetStr, target);
        Log_debug(janitor->logger, "Beginning search for [%s]", targetStr);
    #endif

    struct Allocator* searchAlloc = Allocator_child(janitor->allocator);
    struct RouterModule_Promise* rp =
        SearchRunner_search(target, janitor->searchRunner, searchAlloc);

    if (!rp) {
        Log_debug(janitor->logger, "SearchRunner_search() returned NULL, probably full.");
        return;
    }

    janitor->searches++;

    struct Janitor_Search* search = Allocator_clone(rp->alloc, (&(struct Janitor_Search) {
        .janitor = janitor,
        .alloc = searchAlloc,
    }));
    Identity_set(search);
    Bits_memcpyConst(search->target, target, 16);

    rp->callback = responseCallback;
    rp->userData = search;
}
static void searchNoDupe(uint8_t target[Address_SEARCH_TARGET_SIZE], struct Janitor* janitor)
{
    // See if we're already searching for this address.
    struct Allocator* seachListAlloc = Allocator_child(janitor->allocator);
    struct SearchRunner_SearchData* searchData;
    for (int i = 0; i < SearchRunner_DEFAULT_MAX_CONCURRENT_SEARCHES; i++) {
        searchData = SearchRunner_showActiveSearch(janitor->searchRunner,
                                                   i,
                                                   seachListAlloc);
        if (!searchData) { continue; }
        if (!Bits_memcmp(searchData->target, target, Address_SEARCH_TARGET_SIZE)) {
            // Already have a search going for this address, so nothing to do.
            Allocator_free(seachListAlloc);
            return;
        }
    }
    Allocator_free(seachListAlloc);

    // There's no search running for this address, so we start one.
    search(target, janitor);

    #ifdef Log_DEBUG
        uint8_t addrStr[40];
        AddrTools_printIp(addrStr, target);
        Log_debug(janitor->logger, "No active search for [%s], starting one.", addrStr);
    #endif
}
/**
 * For a Distributed Hash Table to work, each node must know a valid next hop in the search,
 * if such a thing exists.
 *
 * In a Kademlia DHT, this can be done by organizing nodes into k-buckets. These are collections
 * of k nodes for which the first n bits of the node IDs match your own. Among other things,
 * k-buckets allow a node to identify holes in its lookup table and fill them. If the nth bucket
 * is empty, it means your node knows of no valid next hop for any key matching the first n bits
 * of your address and differing in bit n+1.
 *
 * Without going to the trouble of organizing nodes into buckets, this function iterates
 * bitwise over keyspace to identify the same kind of routing holes.
 * It then dispatches a search for the first (largest) such hole in keyspace that it finds.
 */
static void plugLargestKeyspaceHole(struct Janitor* janitor)
{
    struct Address addr = *janitor->nodeStore->selfAddress;

    // Get furthest possible address
    for (int i = 1; i < Address_SEARCH_TARGET_SIZE; i++) {
        // Leaving [0] alone keeps the 0xfc around, so it's a valid cjdns address.
        addr.ip6.bytes[i] = ~(addr.ip6.bytes[i]);
    }

    int byte = 0;
    int bit = 0;
    for (int i = 0; i < 128; i++) {
        if (63 < i && i < 72) {
            // We want to leave the 0xfc alone
            continue;
        }
        if (i < 64) { byte = 8 + (i / 8); }
        else        { byte = (i / 8) - 8;  }
        bit = (i % 8);
        addr.ip6.bytes[byte] = addr.ip6.bytes[byte] ^ (0x01 << bit);

        struct Node_Two* n = RouterModule_lookup(addr.ip6.bytes, janitor->routerModule);
        if (!n) {
            // We found a hole! Exit loop and let the search trigger.
            break;
        }
    }
    searchNoDupe(addr.ip6.bytes, janitor);
}
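
/*
 * Illustrative mapping of the loop index above onto (byte, bit) positions, derived from the
 * arithmetic in plugLargestKeyspaceHole(); this is a worked example, not part of the algorithm:
 *
 *   i =   0   ->  byte  8, bit 0   (first bit flipped back toward selfAddress)
 *   i =  63   ->  byte 15, bit 7
 *   i = 64..71 are skipped         (byte 0 keeps the 0xfc prefix intact)
 *   i =  72   ->  byte  1, bit 0
 *   i = 127   ->  byte  7, bit 7
 *
 * The probe address starts as far from selfAddress as possible and converges toward it one bit
 * at a time; the first failed lookup is treated as the largest unserviced hole in keyspace.
 */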
static void peersResponseCallback(struct RouterModule_Promise* promise,
                                  uint32_t lagMilliseconds,
                                  struct Address* from,
                                  Dict* result)
{
    struct Janitor* janitor = Identity_check((struct Janitor*)promise->userData);
    if (!from) { return; }

    struct Node_Link* fromLink = NodeStore_linkForPath(janitor->nodeStore, from->path);
    Assert_true(fromLink);

    struct Address_List* addresses =
        ReplySerializer_parse(from,
                              fromLink->child->encodingScheme,
                              fromLink->inverseLinkEncodingFormNumber,
                              result,
                              janitor->logger,
                              promise->alloc);

    for (int i = 0; addresses && i < addresses->length; i++) {
        if (!NodeStore_linkForPath(janitor->nodeStore, addresses->elems[i].path)) {
            RumorMill_addNode(janitor->rumorMill, &addresses->elems[i]);
        }
    }
}
static void checkPeers(struct Janitor* janitor, struct Node_Two* n)
{
    // Let's check for non-one-hop links at each node along the path between us and this node.
    uint32_t i = 0;
    for (;; i++) {
        struct Node_Link* link =
            NodeStore_getLinkOnPath(janitor->nodeStore, n->address.path, i);
        if (!link) { return; }
        if (link->parent == janitor->nodeStore->selfNode) { continue; }

        int count = NodeStore_linkCount(link->child);
        for (int j = 0; j < count; j++) {
            struct Node_Link* l = NodeStore_getLink(link->child, j);
            if (!Node_isOneHopLink(l) || link->parent->pathQuality == 0) {
                struct RouterModule_Promise* rp =
                    RouterModule_getPeers(&link->parent->address, l->cannonicalLabel, 0,
                                          janitor->routerModule, janitor->allocator);
                rp->callback = peersResponseCallback;
                rp->userData = janitor;
                // Only send max 1 getPeers req per second.
                return;
            }
        }
    }
}
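
/**
 * One pass of periodic maintenance. Each cycle:
 *   - reschedules itself with a randomized timeout around localMaintainenceMilliseconds,
 *   - warns and bails out if the routing table and rumor mill are both empty,
 *   - pings one candidate node from the rumor mill via getPeers,
 *   - plugs the largest keyspace hole (see above),
 *   - searches for a queued node of interest, if any,
 *   - runs a random local search if the lookup for a random address finds nothing with reach,
 *     otherwise checks that node's peers,
 *   - and periodically kicks off a global maintenance search.
 */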
static void maintanenceCycle(void* vcontext)
{
    struct Janitor* const janitor = Identity_check((struct Janitor*) vcontext);

    uint64_t now = Time_currentTimeMilliseconds(janitor->eventBase);

    uint64_t nextTimeout = (janitor->localMaintainenceMilliseconds / 2);
    nextTimeout += Random_uint32(janitor->rand) % (nextTimeout * 2);
    janitor->timeout = Timeout_setTimeout(maintanenceCycle,
                                          janitor,
                                          nextTimeout,
                                          janitor->eventBase,
                                          janitor->allocator);

    if (NodeStore_size(janitor->nodeStore) == 0 && janitor->rumorMill->count == 0) {
        if (now > janitor->timeOfNextGlobalMaintainence) {
            Log_warn(janitor->logger,
                     "No nodes in routing table, check network connection and configuration.");
            janitor->timeOfNextGlobalMaintainence += janitor->globalMaintainenceMilliseconds;
        }
        return;
    }

    struct Address addr = { .protocolVersion = 0 };

    // ping a node from the ping queue
    if (RumorMill_getNode(janitor->rumorMill, &addr)) {
        addr.path = NodeStore_optimizePath(janitor->nodeStore, addr.path);
        if (NodeStore_optimizePath_INVALID != addr.path) {
            struct RouterModule_Promise* rp =
                RouterModule_getPeers(&addr,
                                      Random_uint32(janitor->rand),
                                      0,
                                      janitor->routerModule,
                                      janitor->allocator);
            rp->callback = peersResponseCallback;
            rp->userData = janitor;

            #ifdef Log_DEBUG
                uint8_t addrStr[60];
                Address_print(addrStr, &addr);
                Log_debug(janitor->logger, "Pinging possible node [%s] from RumorMill", addrStr);
            #endif
        }
    }

    // This is good for DHT health. See function description.
    plugLargestKeyspaceHole(janitor);

    // Do something useful for a node we're actively trying to communicate with.
    if (RumorMill_getNode(janitor->nodesOfInterest, &addr)) {
        searchNoDupe(addr.ip6.bytes, janitor);
    }

    // random search
    Random_bytes(janitor->rand, addr.ip6.bytes, 16);
    // Make this a valid address.
    addr.ip6.bytes[0] = 0xfc;

    struct Node_Two* n = RouterModule_lookup(addr.ip6.bytes, janitor->routerModule);

    // If the best next node doesn't exist or has 0 reach, run a local maintenance search.
    if (n == NULL || n->pathQuality == 0) {
        search(addr.ip6.bytes, janitor);
        return;
    } else {
        checkPeers(janitor, n);
    }

    #ifdef Log_DEBUG
        int nonZeroNodes = NodeStore_nonZeroNodes(janitor->nodeStore);
        int total = NodeStore_size(janitor->nodeStore);
        Log_debug(janitor->logger,
                  "Global Mean Response Time: %u non-zero nodes: [%d] zero nodes [%d] "
                  "total [%d]",
                  RouterModule_globalMeanResponseTime(janitor->routerModule),
                  nonZeroNodes,
                  (total - nonZeroNodes),
                  total);
    #endif

    if (now > janitor->timeOfNextGlobalMaintainence) {
        search(addr.ip6.bytes, janitor);
        janitor->timeOfNextGlobalMaintainence += janitor->globalMaintainenceMilliseconds;
    }
}
struct Janitor* Janitor_new(uint64_t localMaintainenceMilliseconds,
                            uint64_t globalMaintainenceMilliseconds,
                            struct RouterModule* routerModule,
                            struct NodeStore* nodeStore,
                            struct SearchRunner* searchRunner,
                            struct RumorMill* rumorMill,
                            struct RumorMill* nodesOfInterest,
                            struct Log* logger,
                            struct Allocator* allocator,
                            struct EventBase* eventBase,
                            struct Random* rand)
{
    struct Allocator* alloc = Allocator_child(allocator);
    struct Janitor* janitor = Allocator_clone(alloc, (&(struct Janitor) {
        .eventBase = eventBase,
        .routerModule = routerModule,
        .nodeStore = nodeStore,
        .searchRunner = searchRunner,
        .rumorMill = rumorMill,
        .nodesOfInterest = nodesOfInterest,
        .logger = logger,
        .globalMaintainenceMilliseconds = globalMaintainenceMilliseconds,
        .localMaintainenceMilliseconds = localMaintainenceMilliseconds,
        .allocator = alloc,
        .rand = rand
    }));
    Identity_set(janitor);

    janitor->timeOfNextGlobalMaintainence = Time_currentTimeMilliseconds(eventBase);

    janitor->timeout = Timeout_setTimeout(maintanenceCycle,
                                          janitor,
                                          localMaintainenceMilliseconds,
                                          eventBase,
                                          alloc);

    return janitor;
}
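
/*
 * A minimal usage sketch (not from this file): wiring the janitor into a node's core, assuming
 * the caller already holds the router module, node store, search runner, rumor mills, logger,
 * allocator, event base and RNG. The interval values and variable names are hypothetical.
 *
 *     struct Janitor* janitor = Janitor_new(1000,            // local maintenance roughly each second
 *                                           1000 * 60 * 15,  // global maintenance every 15 minutes
 *                                           routerModule,
 *                                           nodeStore,
 *                                           searchRunner,
 *                                           rumorMill,
 *                                           nodesOfInterest,
 *                                           logger,
 *                                           allocator,
 *                                           eventBase,
 *                                           rand);
 *
 * The janitor schedules its own maintanenceCycle() timeouts on the event base and is torn down
 * when `allocator` is freed.
 */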