1
0

SearchRunner.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "dht/Address.h"
  16. #include "dht/dhtcore/SearchRunner.h"
  17. #include "dht/dhtcore/SearchStore.h"
  18. #include "dht/dhtcore/RumorMill.h"
  19. #include "dht/dhtcore/RouterModule.h"
  20. #include "dht/dhtcore/ReplySerializer.h"
  21. #include "dht/dhtcore/NodeStore.h"
  22. #include "dht/dhtcore/NodeList.h"
  23. #include "dht/CJDHTConstants.h"
  24. #include "util/Identity.h"
  25. #include "util/Bits.h"
  26. #include "util/log/Log.h"
  27. #include "util/events/EventBase.h"
  28. #include "util/events/Timeout.h"
  29. #include "util/version/Version.h"
  30. struct SearchRunner_pvt
  31. {
  32. struct SearchRunner pub;
  33. struct SearchStore* searchStore;
  34. struct NodeStore* nodeStore;
  35. struct Log* logger;
  36. struct EventBase* eventBase;
  37. struct RouterModule* router;
  38. struct RumorMill* rumorMill;
  39. uint8_t myAddress[16];
  40. /** Number of concurrent searches in operation. */
  41. int searches;
  42. /** Maximum number of concurrent searches allowed. */
  43. int maxConcurrentSearches;
  44. /** Beginning of a linked list of searches. */
  45. struct SearchRunner_Search* firstSearch;
  46. Identity
  47. };
  48. /**
  49. * A context for the internals of a search.
  50. */
  51. struct SearchRunner_Search;
  52. struct SearchRunner_Search
  53. {
  54. struct RouterModule_Promise pub;
  55. /** The router module carrying out the search. */
  56. struct SearchRunner_pvt* const runner;
  57. /** The number of requests which have been sent out so far for this search. */
  58. uint32_t totalRequests;
  59. /** Maximum number of requests to make before terminating the search. */
  60. uint32_t maxRequests;
  61. uint32_t maxRequestsIfFound;
  62. uint32_t numFinds;
  63. /** The address which we are searching for. */
  64. struct Address target;
  65. /** String form of the 16 byte ipv6 address. */
  66. String* targetStr;
  67. /**
  68. * The SearchStore_Search structure for this search,
  69. * used to keep track of which nodes are participating.
  70. */
  71. struct SearchStore_Search* search;
  72. /** The last node sent a search request. */
  73. struct Address lastNodeAsked;
  74. /**
  75. * The timeout if this timeout is hit then the search will continue
  76. * but the node will still be allowed to respond and it will be counted as a pong.
  77. */
  78. struct Timeout* continueSearchTimeout;
  79. /** Next search in the linked list. */
  80. struct SearchRunner_Search* nextSearch;
  81. /** Self pointer for this search so that the search can be removed from the linked list. */
  82. struct SearchRunner_Search** thisSearch;
  83. Identity
  84. };
  85. /**
  86. * Spot a duplicate entry in a node list.
  87. * If a router sends a response containing duplicate entries,
  88. * only the last (best) entry should be accepted.
  89. *
  90. * @param nodes the list of nodes.
  91. * @param index the index of the entry to check for being a duplicate.
  92. * @return true if duplicate, otherwise false.
  93. */
  94. static inline bool isDuplicateEntry(struct Address_List* list, uint32_t index)
  95. {
  96. for (int i = index+1; i < list->length; i++) {
  97. if (Bits_memcmp(&list->elems[i].key, &list->elems[i].key, Address_KEY_SIZE) == 0) {
  98. return true;
  99. }
  100. }
  101. return false;
  102. }
  103. static void searchStep(struct SearchRunner_Search* search);
  104. static void searchReplyCallback(struct RouterModule_Promise* promise,
  105. uint32_t lagMilliseconds,
  106. struct Address* from,
  107. Dict* result)
  108. {
  109. struct SearchRunner_Search* search =
  110. Identity_check((struct SearchRunner_Search*)promise->userData);
  111. if (!Bits_memcmp(from->ip6.bytes, search->lastNodeAsked.ip6.bytes, 16)) {
  112. Timeout_resetTimeout(search->continueSearchTimeout,
  113. RouterModule_searchTimeoutMilliseconds(search->runner->router));
  114. }
  115. if (!Bits_memcmp(from->ip6.bytes, search->target.ip6.bytes, 16)) {
  116. search->numFinds++;
  117. }
  118. struct Address_List* nodeList =
  119. ReplySerializer_parse(from, result, search->runner->logger, true, promise->alloc);
  120. struct Address* best = NULL;
  121. for (int i = 0; nodeList && i < nodeList->length; i++) {
  122. if (isDuplicateEntry(nodeList, i)) {
  123. continue;
  124. }
  125. if (Address_closest(&search->target, &nodeList->elems[i], from) >= 0) {
  126. // Too much noise.
  127. //Log_debug(search->runner->logger, "Answer was further from the target than us.\n");
  128. continue;
  129. }
  130. if (search->lastNodeAsked.path != from->path) {
  131. // old queries coming in late...
  132. continue;
  133. }
  134. struct Node_Two* nn =
  135. NodeStore_getBest(search->runner->nodeStore, nodeList->elems[i].ip6.bytes);
  136. if (!nn) {
  137. RumorMill_addNode(search->runner->rumorMill, &nodeList->elems[i]);
  138. }
  139. //nodeList->elems[i].path =
  140. // NodeStore_optimizePath(search->runner->nodeStore, nodeList->elems[i].path);
  141. if (!Bits_memcmp(nodeList->elems[i].ip6.bytes, search->target.ip6.bytes, 16)) {
  142. if (!best) {
  143. best = &nodeList->elems[i];
  144. continue;
  145. } else if (nodeList->elems[i].path < best->path) {
  146. SearchStore_addNodeToSearch(best, search->search);
  147. best = &nodeList->elems[i];
  148. continue;
  149. }
  150. }
  151. SearchStore_addNodeToSearch(&nodeList->elems[i], search->search);
  152. }
  153. if (best) {
  154. SearchStore_addNodeToSearch(best, search->search);
  155. }
  156. }
  157. static void searchCallback(struct RouterModule_Promise* promise,
  158. uint32_t lagMilliseconds,
  159. struct Address* from,
  160. Dict* result)
  161. {
  162. struct SearchRunner_Search* search =
  163. Identity_check((struct SearchRunner_Search*)promise->userData);
  164. if (from) {
  165. searchReplyCallback(promise, lagMilliseconds, from, result);
  166. }
  167. if (search->pub.callback) {
  168. search->pub.callback(&search->pub, lagMilliseconds, from, result);
  169. }
  170. searchStep(search);
  171. }
  172. /**
  173. * Send a search request to the next node in this search.
  174. * This is called whenever a response comes in or after the global mean response time passes.
  175. */
  176. static void searchStep(struct SearchRunner_Search* search)
  177. {
  178. struct SearchRunner_pvt* ctx = Identity_check((struct SearchRunner_pvt*)search->runner);
  179. struct SearchStore_Node* nextSearchNode;
  180. for (;;) {
  181. nextSearchNode = SearchStore_getNextNode(search->search);
  182. // If the number of requests sent has exceeded the max search requests, let's stop there.
  183. if (search->totalRequests >= search->maxRequests) {
  184. // fallthrough
  185. } else if (search->numFinds > 0 && search->totalRequests >= search->maxRequestsIfFound) {
  186. // fallthrough
  187. } else if (nextSearchNode == NULL) {
  188. // fallthrough
  189. } else {
  190. break;
  191. }
  192. if (search->pub.callback) {
  193. search->pub.callback(&search->pub, 0, NULL, NULL);
  194. }
  195. Allocator_free(search->pub.alloc);
  196. return;
  197. }
  198. Bits_memcpy(&search->lastNodeAsked, &nextSearchNode->address, sizeof(struct Address));
  199. struct RouterModule_Promise* rp =
  200. RouterModule_newMessage(&nextSearchNode->address, 0, ctx->router, search->pub.alloc);
  201. Dict* message = Dict_new(rp->alloc);
  202. if (!Bits_memcmp(nextSearchNode->address.ip6.bytes, search->target.ip6.bytes, 16)) {
  203. Dict_putString(message, CJDHTConstants_QUERY, CJDHTConstants_QUERY_GP, rp->alloc);
  204. } else {
  205. Dict_putString(message, CJDHTConstants_QUERY, CJDHTConstants_QUERY_FN, rp->alloc);
  206. }
  207. Dict_putString(message, CJDHTConstants_TARGET, search->targetStr, rp->alloc);
  208. rp->userData = search;
  209. rp->callback = searchCallback;
  210. RouterModule_sendMessage(rp, message);
  211. search->totalRequests++;
  212. }
  213. // Triggered by a search timeout (the message may still come back and will be treated as a ping)
  214. static void searchNextNode(void* vsearch)
  215. {
  216. struct SearchRunner_Search* search = Identity_check((struct SearchRunner_Search*) vsearch);
  217. // Timeout for trying the next node.
  218. Timeout_resetTimeout(search->continueSearchTimeout,
  219. RouterModule_searchTimeoutMilliseconds(search->runner->router));
  220. searchStep(search);
  221. }
  222. static int searchOnFree(struct Allocator_OnFreeJob* job)
  223. {
  224. struct SearchRunner_Search* search =
  225. Identity_check((struct SearchRunner_Search*)job->userData);
  226. *search->thisSearch = search->nextSearch;
  227. if (search->nextSearch) {
  228. search->nextSearch->thisSearch = search->thisSearch;
  229. }
  230. Assert_true(search->runner->searches > 0);
  231. search->runner->searches--;
  232. return 0;
  233. }
  234. struct SearchRunner_SearchData* SearchRunner_showActiveSearch(struct SearchRunner* searchRunner,
  235. int number,
  236. struct Allocator* alloc)
  237. {
  238. struct SearchRunner_pvt* runner = Identity_check((struct SearchRunner_pvt*)searchRunner);
  239. struct SearchRunner_Search* search = runner->firstSearch;
  240. while (search && number > 0) {
  241. search = search->nextSearch;
  242. number--;
  243. }
  244. struct SearchRunner_SearchData* out =
  245. Allocator_calloc(alloc, sizeof(struct SearchRunner_SearchData), 1);
  246. if (search) {
  247. Bits_memcpy(out->target, &search->target.ip6.bytes, 16);
  248. Bits_memcpy(&out->lastNodeAsked, &search->lastNodeAsked, sizeof(struct Address));
  249. out->totalRequests = search->totalRequests;
  250. }
  251. out->activeSearches = runner->searches;
  252. return out;
  253. }
  254. struct RouterModule_Promise* SearchRunner_search(uint8_t target[16],
  255. int maxRequests,
  256. int maxRequestsIfFound,
  257. struct SearchRunner* searchRunner,
  258. struct Allocator* allocator)
  259. {
  260. struct SearchRunner_pvt* runner = Identity_check((struct SearchRunner_pvt*)searchRunner);
  261. if (runner->searches > runner->maxConcurrentSearches) {
  262. Log_debug(runner->logger, "Skipping search because there are already [%d] searches active",
  263. runner->searches);
  264. return NULL;
  265. }
  266. if (maxRequests < 1) {
  267. maxRequests = SearchRunner_DEFAULT_MAX_REQUESTS;
  268. }
  269. if (maxRequestsIfFound < 1) {
  270. maxRequestsIfFound = SearchRunner_DEFAULT_MAX_REQUESTS_IF_FOUND;
  271. }
  272. struct Allocator* alloc = Allocator_child(allocator);
  273. struct Address targetAddr = { .path = 0 };
  274. Bits_memcpy(targetAddr.ip6.bytes, target, Address_SEARCH_TARGET_SIZE);
  275. struct NodeList* nodes =
  276. NodeStore_getClosestNodes(runner->nodeStore,
  277. &targetAddr,
  278. maxRequests,
  279. Version_CURRENT_PROTOCOL,
  280. alloc);
  281. if (nodes->size == 0) {
  282. Log_debug(runner->logger, "No nodes available for beginning search");
  283. Allocator_free(alloc);
  284. return NULL;
  285. }
  286. struct SearchStore_Search* sss = SearchStore_newSearch(target, runner->searchStore, alloc);
  287. for (int i = 0; i < (int)nodes->size; i++) {
  288. SearchStore_addNodeToSearch(&nodes->nodes[i]->address, sss);
  289. }
  290. struct SearchRunner_Search* search = Allocator_clone(alloc, (&(struct SearchRunner_Search) {
  291. .pub = {
  292. .alloc = alloc
  293. },
  294. .runner = runner,
  295. .search = sss,
  296. .maxRequests = maxRequests,
  297. .maxRequestsIfFound = maxRequestsIfFound
  298. }));
  299. Identity_set(search);
  300. runner->searches++;
  301. Allocator_onFree(alloc, searchOnFree, search);
  302. Bits_memcpy(&search->target, &targetAddr, sizeof(struct Address));
  303. if (runner->firstSearch) {
  304. search->nextSearch = runner->firstSearch;
  305. runner->firstSearch->thisSearch = &search->nextSearch;
  306. }
  307. runner->firstSearch = search;
  308. search->thisSearch = &runner->firstSearch;
  309. search->targetStr = String_newBinary((char*)targetAddr.ip6.bytes, 16, alloc);
  310. // Trigger the searchNextNode() immedietly but asynchronously.
  311. search->continueSearchTimeout =
  312. Timeout_setTimeout(searchNextNode, search, 0, runner->eventBase, alloc);
  313. return &search->pub;
  314. }
  315. struct SearchRunner* SearchRunner_new(struct NodeStore* nodeStore,
  316. struct Log* logger,
  317. struct EventBase* base,
  318. struct RouterModule* module,
  319. uint8_t myAddress[16],
  320. struct RumorMill* rumorMill,
  321. struct Allocator* allocator)
  322. {
  323. struct Allocator* alloc = Allocator_child(allocator);
  324. struct SearchRunner_pvt* out = Allocator_clone(alloc, (&(struct SearchRunner_pvt) {
  325. .nodeStore = nodeStore,
  326. .logger = logger,
  327. .eventBase = base,
  328. .router = module,
  329. .rumorMill = rumorMill,
  330. .maxConcurrentSearches = SearchRunner_DEFAULT_MAX_CONCURRENT_SEARCHES
  331. }));
  332. out->searchStore = SearchStore_new(alloc, logger);
  333. Bits_memcpy(out->myAddress, myAddress, 16);
  334. Identity_set(out);
  335. return &out->pub;
  336. }