SearchRunner.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. */
  15. #include "dht/Address.h"
  16. #include "dht/dhtcore/SearchRunner.h"
  17. #include "dht/dhtcore/SearchStore.h"
  18. #include "dht/dhtcore/RumorMill.h"
  19. #include "dht/dhtcore/RouterModule.h"
  20. #include "dht/dhtcore/ReplySerializer.h"
  21. #include "dht/dhtcore/NodeStore.h"
  22. #include "dht/dhtcore/NodeList.h"
  23. #include "dht/CJDHTConstants.h"
  24. #include "util/Identity.h"
  25. #include "util/Bits.h"
  26. #include "util/log/Log.h"
  27. #include "util/events/EventBase.h"
  28. #include "util/events/Timeout.h"
  29. #include "util/version/Version.h"
  30. /** The maximum number of requests to make before calling a search failed. */
  31. #define MAX_REQUESTS_PER_SEARCH 8
  32. struct SearchRunner_pvt
  33. {
  34. struct SearchRunner pub;
  35. struct SearchStore* searchStore;
  36. struct NodeStore* nodeStore;
  37. struct Log* logger;
  38. struct EventBase* eventBase;
  39. struct RouterModule* router;
  40. struct RumorMill* rumorMill;
  41. uint8_t myAddress[16];
  42. /** Number of concurrent searches in operation. */
  43. int searches;
  44. /** Maximum number of concurrent searches allowed. */
  45. int maxConcurrentSearches;
  46. /** Beginning of a linked list of searches. */
  47. struct SearchRunner_Search* firstSearch;
  48. Identity
  49. };
  50. /**
  51. * A context for the internals of a search.
  52. */
  53. struct SearchRunner_Search;
  54. struct SearchRunner_Search
  55. {
  56. struct RouterModule_Promise pub;
  57. /** The router module carrying out the search. */
  58. struct SearchRunner_pvt* const runner;
  59. /** The number of requests which have been sent out so far for this search. */
  60. uint32_t totalRequests;
  61. /** The address which we are searching for. */
  62. struct Address target;
  63. /** String form of the 16 byte ipv6 address. */
  64. String* targetStr;
  65. /**
  66. * The SearchStore_Search structure for this search,
  67. * used to keep track of which nodes are participating.
  68. */
  69. struct SearchStore_Search* search;
  70. /** The last node sent a search request. */
  71. struct Address lastNodeAsked;
  72. /**
  73. * The timeout if this timeout is hit then the search will continue
  74. * but the node will still be allowed to respond and it will be counted as a pong.
  75. */
  76. struct Timeout* continueSearchTimeout;
  77. /** Next search in the linked list. */
  78. struct SearchRunner_Search* nextSearch;
  79. /** Self pointer for this search so that the search can be removed from the linked list. */
  80. struct SearchRunner_Search** thisSearch;
  81. Identity
  82. };
  83. /**
  84. * Spot a duplicate entry in a node list.
  85. * If a router sends a response containing duplicate entries,
  86. * only the last (best) entry should be accepted.
  87. *
  88. * @param nodes the list of nodes.
  89. * @param index the index of the entry to check for being a duplicate.
  90. * @return true if duplicate, otherwise false.
  91. */
  92. static inline bool isDuplicateEntry(struct Address_List* list, uint32_t index)
  93. {
  94. for (int i = index+1; i < list->length; i++) {
  95. if (Bits_memcmp(&list->elems[i].key, &list->elems[i].key, Address_KEY_SIZE) == 0) {
  96. return true;
  97. }
  98. }
  99. return false;
  100. }
  101. static void searchStep(struct SearchRunner_Search* search);
  102. static void searchReplyCallback(struct RouterModule_Promise* promise,
  103. uint32_t lagMilliseconds,
  104. struct Address* from,
  105. Dict* result)
  106. {
  107. struct SearchRunner_Search* search =
  108. Identity_check((struct SearchRunner_Search*)promise->userData);
  109. struct Address_List* nodeList =
  110. ReplySerializer_parse(from, result, search->runner->logger, promise->alloc);
  111. for (int i = 0; nodeList && i < nodeList->length; i++) {
  112. if (isDuplicateEntry(nodeList, i)) {
  113. continue;
  114. }
  115. if (Address_closest(&search->target, &nodeList->elems[i], from) >= 0) {
  116. // Too much noise.
  117. //Log_debug(search->runner->logger, "Answer was further from the target than us.\n");
  118. continue;
  119. }
  120. if (search->lastNodeAsked.path != from->path) {
  121. // old queries coming in late...
  122. continue;
  123. }
  124. struct Node_Two* nn =
  125. NodeStore_getBest(search->runner->nodeStore, nodeList->elems[i].ip6.bytes);
  126. if (!nn) {
  127. RumorMill_addNode(search->runner->rumorMill, &nodeList->elems[i]);
  128. }
  129. nodeList->elems[i].path =
  130. NodeStore_optimizePath(search->runner->nodeStore, nodeList->elems[i].path);
  131. SearchStore_addNodeToSearch(&nodeList->elems[i], search->search);
  132. }
  133. }
  134. static void searchCallback(struct RouterModule_Promise* promise,
  135. uint32_t lagMilliseconds,
  136. struct Address* from,
  137. Dict* result)
  138. {
  139. struct SearchRunner_Search* search =
  140. Identity_check((struct SearchRunner_Search*)promise->userData);
  141. if (from) {
  142. searchReplyCallback(promise, lagMilliseconds, from, result);
  143. }
  144. if (search->pub.callback) {
  145. search->pub.callback(&search->pub, lagMilliseconds, from, result);
  146. }
  147. searchStep(search);
  148. }
  149. /**
  150. * Send a search request to the next node in this search.
  151. * This is called whenever a response comes in or after the global mean response time passes.
  152. */
  153. static void searchStep(struct SearchRunner_Search* search)
  154. {
  155. struct SearchRunner_pvt* ctx = Identity_check((struct SearchRunner_pvt*)search->runner);
  156. struct Node_Two* node;
  157. struct SearchStore_Node* nextSearchNode;
  158. for (;;) {
  159. nextSearchNode = SearchStore_getNextNode(search->search);
  160. // If the number of requests sent has exceeded the max search requests, let's stop there.
  161. if (search->totalRequests >= MAX_REQUESTS_PER_SEARCH || nextSearchNode == NULL) {
  162. if (search->pub.callback) {
  163. search->pub.callback(&search->pub, 0, NULL, NULL);
  164. }
  165. Allocator_free(search->pub.alloc);
  166. return;
  167. }
  168. node = NodeStore_getBest(ctx->nodeStore, nextSearchNode->address.ip6.bytes);
  169. if (!node) { continue; }
  170. if (node == ctx->nodeStore->selfNode) { continue; }
  171. if (Bits_memcmp(node->address.ip6.bytes, nextSearchNode->address.ip6.bytes, 16)) {
  172. continue;
  173. }
  174. break;
  175. }
  176. Assert_true(node != ctx->nodeStore->selfNode);
  177. Bits_memcpyConst(&search->lastNodeAsked, &node->address, sizeof(struct Address));
  178. struct RouterModule_Promise* rp =
  179. RouterModule_newMessage(&node->address, 0, ctx->router, search->pub.alloc);
  180. Dict* message = Dict_new(rp->alloc);
  181. Dict_putString(message, CJDHTConstants_QUERY, CJDHTConstants_QUERY_FN, rp->alloc);
  182. Dict_putString(message, CJDHTConstants_TARGET, search->targetStr, rp->alloc);
  183. rp->userData = search;
  184. rp->callback = searchCallback;
  185. RouterModule_sendMessage(rp, message);
  186. search->totalRequests++;
  187. }
  188. // Triggered by a search timeout (the message may still come back and will be treated as a ping)
  189. static void searchNextNode(void* vsearch)
  190. {
  191. struct SearchRunner_Search* search = Identity_check((struct SearchRunner_Search*) vsearch);
  192. // Timeout for trying the next node.
  193. Timeout_resetTimeout(search->continueSearchTimeout,
  194. RouterModule_searchTimeoutMilliseconds(search->runner->router));
  195. searchStep(search);
  196. }
  197. static int searchOnFree(struct Allocator_OnFreeJob* job)
  198. {
  199. struct SearchRunner_Search* search =
  200. Identity_check((struct SearchRunner_Search*)job->userData);
  201. *search->thisSearch = search->nextSearch;
  202. if (search->nextSearch) {
  203. search->nextSearch->thisSearch = search->thisSearch;
  204. }
  205. Assert_true(search->runner->searches > 0);
  206. search->runner->searches--;
  207. return 0;
  208. }
  209. struct SearchRunner_SearchData* SearchRunner_showActiveSearch(struct SearchRunner* searchRunner,
  210. int number,
  211. struct Allocator* alloc)
  212. {
  213. struct SearchRunner_pvt* runner = Identity_check((struct SearchRunner_pvt*)searchRunner);
  214. struct SearchRunner_Search* search = runner->firstSearch;
  215. while (search && number > 0) {
  216. search = search->nextSearch;
  217. number--;
  218. }
  219. struct SearchRunner_SearchData* out =
  220. Allocator_calloc(alloc, sizeof(struct SearchRunner_SearchData), 1);
  221. if (search) {
  222. Bits_memcpyConst(out->target, &search->target.ip6.bytes, 16);
  223. Bits_memcpyConst(&out->lastNodeAsked, &search->lastNodeAsked, sizeof(struct Address));
  224. out->totalRequests = search->totalRequests;
  225. }
  226. out->activeSearches = runner->searches;
  227. return out;
  228. }
  229. struct RouterModule_Promise* SearchRunner_search(uint8_t target[16],
  230. struct SearchRunner* searchRunner,
  231. struct Allocator* allocator)
  232. {
  233. struct SearchRunner_pvt* runner = Identity_check((struct SearchRunner_pvt*)searchRunner);
  234. if (runner->searches > runner->maxConcurrentSearches) {
  235. Log_debug(runner->logger, "Skipping search because there are already [%d] searches active",
  236. runner->searches);
  237. return NULL;
  238. }
  239. struct Allocator* alloc = Allocator_child(allocator);
  240. struct Address targetAddr = { .path = 0 };
  241. Bits_memcpyConst(targetAddr.ip6.bytes, target, Address_SEARCH_TARGET_SIZE);
  242. struct NodeList* nodes =
  243. NodeStore_getClosestNodes(runner->nodeStore,
  244. &targetAddr,
  245. MAX_REQUESTS_PER_SEARCH,
  246. Version_CURRENT_PROTOCOL,
  247. alloc);
  248. if (nodes->size == 0) {
  249. Log_debug(runner->logger, "No nodes available for beginning search");
  250. Allocator_free(alloc);
  251. return NULL;
  252. }
  253. struct SearchStore_Search* sss = SearchStore_newSearch(target, runner->searchStore, alloc);
  254. for (int i = 0; i < (int)nodes->size; i++) {
  255. SearchStore_addNodeToSearch(&nodes->nodes[i]->address, sss);
  256. }
  257. struct SearchRunner_Search* search = Allocator_clone(alloc, (&(struct SearchRunner_Search) {
  258. .pub = {
  259. .alloc = alloc
  260. },
  261. .runner = runner,
  262. .search = sss
  263. }));
  264. Identity_set(search);
  265. runner->searches++;
  266. Allocator_onFree(alloc, searchOnFree, search);
  267. Bits_memcpyConst(&search->target, &targetAddr, sizeof(struct Address));
  268. if (runner->firstSearch) {
  269. search->nextSearch = runner->firstSearch;
  270. runner->firstSearch->thisSearch = &search->nextSearch;
  271. }
  272. runner->firstSearch = search;
  273. search->thisSearch = &runner->firstSearch;
  274. search->targetStr = String_newBinary((char*)targetAddr.ip6.bytes, 16, alloc);
  275. // Trigger the searchNextNode() immedietly but asynchronously.
  276. search->continueSearchTimeout =
  277. Timeout_setTimeout(searchNextNode, search, 0, runner->eventBase, alloc);
  278. return &search->pub;
  279. }
  280. struct SearchRunner* SearchRunner_new(struct NodeStore* nodeStore,
  281. struct Log* logger,
  282. struct EventBase* base,
  283. struct RouterModule* module,
  284. uint8_t myAddress[16],
  285. struct RumorMill* rumorMill,
  286. struct Allocator* allocator)
  287. {
  288. struct Allocator* alloc = Allocator_child(allocator);
  289. struct SearchRunner_pvt* out = Allocator_clone(alloc, (&(struct SearchRunner_pvt) {
  290. .nodeStore = nodeStore,
  291. .logger = logger,
  292. .eventBase = base,
  293. .router = module,
  294. .rumorMill = rumorMill,
  295. .maxConcurrentSearches = SearchRunner_DEFAULT_MAX_CONCURRENT_SEARCHES
  296. }));
  297. out->searchStore = SearchStore_new(alloc, logger);
  298. Bits_memcpyConst(out->myAddress, myAddress, 16);
  299. Identity_set(out);
  300. return &out->pub;
  301. }