123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394 |
- /* vim: set expandtab ts=4 sw=4: */
- /*
- * You may redistribute this program and/or modify it under the terms of
- * the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
- #include "crypto/random/Random.h"
- #include "dht/Address.h"
- #include "dht/dhtcore/Janitor.h"
- #include "dht/dhtcore/Node.h"
- #include "dht/dhtcore/NodeList.h"
- #include "dht/dhtcore/RumorMill.h"
- #include "dht/dhtcore/RouterModule.h"
- #include "dht/dhtcore/SearchRunner.h"
- #include "dht/dhtcore/ReplySerializer.h"
- #include "benc/Object.h"
- #include "memory/Allocator.h"
- #include "util/AddrTools.h"
- #include "util/AverageRoller.h"
- #include "util/Bits.h"
- #include "util/events/EventBase.h"
- #include "util/Hex.h"
- #include "util/events/Timeout.h"
- #include "util/events/Time.h"
- #include "util/platform/libc/string.h"
- #include <stdint.h>
- #include <stdbool.h>
- /**
- * The goal of this is to run searches in the local area of this node.
- * it searches for hashes every localMaintainenceSearchPeriod milliseconds.
- * it runs searches by picking hashes at random, if a hash is chosen and there is a
- * non-zero-reach node which services that space, it stops. This way it will run many
- * searches early on but as the number of known nodes increases, it begins to taper off.
- */
- struct Janitor
- {
- struct RouterModule* routerModule;
- struct NodeStore* nodeStore;
- struct SearchRunner* searchRunner;
- struct RumorMill* rumorMill;
- struct RumorMill* nodesOfInterest;
- struct Timeout* timeout;
- struct Log* logger;
- uint64_t globalMaintainenceMilliseconds;
- uint64_t timeOfNextGlobalMaintainence;
- uint64_t localMaintainenceMilliseconds;
- struct Allocator* allocator;
- uint64_t timeOfNextSearchRepeat;
- uint64_t searchRepeatMilliseconds;
- struct EventBase* eventBase;
- struct Random* rand;
- /** Number of concurrent searches taking place. */
- int searches;
- Identity
- };
- struct Janitor_Search
- {
- struct Janitor* janitor;
- struct Address best;
- uint8_t target[16];
- struct Allocator* alloc;
- Identity
- };
- static void responseCallback(struct RouterModule_Promise* promise,
- uint32_t lagMilliseconds,
- struct Address* from,
- Dict* result)
- {
- struct Janitor_Search* search = Identity_check((struct Janitor_Search*)promise->userData);
- if (from) {
- Bits_memcpyConst(&search->best, from, sizeof(struct Address));
- return;
- }
- search->janitor->searches--;
- if (!search->best.path) {
- Log_debug(search->janitor->logger, "Search completed with no nodes found");
- }
- Allocator_free(search->alloc);
- }
- static void search(uint8_t target[16], struct Janitor* janitor)
- {
- if (janitor->searches >= 20) {
- Log_debug(janitor->logger, "Skipping search because 20 are in progress");
- return;
- }
- #ifdef Log_DEBUG
- uint8_t targetStr[40];
- AddrTools_printIp(targetStr, target);
- Log_debug(janitor->logger, "Beginning search for [%s]", targetStr);
- #endif
- struct Allocator* searchAlloc = Allocator_child(janitor->allocator);
- struct RouterModule_Promise* rp =
- SearchRunner_search(target, janitor->searchRunner, searchAlloc);
- if (!rp) {
- Log_debug(janitor->logger, "SearchRunner_search() returned NULL, probably full.");
- return;
- }
- janitor->searches++;
- struct Janitor_Search* search = Allocator_clone(rp->alloc, (&(struct Janitor_Search) {
- .janitor = janitor,
- .alloc = searchAlloc,
- }));
- Identity_set(search);
- Bits_memcpyConst(search->target, target, 16);
- rp->callback = responseCallback;
- rp->userData = search;
- }
- static void searchNoDupe(uint8_t target[Address_SEARCH_TARGET_SIZE], struct Janitor* janitor)
- {
- // See if we're already searching for this address.
- struct Allocator* seachListAlloc = Allocator_child(janitor->allocator);
- struct SearchRunner_SearchData* searchData;
- for (int i = 0; i < SearchRunner_DEFAULT_MAX_CONCURRENT_SEARCHES; i++) {
- searchData = SearchRunner_showActiveSearch(janitor->searchRunner,
- i,
- seachListAlloc);
- if (!searchData) { continue; }
- if (!Bits_memcmp(searchData->target, target, Address_SEARCH_TARGET_SIZE)) {
- // Already have a search going for this address, so nothing to do.
- Allocator_free(seachListAlloc);
- return;
- }
- }
- Allocator_free(seachListAlloc);
- // There's no search running for this address, so we start one.
- search(target, janitor);
- #ifdef Log_DEBUG
- uint8_t addrStr[40];
- AddrTools_printIp(addrStr, target);
- Log_debug(janitor->logger, "No active search for [%s], starting one.", addrStr);
- #endif
- }
- /**
- * For a Distributed Hash Table to work, each node must know a valid next hop in the search,
- * if such a thing exists.
- *
- * In a Kademlia DHT, can be done by organizing nodes into k-buckets. These are collections
- * of k nodes for which the first n bits of your node IDs match. Among other things, k-buckets
- * allow a node to identify holes in their lookup table and fill them. If the nth bucket is empty,
- * it means your node knows of no valid next hop for any key matching the first n bits of your
- * address and differing in bit n+1.
- *
- * Without going to the trouble of organizing nodes in the buckets, this function iterates
- * bitwise over keyspace, to identify the same kind of routing holes.
- * It then dispatches a search for the first (largest) such hole in keyspace that it finds.
- */
- static void plugLargestKeyspaceHole(struct Janitor* janitor)
- {
- struct Address addr = *janitor->nodeStore->selfAddress;
- // Get furthest possible address
- for (int i = 1 ; i < Address_SEARCH_TARGET_SIZE ; i++) {
- // Leaving [0] alone keeps the 0xfc around, so it's a valid cjdns address.
- addr.ip6.bytes[i] = ~(addr.ip6.bytes[i]);
- }
- int byte = 0;
- int bit = 0;
- for (int i = 0; i < 128 ; i++) {
- if (63 < i && i < 72) {
- // We want to leave the 0xfc alone
- continue;
- }
- if (i < 64) { byte = 8 + (i/8); }
- else { byte = (i/8) - 8; }
- bit = (i % 8);
- addr.ip6.bytes[byte] = addr.ip6.bytes[byte] ^ (0x01 << bit);
- struct Node_Two* n = RouterModule_lookup(addr.ip6.bytes, janitor->routerModule);
- if (!n) {
- // We found a hole! Exit loop and let the search trigger.
- break;
- }
- }
- searchNoDupe(addr.ip6.bytes, janitor);
- }
- static void peersResponseCallback(struct RouterModule_Promise* promise,
- uint32_t lagMilliseconds,
- struct Address* from,
- Dict* result)
- {
- struct Janitor* janitor = Identity_check((struct Janitor*)promise->userData);
- if (!from) { return; }
- struct Node_Link* fromLink = NodeStore_linkForPath(janitor->nodeStore, from->path);
- Assert_true(fromLink);
- struct Address_List* addresses =
- ReplySerializer_parse(from,
- fromLink->child->encodingScheme,
- fromLink->inverseLinkEncodingFormNumber,
- result,
- janitor->logger,
- promise->alloc);
- for (int i = 0; addresses && i < addresses->length; i++) {
- if (!NodeStore_linkForPath(janitor->nodeStore, addresses->elems[i].path)) {
- RumorMill_addNode(janitor->rumorMill, &addresses->elems[i]);
- }
- }
- }
- static void checkPeers(struct Janitor* janitor, struct Node_Two* n)
- {
- // Lets check for non-one-hop links at each node along the path between us and this node.
- uint32_t i = 0;
- for (;;i++) {
- struct Node_Link* link =
- NodeStore_getLinkOnPath(janitor->nodeStore, n->address.path, i);
- if (!link) { return; }
- if (link->parent == janitor->nodeStore->selfNode) { continue; }
- int count = NodeStore_linkCount(link->child);
- for (int j = 0; j < count; j++) {
- struct Node_Link* l = NodeStore_getLink(link->child, j);
- if (!Node_isOneHopLink(l) || link->parent->pathQuality == 0) {
- struct RouterModule_Promise* rp =
- RouterModule_getPeers(&link->parent->address, l->cannonicalLabel, 0,
- janitor->routerModule, janitor->allocator);
- rp->callback = peersResponseCallback;
- rp->userData = janitor;
- // Only send max 1 getPeers req per second.
- return;
- }
- }
- }
- }
- static void maintanenceCycle(void* vcontext)
- {
- struct Janitor* const janitor = Identity_check((struct Janitor*) vcontext);
- uint64_t now = Time_currentTimeMilliseconds(janitor->eventBase);
- uint64_t nextTimeout = (janitor->localMaintainenceMilliseconds / 2);
- nextTimeout += Random_uint32(janitor->rand) % (nextTimeout * 2);
- janitor->timeout = Timeout_setTimeout(maintanenceCycle,
- janitor,
- nextTimeout,
- janitor->eventBase,
- janitor->allocator);
- if (NodeStore_size(janitor->nodeStore) == 0 && janitor->rumorMill->count == 0) {
- if (now > janitor->timeOfNextGlobalMaintainence) {
- Log_warn(janitor->logger,
- "No nodes in routing table, check network connection and configuration.");
- janitor->timeOfNextGlobalMaintainence += janitor->globalMaintainenceMilliseconds;
- }
- return;
- }
- struct Address addr = { .protocolVersion = 0 };
- // ping a node from the ping queue
- if (RumorMill_getNode(janitor->rumorMill, &addr)) {
- addr.path = NodeStore_optimizePath(janitor->nodeStore, addr.path);
- if (NodeStore_optimizePath_INVALID != addr.path) {
- struct RouterModule_Promise* rp =
- RouterModule_getPeers(&addr,
- Random_uint32(janitor->rand),
- 0,
- janitor->routerModule,
- janitor->allocator);
- rp->callback = peersResponseCallback;
- rp->userData = janitor;
- #ifdef Log_DEBUG
- uint8_t addrStr[60];
- Address_print(addrStr, &addr);
- Log_debug(janitor->logger, "Pinging possible node [%s] from RumorMill", addrStr);
- #endif
- }
- }
- // This is good for DHT health. See function description.
- plugLargestKeyspaceHole(janitor);
- // Do something useful for a node we're actively trying to communicate with.
- if (RumorMill_getNode(janitor->nodesOfInterest, &addr)) {
- searchNoDupe(addr.ip6.bytes, janitor);
- }
- // random search
- Random_bytes(janitor->rand, addr.ip6.bytes, 16);
- // Make this a valid address.
- addr.ip6.bytes[0] = 0xfc;
- struct Node_Two* n = RouterModule_lookup(addr.ip6.bytes, janitor->routerModule);
- // If the best next node doesn't exist or has 0 reach, run a local maintenance search.
- if (n == NULL || n->pathQuality == 0) {
- search(addr.ip6.bytes, janitor);
- return;
- } else {
- checkPeers(janitor, n);
- }
- #ifdef Log_DEBUG
- int nonZeroNodes = NodeStore_nonZeroNodes(janitor->nodeStore);
- int total = NodeStore_size(janitor->nodeStore);
- Log_debug(janitor->logger,
- "Global Mean Response Time: %u non-zero nodes: [%d] zero nodes [%d] "
- "total [%d]",
- RouterModule_globalMeanResponseTime(janitor->routerModule),
- nonZeroNodes,
- (total - nonZeroNodes),
- total);
- #endif
- if (now > janitor->timeOfNextGlobalMaintainence) {
- search(addr.ip6.bytes, janitor);
- janitor->timeOfNextGlobalMaintainence += janitor->globalMaintainenceMilliseconds;
- }
- }
- struct Janitor* Janitor_new(uint64_t localMaintainenceMilliseconds,
- uint64_t globalMaintainenceMilliseconds,
- struct RouterModule* routerModule,
- struct NodeStore* nodeStore,
- struct SearchRunner* searchRunner,
- struct RumorMill* rumorMill,
- struct RumorMill* nodesOfInterest,
- struct Log* logger,
- struct Allocator* allocator,
- struct EventBase* eventBase,
- struct Random* rand)
- {
- struct Allocator* alloc = Allocator_child(allocator);
- struct Janitor* janitor = Allocator_clone(alloc, (&(struct Janitor) {
- .eventBase = eventBase,
- .routerModule = routerModule,
- .nodeStore = nodeStore,
- .searchRunner = searchRunner,
- .rumorMill = rumorMill,
- .nodesOfInterest = nodesOfInterest,
- .logger = logger,
- .globalMaintainenceMilliseconds = globalMaintainenceMilliseconds,
- .localMaintainenceMilliseconds = localMaintainenceMilliseconds,
- .allocator = alloc,
- .rand = rand
- }));
- Identity_set(janitor);
- janitor->timeOfNextGlobalMaintainence = Time_currentTimeMilliseconds(eventBase);
- janitor->timeout = Timeout_setTimeout(maintanenceCycle,
- janitor,
- localMaintainenceMilliseconds,
- eventBase,
- alloc);
- return janitor;
- }
|