SearchRunner.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. */
  15. #include "dht/Address.h"
  16. #include "dht/dhtcore/SearchRunner.h"
  17. #include "dht/dhtcore/SearchStore.h"
  18. #include "dht/dhtcore/RumorMill.h"
  19. #include "dht/dhtcore/RouterModule.h"
  20. #include "dht/dhtcore/ReplySerializer.h"
  21. #include "dht/dhtcore/NodeStore.h"
  22. #include "dht/dhtcore/NodeList.h"
  23. #include "dht/CJDHTConstants.h"
  24. #include "util/Identity.h"
  25. #include "util/Bits.h"
  26. #include "util/log/Log.h"
  27. #include "util/events/EventBase.h"
  28. #include "util/events/Timeout.h"
  29. #include "util/version/Version.h"
  30. /** The maximum number of requests to make before calling a search failed. */
  31. #define DEFAULT_MAX_REQUESTS_PER_SEARCH 8
  32. struct SearchRunner_pvt
  33. {
  34. struct SearchRunner pub;
  35. struct SearchStore* searchStore;
  36. struct NodeStore* nodeStore;
  37. struct Log* logger;
  38. struct EventBase* eventBase;
  39. struct RouterModule* router;
  40. struct RumorMill* rumorMill;
  41. uint8_t myAddress[16];
  42. /** Number of concurrent searches in operation. */
  43. int searches;
  44. /** Maximum number of concurrent searches allowed. */
  45. int maxConcurrentSearches;
  46. /** Beginning of a linked list of searches. */
  47. struct SearchRunner_Search* firstSearch;
  48. Identity
  49. };
  50. /**
  51. * A context for the internals of a search.
  52. */
  53. struct SearchRunner_Search;
  54. struct SearchRunner_Search
  55. {
  56. struct RouterModule_Promise pub;
  57. /** The router module carrying out the search. */
  58. struct SearchRunner_pvt* const runner;
  59. /** The number of requests which have been sent out so far for this search. */
  60. uint32_t totalRequests;
  61. /** Maximum number of requests to make before terminating the search. */
  62. uint32_t maxRequests;
  63. /** The address which we are searching for. */
  64. struct Address target;
  65. /** String form of the 16 byte ipv6 address. */
  66. String* targetStr;
  67. /**
  68. * The SearchStore_Search structure for this search,
  69. * used to keep track of which nodes are participating.
  70. */
  71. struct SearchStore_Search* search;
  72. /** The last node sent a search request. */
  73. struct Address lastNodeAsked;
  74. /**
  75. * The timeout if this timeout is hit then the search will continue
  76. * but the node will still be allowed to respond and it will be counted as a pong.
  77. */
  78. struct Timeout* continueSearchTimeout;
  79. /** Next search in the linked list. */
  80. struct SearchRunner_Search* nextSearch;
  81. /** Self pointer for this search so that the search can be removed from the linked list. */
  82. struct SearchRunner_Search** thisSearch;
  83. Identity
  84. };
  85. /**
  86. * Spot a duplicate entry in a node list.
  87. * If a router sends a response containing duplicate entries,
  88. * only the last (best) entry should be accepted.
  89. *
  90. * @param nodes the list of nodes.
  91. * @param index the index of the entry to check for being a duplicate.
  92. * @return true if duplicate, otherwise false.
  93. */
  94. static inline bool isDuplicateEntry(struct Address_List* list, uint32_t index)
  95. {
  96. for (int i = index+1; i < list->length; i++) {
  97. if (Bits_memcmp(&list->elems[i].key, &list->elems[i].key, Address_KEY_SIZE) == 0) {
  98. return true;
  99. }
  100. }
  101. return false;
  102. }
  103. static void searchStep(struct SearchRunner_Search* search);
  104. static void searchReplyCallback(struct RouterModule_Promise* promise,
  105. uint32_t lagMilliseconds,
  106. struct Address* from,
  107. Dict* result)
  108. {
  109. struct SearchRunner_Search* search =
  110. Identity_check((struct SearchRunner_Search*)promise->userData);
  111. struct Address_List* nodeList =
  112. ReplySerializer_parse(from, result, search->runner->logger, true, promise->alloc);
  113. for (int i = 0; nodeList && i < nodeList->length; i++) {
  114. if (isDuplicateEntry(nodeList, i)) {
  115. continue;
  116. }
  117. if (Address_closest(&search->target, &nodeList->elems[i], from) >= 0) {
  118. // Too much noise.
  119. //Log_debug(search->runner->logger, "Answer was further from the target than us.\n");
  120. continue;
  121. }
  122. if (search->lastNodeAsked.path != from->path) {
  123. // old queries coming in late...
  124. continue;
  125. }
  126. struct Node_Two* nn =
  127. NodeStore_getBest(search->runner->nodeStore, nodeList->elems[i].ip6.bytes);
  128. if (!nn) {
  129. RumorMill_addNode(search->runner->rumorMill, &nodeList->elems[i]);
  130. }
  131. nodeList->elems[i].path =
  132. NodeStore_optimizePath(search->runner->nodeStore, nodeList->elems[i].path);
  133. SearchStore_addNodeToSearch(&nodeList->elems[i], search->search);
  134. }
  135. }
  136. static void searchCallback(struct RouterModule_Promise* promise,
  137. uint32_t lagMilliseconds,
  138. struct Address* from,
  139. Dict* result)
  140. {
  141. struct SearchRunner_Search* search =
  142. Identity_check((struct SearchRunner_Search*)promise->userData);
  143. if (from) {
  144. searchReplyCallback(promise, lagMilliseconds, from, result);
  145. }
  146. if (search->pub.callback) {
  147. search->pub.callback(&search->pub, lagMilliseconds, from, result);
  148. }
  149. searchStep(search);
  150. }
  151. /**
  152. * Send a search request to the next node in this search.
  153. * This is called whenever a response comes in or after the global mean response time passes.
  154. */
  155. static void searchStep(struct SearchRunner_Search* search)
  156. {
  157. struct SearchRunner_pvt* ctx = Identity_check((struct SearchRunner_pvt*)search->runner);
  158. struct Node_Two* node;
  159. struct SearchStore_Node* nextSearchNode;
  160. for (;;) {
  161. nextSearchNode = SearchStore_getNextNode(search->search);
  162. // If the number of requests sent has exceeded the max search requests, let's stop there.
  163. if (search->totalRequests >= search->maxRequests || nextSearchNode == NULL) {
  164. if (search->pub.callback) {
  165. search->pub.callback(&search->pub, 0, NULL, NULL);
  166. }
  167. Allocator_free(search->pub.alloc);
  168. return;
  169. }
  170. node = NodeStore_getBest(ctx->nodeStore, nextSearchNode->address.ip6.bytes);
  171. if (!node) { continue; }
  172. if (node == ctx->nodeStore->selfNode) { continue; }
  173. if (Bits_memcmp(node->address.ip6.bytes, nextSearchNode->address.ip6.bytes, 16)) {
  174. continue;
  175. }
  176. break;
  177. }
  178. Assert_true(node != ctx->nodeStore->selfNode);
  179. Bits_memcpyConst(&search->lastNodeAsked, &node->address, sizeof(struct Address));
  180. struct RouterModule_Promise* rp =
  181. RouterModule_newMessage(&node->address, 0, ctx->router, search->pub.alloc);
  182. Dict* message = Dict_new(rp->alloc);
  183. Dict_putString(message, CJDHTConstants_QUERY, CJDHTConstants_QUERY_FN, rp->alloc);
  184. Dict_putString(message, CJDHTConstants_TARGET, search->targetStr, rp->alloc);
  185. rp->userData = search;
  186. rp->callback = searchCallback;
  187. RouterModule_sendMessage(rp, message);
  188. search->totalRequests++;
  189. }
  190. // Triggered by a search timeout (the message may still come back and will be treated as a ping)
  191. static void searchNextNode(void* vsearch)
  192. {
  193. struct SearchRunner_Search* search = Identity_check((struct SearchRunner_Search*) vsearch);
  194. // Timeout for trying the next node.
  195. Timeout_resetTimeout(search->continueSearchTimeout,
  196. RouterModule_searchTimeoutMilliseconds(search->runner->router));
  197. searchStep(search);
  198. }
  199. static int searchOnFree(struct Allocator_OnFreeJob* job)
  200. {
  201. struct SearchRunner_Search* search =
  202. Identity_check((struct SearchRunner_Search*)job->userData);
  203. *search->thisSearch = search->nextSearch;
  204. if (search->nextSearch) {
  205. search->nextSearch->thisSearch = search->thisSearch;
  206. }
  207. Assert_true(search->runner->searches > 0);
  208. search->runner->searches--;
  209. return 0;
  210. }
  211. struct SearchRunner_SearchData* SearchRunner_showActiveSearch(struct SearchRunner* searchRunner,
  212. int number,
  213. struct Allocator* alloc)
  214. {
  215. struct SearchRunner_pvt* runner = Identity_check((struct SearchRunner_pvt*)searchRunner);
  216. struct SearchRunner_Search* search = runner->firstSearch;
  217. while (search && number > 0) {
  218. search = search->nextSearch;
  219. number--;
  220. }
  221. struct SearchRunner_SearchData* out =
  222. Allocator_calloc(alloc, sizeof(struct SearchRunner_SearchData), 1);
  223. if (search) {
  224. Bits_memcpyConst(out->target, &search->target.ip6.bytes, 16);
  225. Bits_memcpyConst(&out->lastNodeAsked, &search->lastNodeAsked, sizeof(struct Address));
  226. out->totalRequests = search->totalRequests;
  227. }
  228. out->activeSearches = runner->searches;
  229. return out;
  230. }
  231. struct RouterModule_Promise* SearchRunner_search(uint8_t target[16],
  232. int maxRequests,
  233. struct SearchRunner* searchRunner,
  234. struct Allocator* allocator)
  235. {
  236. struct SearchRunner_pvt* runner = Identity_check((struct SearchRunner_pvt*)searchRunner);
  237. if (runner->searches > runner->maxConcurrentSearches) {
  238. Log_debug(runner->logger, "Skipping search because there are already [%d] searches active",
  239. runner->searches);
  240. return NULL;
  241. }
  242. if (maxRequests < 1) {
  243. maxRequests = DEFAULT_MAX_REQUESTS_PER_SEARCH;
  244. }
  245. struct Allocator* alloc = Allocator_child(allocator);
  246. struct Address targetAddr = { .path = 0 };
  247. Bits_memcpyConst(targetAddr.ip6.bytes, target, Address_SEARCH_TARGET_SIZE);
  248. struct NodeList* nodes =
  249. NodeStore_getClosestNodes(runner->nodeStore,
  250. &targetAddr,
  251. maxRequests,
  252. Version_CURRENT_PROTOCOL,
  253. alloc);
  254. if (nodes->size == 0) {
  255. Log_debug(runner->logger, "No nodes available for beginning search");
  256. Allocator_free(alloc);
  257. return NULL;
  258. }
  259. struct SearchStore_Search* sss = SearchStore_newSearch(target, runner->searchStore, alloc);
  260. for (int i = 0; i < (int)nodes->size; i++) {
  261. SearchStore_addNodeToSearch(&nodes->nodes[i]->address, sss);
  262. }
  263. struct SearchRunner_Search* search = Allocator_clone(alloc, (&(struct SearchRunner_Search) {
  264. .pub = {
  265. .alloc = alloc
  266. },
  267. .runner = runner,
  268. .search = sss,
  269. .maxRequests = maxRequests
  270. }));
  271. Identity_set(search);
  272. runner->searches++;
  273. Allocator_onFree(alloc, searchOnFree, search);
  274. Bits_memcpyConst(&search->target, &targetAddr, sizeof(struct Address));
  275. if (runner->firstSearch) {
  276. search->nextSearch = runner->firstSearch;
  277. runner->firstSearch->thisSearch = &search->nextSearch;
  278. }
  279. runner->firstSearch = search;
  280. search->thisSearch = &runner->firstSearch;
  281. search->targetStr = String_newBinary((char*)targetAddr.ip6.bytes, 16, alloc);
  282. // Trigger the searchNextNode() immedietly but asynchronously.
  283. search->continueSearchTimeout =
  284. Timeout_setTimeout(searchNextNode, search, 0, runner->eventBase, alloc);
  285. return &search->pub;
  286. }
  287. struct SearchRunner* SearchRunner_new(struct NodeStore* nodeStore,
  288. struct Log* logger,
  289. struct EventBase* base,
  290. struct RouterModule* module,
  291. uint8_t myAddress[16],
  292. struct RumorMill* rumorMill,
  293. struct Allocator* allocator)
  294. {
  295. struct Allocator* alloc = Allocator_child(allocator);
  296. struct SearchRunner_pvt* out = Allocator_clone(alloc, (&(struct SearchRunner_pvt) {
  297. .nodeStore = nodeStore,
  298. .logger = logger,
  299. .eventBase = base,
  300. .router = module,
  301. .rumorMill = rumorMill,
  302. .maxConcurrentSearches = SearchRunner_DEFAULT_MAX_CONCURRENT_SEARCHES
  303. }));
  304. out->searchStore = SearchStore_new(alloc, logger);
  305. Bits_memcpyConst(out->myAddress, myAddress, 16);
  306. Identity_set(out);
  307. return &out->pub;
  308. }