/* vim: set expandtab ts=4 sw=4: */
/*
 * You may redistribute this program and/or modify it under the terms of
 * the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#include "crypto/random/Random.h"
#include "dht/Address.h"
#include "dht/dhtcore/Janitor.h"
#include "dht/dhtcore/Node.h"
#include "dht/dhtcore/NodeList.h"
#include "dht/dhtcore/RumorMill.h"
#include "dht/dhtcore/RouterModule.h"
#include "dht/dhtcore/SearchRunner.h"
#include "dht/dhtcore/ReplySerializer.h"
#include "benc/Object.h"
#include "memory/Allocator.h"
#include "util/AddrTools.h"
#include "util/AverageRoller.h"
#include "util/Bits.h"
#include "util/events/EventBase.h"
#include "util/Hex.h"
#include "util/events/Timeout.h"
#include "util/events/Time.h"
#include "util/platform/libc/string.h"

#include <stdint.h>
#include <stdbool.h>

/**
 * The goal of this is to run searches in the local area of this node.
 * it searches for hashes every localMaintainenceSearchPeriod milliseconds.
 * it runs searches by picking hashes at random, if a hash is chosen and there is a
 * non-zero-reach node which services that space, it stops. This way it will run many
 * searches early on but as the number of known nodes increases, it begins to taper off.
*/ struct Janitor { struct RouterModule* routerModule; struct NodeStore* nodeStore; struct SearchRunner* searchRunner; struct RumorMill* rumorMill; struct RumorMill* nodesOfInterest; struct Timeout* timeout; struct Log* logger; uint64_t globalMaintainenceMilliseconds; uint64_t timeOfNextGlobalMaintainence; uint64_t localMaintainenceMilliseconds; struct Allocator* allocator; uint64_t timeOfNextSearchRepeat; uint64_t searchRepeatMilliseconds; struct EventBase* eventBase; struct Random* rand; /** Number of concurrent searches taking place. */ int searches; Identity }; struct Janitor_Search { struct Janitor* janitor; struct Address best; uint8_t target[16]; struct Allocator* alloc; Identity }; static void responseCallback(struct RouterModule_Promise* promise, uint32_t lagMilliseconds, struct Address* from, Dict* result) { struct Janitor_Search* search = Identity_check((struct Janitor_Search*)promise->userData); if (from) { Bits_memcpyConst(&search->best, from, sizeof(struct Address)); return; } search->janitor->searches--; if (!search->best.path) { Log_debug(search->janitor->logger, "Search completed with no nodes found"); } Allocator_free(search->alloc); } static void search(uint8_t target[16], struct Janitor* janitor) { if (janitor->searches >= 20) { Log_debug(janitor->logger, "Skipping search because 20 are in progress"); return; } #ifdef Log_DEBUG uint8_t targetStr[40]; AddrTools_printIp(targetStr, target); Log_debug(janitor->logger, "Beginning search for [%s]", targetStr); #endif struct Allocator* searchAlloc = Allocator_child(janitor->allocator); struct RouterModule_Promise* rp = SearchRunner_search(target, janitor->searchRunner, searchAlloc); if (!rp) { Log_debug(janitor->logger, "SearchRunner_search() returned NULL, probably full."); return; } janitor->searches++; struct Janitor_Search* search = Allocator_clone(rp->alloc, (&(struct Janitor_Search) { .janitor = janitor, .alloc = searchAlloc, })); Identity_set(search); Bits_memcpyConst(search->target, target, 
16); rp->callback = responseCallback; rp->userData = search; } static void searchNoDupe(uint8_t target[Address_SEARCH_TARGET_SIZE], struct Janitor* janitor) { // See if we're already searching for this address. struct Allocator* seachListAlloc = Allocator_child(janitor->allocator); struct SearchRunner_SearchData* searchData; for (int i = 0; i < SearchRunner_DEFAULT_MAX_CONCURRENT_SEARCHES; i++) { searchData = SearchRunner_showActiveSearch(janitor->searchRunner, i, seachListAlloc); if (!searchData) { continue; } if (!Bits_memcmp(searchData->target, target, Address_SEARCH_TARGET_SIZE)) { // Already have a search going for this address, so nothing to do. Allocator_free(seachListAlloc); return; } } Allocator_free(seachListAlloc); // There's no search running for this address, so we start one. search(target, janitor); #ifdef Log_DEBUG uint8_t addrStr[40]; AddrTools_printIp(addrStr, target); Log_debug(janitor->logger, "No active search for [%s], starting one.", addrStr); #endif } /** * For a Distributed Hash Table to work, each node must know a valid next hop in the search, * if such a thing exists. * * In a Kademlia DHT, can be done by organizing nodes into k-buckets. These are collections * of k nodes for which the first n bits of your node IDs match. Among other things, k-buckets * allow a node to identify holes in their lookup table and fill them. If the nth bucket is empty, * it means your node knows of no valid next hop for any key matching the first n bits of your * address and differing in bit n+1. * * Without going to the trouble of organizing nodes in the buckets, this function iterates * bitwise over keyspace, to identify the same kind of routing holes. * It then dispatches a search for the first (largest) such hole in keyspace that it finds. 
*/ static void plugLargestKeyspaceHole(struct Janitor* janitor) { struct Address addr = *janitor->nodeStore->selfAddress; // Get furthest possible address for (int i = 1 ; i < Address_SEARCH_TARGET_SIZE ; i++) { // Leaving [0] alone keeps the 0xfc around, so it's a valid cjdns address. addr.ip6.bytes[i] = ~(addr.ip6.bytes[i]); } int byte = 0; int bit = 0; for (int i = 0; i < 128 ; i++) { if (63 < i && i < 72) { // We want to leave the 0xfc alone continue; } if (i < 64) { byte = 8 + (i/8); } else { byte = (i/8) - 8; } bit = (i % 8); addr.ip6.bytes[byte] = addr.ip6.bytes[byte] ^ (0x01 << bit); struct Node_Two* n = RouterModule_lookup(addr.ip6.bytes, janitor->routerModule); if (!n) { // We found a hole! Exit loop and let the search trigger. break; } } searchNoDupe(addr.ip6.bytes, janitor); } static void peersResponseCallback(struct RouterModule_Promise* promise, uint32_t lagMilliseconds, struct Address* from, Dict* result) { struct Janitor* janitor = Identity_check((struct Janitor*)promise->userData); if (!from) { return; } struct Node_Link* fromLink = NodeStore_linkForPath(janitor->nodeStore, from->path); Assert_true(fromLink); struct Address_List* addresses = ReplySerializer_parse(from, fromLink->child->encodingScheme, fromLink->inverseLinkEncodingFormNumber, result, janitor->logger, promise->alloc); for (int i = 0; addresses && i < addresses->length; i++) { if (!NodeStore_linkForPath(janitor->nodeStore, addresses->elems[i].path)) { RumorMill_addNode(janitor->rumorMill, &addresses->elems[i]); } } } static void checkPeers(struct Janitor* janitor, struct Node_Two* n) { // Lets check for non-one-hop links at each node along the path between us and this node. 
uint32_t i = 0; for (;;i++) { struct Node_Link* link = NodeStore_getLinkOnPath(janitor->nodeStore, n->address.path, i); if (!link) { return; } if (link->parent == janitor->nodeStore->selfNode) { continue; } int count = NodeStore_linkCount(link->child); for (int j = 0; j < count; j++) { struct Node_Link* l = NodeStore_getLink(link->child, j); if (!Node_isOneHopLink(l) || link->parent->pathQuality == 0) { struct RouterModule_Promise* rp = RouterModule_getPeers(&link->parent->address, l->cannonicalLabel, 0, janitor->routerModule, janitor->allocator); rp->callback = peersResponseCallback; rp->userData = janitor; // Only send max 1 getPeers req per second. return; } } } } static void maintanenceCycle(void* vcontext) { struct Janitor* const janitor = Identity_check((struct Janitor*) vcontext); uint64_t now = Time_currentTimeMilliseconds(janitor->eventBase); uint64_t nextTimeout = (janitor->localMaintainenceMilliseconds / 2); nextTimeout += Random_uint32(janitor->rand) % (nextTimeout * 2); janitor->timeout = Timeout_setTimeout(maintanenceCycle, janitor, nextTimeout, janitor->eventBase, janitor->allocator); if (NodeStore_size(janitor->nodeStore) == 0 && janitor->rumorMill->count == 0) { if (now > janitor->timeOfNextGlobalMaintainence) { Log_warn(janitor->logger, "No nodes in routing table, check network connection and configuration."); janitor->timeOfNextGlobalMaintainence += janitor->globalMaintainenceMilliseconds; } return; } struct Address addr = { .protocolVersion = 0 }; // ping a node from the ping queue if (RumorMill_getNode(janitor->rumorMill, &addr)) { addr.path = NodeStore_optimizePath(janitor->nodeStore, addr.path); if (NodeStore_optimizePath_INVALID != addr.path) { struct RouterModule_Promise* rp = RouterModule_getPeers(&addr, Random_uint32(janitor->rand), 0, janitor->routerModule, janitor->allocator); rp->callback = peersResponseCallback; rp->userData = janitor; #ifdef Log_DEBUG uint8_t addrStr[60]; Address_print(addrStr, &addr); Log_debug(janitor->logger, 
"Pinging possible node [%s] from RumorMill", addrStr); #endif } } // This is good for DHT health. See function description. plugLargestKeyspaceHole(janitor); // Do something useful for a node we're actively trying to communicate with. if (RumorMill_getNode(janitor->nodesOfInterest, &addr)) { searchNoDupe(addr.ip6.bytes, janitor); } // random search Random_bytes(janitor->rand, addr.ip6.bytes, 16); // Make this a valid address. addr.ip6.bytes[0] = 0xfc; struct Node_Two* n = RouterModule_lookup(addr.ip6.bytes, janitor->routerModule); // If the best next node doesn't exist or has 0 reach, run a local maintenance search. if (n == NULL || n->pathQuality == 0) { search(addr.ip6.bytes, janitor); return; } else { checkPeers(janitor, n); } #ifdef Log_DEBUG int nonZeroNodes = NodeStore_nonZeroNodes(janitor->nodeStore); int total = NodeStore_size(janitor->nodeStore); Log_debug(janitor->logger, "Global Mean Response Time: %u non-zero nodes: [%d] zero nodes [%d] " "total [%d]", RouterModule_globalMeanResponseTime(janitor->routerModule), nonZeroNodes, (total - nonZeroNodes), total); #endif if (now > janitor->timeOfNextGlobalMaintainence) { search(addr.ip6.bytes, janitor); janitor->timeOfNextGlobalMaintainence += janitor->globalMaintainenceMilliseconds; } } struct Janitor* Janitor_new(uint64_t localMaintainenceMilliseconds, uint64_t globalMaintainenceMilliseconds, struct RouterModule* routerModule, struct NodeStore* nodeStore, struct SearchRunner* searchRunner, struct RumorMill* rumorMill, struct RumorMill* nodesOfInterest, struct Log* logger, struct Allocator* allocator, struct EventBase* eventBase, struct Random* rand) { struct Allocator* alloc = Allocator_child(allocator); struct Janitor* janitor = Allocator_clone(alloc, (&(struct Janitor) { .eventBase = eventBase, .routerModule = routerModule, .nodeStore = nodeStore, .searchRunner = searchRunner, .rumorMill = rumorMill, .nodesOfInterest = nodesOfInterest, .logger = logger, .globalMaintainenceMilliseconds = 
globalMaintainenceMilliseconds, .localMaintainenceMilliseconds = localMaintainenceMilliseconds, .allocator = alloc, .rand = rand })); Identity_set(janitor); janitor->timeOfNextGlobalMaintainence = Time_currentTimeMilliseconds(eventBase); janitor->timeout = Timeout_setTimeout(maintanenceCycle, janitor, localMaintainenceMilliseconds, eventBase, alloc); return janitor; }