SearchRunner.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. */
  15. #include "dht/Address.h"
  16. #include "dht/dhtcore/SearchRunner.h"
  17. #include "dht/dhtcore/SearchStore.h"
  18. #include "dht/dhtcore/RumorMill.h"
  19. #include "dht/dhtcore/RouterModule.h"
  20. #include "dht/dhtcore/ReplySerializer.h"
  21. #include "dht/dhtcore/NodeStore.h"
  22. #include "dht/dhtcore/NodeList.h"
  23. #include "dht/CJDHTConstants.h"
  24. #include "util/Identity.h"
  25. #include "util/Bits.h"
  26. #include "util/log/Log.h"
  27. #include "util/events/EventBase.h"
  28. #include "util/events/Timeout.h"
  29. #include "util/version/Version.h"
  30. struct SearchRunner_pvt
  31. {
  32. struct SearchRunner pub;
  33. struct SearchStore* searchStore;
  34. struct NodeStore* nodeStore;
  35. struct Log* logger;
  36. struct EventBase* eventBase;
  37. struct RouterModule* router;
  38. struct RumorMill* rumorMill;
  39. uint8_t myAddress[16];
  40. /** Number of concurrent searches in operation. */
  41. int searches;
  42. /** Maximum number of concurrent searches allowed. */
  43. int maxConcurrentSearches;
  44. /** Beginning of a linked list of searches. */
  45. struct SearchRunner_Search* firstSearch;
  46. Identity
  47. };
  48. /**
  49. * A context for the internals of a search.
  50. */
  51. struct SearchRunner_Search;
  52. struct SearchRunner_Search
  53. {
  54. struct RouterModule_Promise pub;
  55. /** The router module carrying out the search. */
  56. struct SearchRunner_pvt* const runner;
  57. /** The number of requests which have been sent out so far for this search. */
  58. uint32_t totalRequests;
  59. /** Maximum number of requests to make before terminating the search. */
  60. uint32_t maxRequests;
  61. uint32_t maxRequestsIfFound;
  62. uint32_t numFinds;
  63. /** The address which we are searching for. */
  64. struct Address target;
  65. /** String form of the 16 byte ipv6 address. */
  66. String* targetStr;
  67. /**
  68. * The SearchStore_Search structure for this search,
  69. * used to keep track of which nodes are participating.
  70. */
  71. struct SearchStore_Search* search;
  72. /** The last node sent a search request. */
  73. struct Address lastNodeAsked;
  74. /**
  75. * The timeout if this timeout is hit then the search will continue
  76. * but the node will still be allowed to respond and it will be counted as a pong.
  77. */
  78. struct Timeout* continueSearchTimeout;
  79. /** Next search in the linked list. */
  80. struct SearchRunner_Search* nextSearch;
  81. /** Self pointer for this search so that the search can be removed from the linked list. */
  82. struct SearchRunner_Search** thisSearch;
  83. Identity
  84. };
  85. /**
  86. * Spot a duplicate entry in a node list.
  87. * If a router sends a response containing duplicate entries,
  88. * only the last (best) entry should be accepted.
  89. *
  90. * @param nodes the list of nodes.
  91. * @param index the index of the entry to check for being a duplicate.
  92. * @return true if duplicate, otherwise false.
  93. */
  94. static inline bool isDuplicateEntry(struct Address_List* list, uint32_t index)
  95. {
  96. for (int i = index+1; i < list->length; i++) {
  97. if (Bits_memcmp(&list->elems[i].key, &list->elems[i].key, Address_KEY_SIZE) == 0) {
  98. return true;
  99. }
  100. }
  101. return false;
  102. }
  103. static void searchStep(struct SearchRunner_Search* search);
  104. static void searchReplyCallback(struct RouterModule_Promise* promise,
  105. uint32_t lagMilliseconds,
  106. struct Address* from,
  107. Dict* result)
  108. {
  109. struct SearchRunner_Search* search =
  110. Identity_check((struct SearchRunner_Search*)promise->userData);
  111. if (!Bits_memcmp(from->ip6.bytes, search->target.ip6.bytes, 16)) {
  112. search->numFinds++;
  113. }
  114. struct Address_List* nodeList =
  115. ReplySerializer_parse(from, result, search->runner->logger, true, promise->alloc);
  116. struct Address* best = NULL;
  117. for (int i = 0; nodeList && i < nodeList->length; i++) {
  118. if (isDuplicateEntry(nodeList, i)) {
  119. continue;
  120. }
  121. if (Address_closest(&search->target, &nodeList->elems[i], from) >= 0) {
  122. // Too much noise.
  123. //Log_debug(search->runner->logger, "Answer was further from the target than us.\n");
  124. continue;
  125. }
  126. if (search->lastNodeAsked.path != from->path) {
  127. // old queries coming in late...
  128. continue;
  129. }
  130. struct Node_Two* nn =
  131. NodeStore_getBest(search->runner->nodeStore, nodeList->elems[i].ip6.bytes);
  132. if (!nn) {
  133. RumorMill_addNode(search->runner->rumorMill, &nodeList->elems[i]);
  134. }
  135. //nodeList->elems[i].path =
  136. // NodeStore_optimizePath(search->runner->nodeStore, nodeList->elems[i].path);
  137. if (!Bits_memcmp(nodeList->elems[i].ip6.bytes, search->target.ip6.bytes, 16)) {
  138. if (!best) {
  139. best = &nodeList->elems[i];
  140. continue;
  141. } else if (nodeList->elems[i].path < best->path) {
  142. SearchStore_addNodeToSearch(best, search->search);
  143. best = &nodeList->elems[i];
  144. continue;
  145. }
  146. }
  147. SearchStore_addNodeToSearch(&nodeList->elems[i], search->search);
  148. }
  149. if (best) {
  150. SearchStore_addNodeToSearch(best, search->search);
  151. }
  152. }
  153. static void searchCallback(struct RouterModule_Promise* promise,
  154. uint32_t lagMilliseconds,
  155. struct Address* from,
  156. Dict* result)
  157. {
  158. struct SearchRunner_Search* search =
  159. Identity_check((struct SearchRunner_Search*)promise->userData);
  160. if (from) {
  161. searchReplyCallback(promise, lagMilliseconds, from, result);
  162. }
  163. if (search->pub.callback) {
  164. search->pub.callback(&search->pub, lagMilliseconds, from, result);
  165. }
  166. searchStep(search);
  167. }
  168. /**
  169. * Send a search request to the next node in this search.
  170. * This is called whenever a response comes in or after the global mean response time passes.
  171. */
  172. static void searchStep(struct SearchRunner_Search* search)
  173. {
  174. struct SearchRunner_pvt* ctx = Identity_check((struct SearchRunner_pvt*)search->runner);
  175. struct SearchStore_Node* nextSearchNode;
  176. for (;;) {
  177. nextSearchNode = SearchStore_getNextNode(search->search);
  178. // If the number of requests sent has exceeded the max search requests, let's stop there.
  179. if (search->totalRequests >= search->maxRequests) {
  180. // fallthrough
  181. } else if (search->numFinds > 0 && search->totalRequests >= search->maxRequestsIfFound) {
  182. // fallthrough
  183. } else if (nextSearchNode == NULL) {
  184. // fallthrough
  185. } else {
  186. break;
  187. }
  188. if (search->pub.callback) {
  189. search->pub.callback(&search->pub, 0, NULL, NULL);
  190. }
  191. Allocator_free(search->pub.alloc);
  192. return;
  193. }
  194. Bits_memcpyConst(&search->lastNodeAsked, &nextSearchNode->address, sizeof(struct Address));
  195. struct RouterModule_Promise* rp =
  196. RouterModule_newMessage(&nextSearchNode->address, 0, ctx->router, search->pub.alloc);
  197. Dict* message = Dict_new(rp->alloc);
  198. if (!Bits_memcmp(nextSearchNode->address.ip6.bytes, search->target.ip6.bytes, 16)) {
  199. Dict_putString(message, CJDHTConstants_QUERY, CJDHTConstants_QUERY_GP, rp->alloc);
  200. } else {
  201. Dict_putString(message, CJDHTConstants_QUERY, CJDHTConstants_QUERY_FN, rp->alloc);
  202. }
  203. Dict_putString(message, CJDHTConstants_TARGET, search->targetStr, rp->alloc);
  204. rp->userData = search;
  205. rp->callback = searchCallback;
  206. RouterModule_sendMessage(rp, message);
  207. search->totalRequests++;
  208. }
  209. // Triggered by a search timeout (the message may still come back and will be treated as a ping)
  210. static void searchNextNode(void* vsearch)
  211. {
  212. struct SearchRunner_Search* search = Identity_check((struct SearchRunner_Search*) vsearch);
  213. // Timeout for trying the next node.
  214. Timeout_resetTimeout(search->continueSearchTimeout,
  215. RouterModule_searchTimeoutMilliseconds(search->runner->router));
  216. searchStep(search);
  217. }
  218. static int searchOnFree(struct Allocator_OnFreeJob* job)
  219. {
  220. struct SearchRunner_Search* search =
  221. Identity_check((struct SearchRunner_Search*)job->userData);
  222. *search->thisSearch = search->nextSearch;
  223. if (search->nextSearch) {
  224. search->nextSearch->thisSearch = search->thisSearch;
  225. }
  226. Assert_true(search->runner->searches > 0);
  227. search->runner->searches--;
  228. return 0;
  229. }
  230. struct SearchRunner_SearchData* SearchRunner_showActiveSearch(struct SearchRunner* searchRunner,
  231. int number,
  232. struct Allocator* alloc)
  233. {
  234. struct SearchRunner_pvt* runner = Identity_check((struct SearchRunner_pvt*)searchRunner);
  235. struct SearchRunner_Search* search = runner->firstSearch;
  236. while (search && number > 0) {
  237. search = search->nextSearch;
  238. number--;
  239. }
  240. struct SearchRunner_SearchData* out =
  241. Allocator_calloc(alloc, sizeof(struct SearchRunner_SearchData), 1);
  242. if (search) {
  243. Bits_memcpyConst(out->target, &search->target.ip6.bytes, 16);
  244. Bits_memcpyConst(&out->lastNodeAsked, &search->lastNodeAsked, sizeof(struct Address));
  245. out->totalRequests = search->totalRequests;
  246. }
  247. out->activeSearches = runner->searches;
  248. return out;
  249. }
  250. struct RouterModule_Promise* SearchRunner_search(uint8_t target[16],
  251. int maxRequests,
  252. int maxRequestsIfFound,
  253. struct SearchRunner* searchRunner,
  254. struct Allocator* allocator)
  255. {
  256. struct SearchRunner_pvt* runner = Identity_check((struct SearchRunner_pvt*)searchRunner);
  257. if (runner->searches > runner->maxConcurrentSearches) {
  258. Log_debug(runner->logger, "Skipping search because there are already [%d] searches active",
  259. runner->searches);
  260. return NULL;
  261. }
  262. if (maxRequests < 1) {
  263. maxRequests = SearchRunner_DEFAULT_MAX_REQUESTS;
  264. }
  265. if (maxRequestsIfFound < 1) {
  266. maxRequestsIfFound = SearchRunner_DEFAULT_MAX_REQUESTS_IF_FOUND;
  267. }
  268. struct Allocator* alloc = Allocator_child(allocator);
  269. struct Address targetAddr = { .path = 0 };
  270. Bits_memcpyConst(targetAddr.ip6.bytes, target, Address_SEARCH_TARGET_SIZE);
  271. struct NodeList* nodes =
  272. NodeStore_getClosestNodes(runner->nodeStore,
  273. &targetAddr,
  274. maxRequests,
  275. Version_CURRENT_PROTOCOL,
  276. alloc);
  277. if (nodes->size == 0) {
  278. Log_debug(runner->logger, "No nodes available for beginning search");
  279. Allocator_free(alloc);
  280. return NULL;
  281. }
  282. struct SearchStore_Search* sss = SearchStore_newSearch(target, runner->searchStore, alloc);
  283. for (int i = 0; i < (int)nodes->size; i++) {
  284. SearchStore_addNodeToSearch(&nodes->nodes[i]->address, sss);
  285. }
  286. struct SearchRunner_Search* search = Allocator_clone(alloc, (&(struct SearchRunner_Search) {
  287. .pub = {
  288. .alloc = alloc
  289. },
  290. .runner = runner,
  291. .search = sss,
  292. .maxRequests = maxRequests,
  293. .maxRequestsIfFound = maxRequestsIfFound
  294. }));
  295. Identity_set(search);
  296. runner->searches++;
  297. Allocator_onFree(alloc, searchOnFree, search);
  298. Bits_memcpyConst(&search->target, &targetAddr, sizeof(struct Address));
  299. if (runner->firstSearch) {
  300. search->nextSearch = runner->firstSearch;
  301. runner->firstSearch->thisSearch = &search->nextSearch;
  302. }
  303. runner->firstSearch = search;
  304. search->thisSearch = &runner->firstSearch;
  305. search->targetStr = String_newBinary((char*)targetAddr.ip6.bytes, 16, alloc);
  306. // Trigger the searchNextNode() immedietly but asynchronously.
  307. search->continueSearchTimeout =
  308. Timeout_setTimeout(searchNextNode, search, 0, runner->eventBase, alloc);
  309. return &search->pub;
  310. }
  311. struct SearchRunner* SearchRunner_new(struct NodeStore* nodeStore,
  312. struct Log* logger,
  313. struct EventBase* base,
  314. struct RouterModule* module,
  315. uint8_t myAddress[16],
  316. struct RumorMill* rumorMill,
  317. struct Allocator* allocator)
  318. {
  319. struct Allocator* alloc = Allocator_child(allocator);
  320. struct SearchRunner_pvt* out = Allocator_clone(alloc, (&(struct SearchRunner_pvt) {
  321. .nodeStore = nodeStore,
  322. .logger = logger,
  323. .eventBase = base,
  324. .router = module,
  325. .rumorMill = rumorMill,
  326. .maxConcurrentSearches = SearchRunner_DEFAULT_MAX_CONCURRENT_SEARCHES
  327. }));
  328. out->searchStore = SearchStore_new(alloc, logger);
  329. Bits_memcpyConst(out->myAddress, myAddress, 16);
  330. Identity_set(out);
  331. return &out->pub;
  332. }