Pathfinder.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. */
  15. #include "dht/Pathfinder.h"
  16. #include "dht/DHTModule.h"
  17. #include "dht/Address.h"
  18. #include "wire/DataHeader.h"
  19. #include "wire/RouteHeader.h"
  20. #include "dht/ReplyModule.h"
  21. #include "dht/EncodingSchemeModule.h"
  22. #include "dht/SerializationModule.h"
  23. #include "dht/dhtcore/RouterModule.h"
  24. #include "dht/dhtcore/RouterModule_admin.h"
  25. #include "dht/dhtcore/RumorMill.h"
  26. #include "dht/dhtcore/SearchRunner.h"
  27. #include "dht/dhtcore/SearchRunner_admin.h"
  28. #include "dht/dhtcore/NodeStore_admin.h"
  29. #include "dht/dhtcore/Janitor_admin.h"
  30. #include "dht/dhtcore/Janitor.h"
  31. #include "dht/dhtcore/Router_new.h"
  32. #include "util/AddrTools.h"
  33. #include "wire/Error.h"
  34. #include "util/CString.h"
  35. ///////////////////// [ Address ][ content... ]
  36. #define RUMORMILL_CAPACITY 64
  37. struct Pathfinder_pvt
  38. {
  39. struct Pathfinder pub;
  40. struct Iface eventIf;
  41. struct DHTModule dhtModule;
  42. struct Allocator* alloc;
  43. struct Log* log;
  44. struct EventBase* base;
  45. struct Random* rand;
  46. struct Admin* admin;
  47. struct EventEmitter* ee;
  48. #define Pathfinder_pvt_state_INITIALIZING 0
  49. #define Pathfinder_pvt_state_RUNNING 1
  50. int state;
  51. // After begin connected, these fields will be filled.
  52. struct Address myAddr;
  53. struct DHTModuleRegistry* registry;
  54. struct NodeStore* nodeStore;
  55. struct Router* router;
  56. struct SearchRunner* searchRunner;
  57. struct RumorMill* rumorMill;
  58. struct Janitor* janitor;
  59. Identity
  60. };
  61. struct NodeStore* Pathfinder_getNodeStore(struct Pathfinder* pathfinder)
  62. {
  63. struct Pathfinder_pvt* pf = Identity_check((struct Pathfinder_pvt*) pathfinder);
  64. return pf->nodeStore;
  65. }
  66. static int incomingFromDHT(struct DHTMessage* dmessage, void* vpf)
  67. {
  68. struct Pathfinder_pvt* pf = Identity_check((struct Pathfinder_pvt*) vpf);
  69. struct Message* msg = dmessage->binMessage;
  70. struct Address* addr = dmessage->address;
  71. // Sanity check (make sure the addr was actually calculated)
  72. Assert_true(addr->ip6.bytes[0] == 0xfc);
  73. Message_shift(msg, PFChan_Msg_MIN_SIZE, NULL);
  74. struct PFChan_Msg* emsg = (struct PFChan_Msg*) msg->bytes;
  75. Bits_memset(emsg, 0, PFChan_Msg_MIN_SIZE);
  76. DataHeader_setVersion(&emsg->data, DataHeader_CURRENT_VERSION);
  77. DataHeader_setContentType(&emsg->data, ContentType_CJDHT);
  78. Bits_memcpyConst(emsg->route.ip6, addr->ip6.bytes, 16);
  79. emsg->route.version_be = Endian_hostToBigEndian32(addr->protocolVersion);
  80. emsg->route.sh.label_be = Endian_hostToBigEndian64(addr->path);
  81. Bits_memcpyConst(emsg->route.publicKey, addr->key, 32);
  82. Message_push32(msg, PFChan_Pathfinder_SENDMSG, NULL);
  83. if (dmessage->replyTo) {
  84. // see incomingMsg
  85. dmessage->replyTo->pleaseRespond = true;
  86. //Log_debug(pf->log, "send DHT reply");
  87. return 0;
  88. }
  89. //Log_debug(pf->log, "send DHT request");
  90. Iface_send(&pf->eventIf, msg);
  91. return 0;
  92. }
  93. static void nodeForAddress(struct PFChan_Node* nodeOut, struct Address* addr, uint32_t metric)
  94. {
  95. Bits_memset(nodeOut, 0, PFChan_Node_SIZE);
  96. nodeOut->version_be = Endian_hostToBigEndian32(addr->protocolVersion);
  97. nodeOut->metric_be = Endian_hostToBigEndian32(metric);
  98. nodeOut->path_be = Endian_hostToBigEndian64(addr->path);
  99. Bits_memcpyConst(nodeOut->publicKey, addr->key, 32);
  100. Bits_memcpyConst(nodeOut->ip6, addr->ip6.bytes, 16);
  101. }
  102. static Iface_DEFUN sendNode(struct Message* msg,
  103. struct Address* addr,
  104. uint32_t metric,
  105. struct Pathfinder_pvt* pf)
  106. {
  107. Message_reset(msg);
  108. Message_shift(msg, PFChan_Node_SIZE, NULL);
  109. nodeForAddress((struct PFChan_Node*) msg->bytes, addr, metric);
  110. if (addr->path == UINT64_MAX) {
  111. ((struct PFChan_Node*) msg->bytes)->path_be = 0;
  112. }
  113. Message_push32(msg, PFChan_Pathfinder_NODE, NULL);
  114. return Iface_next(&pf->eventIf, msg);
  115. }
  116. static void onBestPathChange(void* vPathfinder, struct Node_Two* node)
  117. {
  118. struct Pathfinder_pvt* pf = Identity_check((struct Pathfinder_pvt*) vPathfinder);
  119. struct Allocator* alloc = Allocator_child(pf->alloc);
  120. struct Message* msg = Message_new(0, 256, alloc);
  121. Iface_CALL(sendNode, msg, &node->address, 0xffffffffu - Node_getReach(node), pf);
  122. Allocator_free(alloc);
  123. }
  124. static Iface_DEFUN connected(struct Pathfinder_pvt* pf, struct Message* msg)
  125. {
  126. Log_debug(pf->log, "INIT");
  127. struct PFChan_Core_Connect conn;
  128. Message_pop(msg, &conn, PFChan_Core_Connect_SIZE, NULL);
  129. Assert_true(!msg->length);
  130. Bits_memcpyConst(pf->myAddr.key, conn.publicKey, 32);
  131. Address_getPrefix(&pf->myAddr);
  132. pf->myAddr.path = 1;
  133. // begin
  134. pf->registry = DHTModuleRegistry_new(pf->alloc);
  135. ReplyModule_register(pf->registry, pf->alloc);
  136. pf->rumorMill = RumorMill_new(pf->alloc, &pf->myAddr, RUMORMILL_CAPACITY, pf->log, "extern");
  137. pf->nodeStore = NodeStore_new(&pf->myAddr, pf->alloc, pf->base, pf->log, pf->rumorMill);
  138. pf->nodeStore->onBestPathChange = onBestPathChange;
  139. pf->nodeStore->onBestPathChangeCtx = pf;
  140. struct RouterModule* routerModule = RouterModule_register(pf->registry,
  141. pf->alloc,
  142. pf->myAddr.key,
  143. pf->base,
  144. pf->log,
  145. pf->rand,
  146. pf->nodeStore);
  147. pf->searchRunner = SearchRunner_new(pf->nodeStore,
  148. pf->log,
  149. pf->base,
  150. routerModule,
  151. pf->myAddr.ip6.bytes,
  152. pf->rumorMill,
  153. pf->alloc);
  154. pf->janitor = Janitor_new(routerModule,
  155. pf->nodeStore,
  156. pf->searchRunner,
  157. pf->rumorMill,
  158. pf->log,
  159. pf->alloc,
  160. pf->base,
  161. pf->rand);
  162. EncodingSchemeModule_register(pf->registry, pf->log, pf->alloc);
  163. SerializationModule_register(pf->registry, pf->log, pf->alloc);
  164. DHTModuleRegistry_register(&pf->dhtModule, pf->registry);
  165. pf->router = Router_new(routerModule, pf->nodeStore, pf->searchRunner, pf->alloc);
  166. // Now the admin stuff...
  167. if (pf->admin) {
  168. NodeStore_admin_register(pf->nodeStore, pf->admin, pf->alloc);
  169. RouterModule_admin_register(routerModule, pf->router, pf->admin, pf->alloc);
  170. SearchRunner_admin_register(pf->searchRunner, pf->admin, pf->alloc);
  171. Janitor_admin_register(pf->janitor, pf->admin, pf->alloc);
  172. }
  173. pf->state = Pathfinder_pvt_state_RUNNING;
  174. return NULL;
  175. }
  176. static void addressForNode(struct Address* addrOut, struct Message* msg)
  177. {
  178. struct PFChan_Node node;
  179. Message_pop(msg, &node, PFChan_Node_SIZE, NULL);
  180. Assert_true(!msg->length);
  181. addrOut->protocolVersion = Endian_bigEndianToHost32(node.version_be);
  182. addrOut->path = Endian_bigEndianToHost64(node.path_be);
  183. Bits_memcpyConst(addrOut->key, node.publicKey, 32);
  184. Bits_memcpyConst(addrOut->ip6.bytes, node.ip6, 16);
  185. }
  186. static Iface_DEFUN switchErr(struct Message* msg, struct Pathfinder_pvt* pf)
  187. {
  188. struct PFChan_Core_SwitchErr switchErr;
  189. Message_pop(msg, &switchErr, PFChan_Core_SwitchErr_MIN_SIZE, NULL);
  190. uint64_t path = Endian_bigEndianToHost64(switchErr.sh.label_be);
  191. uint64_t pathAtErrorHop = Endian_bigEndianToHost64(switchErr.ctrlErr.cause.label_be);
  192. uint8_t pathStr[20];
  193. AddrTools_printPath(pathStr, path);
  194. int err = Endian_bigEndianToHost32(switchErr.ctrlErr.errorType_be);
  195. Log_debug(pf->log, "switch err from [%s] type [%s][%d]", pathStr, Error_strerror(err), err);
  196. struct Node_Link* link = NodeStore_linkForPath(pf->nodeStore, path);
  197. uint8_t nodeAddr[16];
  198. if (link) {
  199. Bits_memcpyConst(nodeAddr, link->child->address.ip6.bytes, 16);
  200. }
  201. NodeStore_brokenLink(pf->nodeStore, path, pathAtErrorHop);
  202. if (link) {
  203. // Don't touch the node again, it might be a dangling pointer
  204. SearchRunner_search(nodeAddr, 20, 3, pf->searchRunner, pf->alloc);
  205. }
  206. return NULL;
  207. }
  208. static Iface_DEFUN searchReq(struct Message* msg, struct Pathfinder_pvt* pf)
  209. {
  210. uint8_t addr[16];
  211. Message_pop(msg, addr, 16, NULL);
  212. Assert_true(!msg->length);
  213. uint8_t printedAddr[40];
  214. AddrTools_printIp(printedAddr, addr);
  215. Log_debug(pf->log, "Search req [%s]", printedAddr);
  216. SearchRunner_search(addr, 20, 3, pf->searchRunner, pf->alloc);
  217. return NULL;
  218. }
  219. static Iface_DEFUN peer(struct Message* msg, struct Pathfinder_pvt* pf)
  220. {
  221. struct Address addr;
  222. addressForNode(&addr, msg);
  223. String* str = Address_toString(&addr, msg->alloc);
  224. Log_debug(pf->log, "Peer [%s]", str->bytes);
  225. struct Node_Link* link = NodeStore_linkForPath(pf->nodeStore, addr.path);
  226. // It exists, it's parent is the self-node, and it's label is equal to the switchLabel.
  227. if (link
  228. && Node_getBestParent(link->child)
  229. && Node_getBestParent(link->child)->parent->address.path == 1
  230. && Node_getBestParent(link->child)->cannonicalLabel == addr.path)
  231. {
  232. return NULL;
  233. }
  234. //RumorMill_addNode(pf->rumorMill, &addr);
  235. Router_sendGetPeers(pf->router, &addr, 0, 0, pf->alloc);
  236. return sendNode(msg, &addr, 0xffffff00, pf);
  237. }
  238. static Iface_DEFUN peerGone(struct Message* msg, struct Pathfinder_pvt* pf)
  239. {
  240. struct Address addr;
  241. addressForNode(&addr, msg);
  242. String* str = Address_toString(&addr, msg->alloc);
  243. Log_debug(pf->log, "Peer gone [%s]", str->bytes);
  244. NodeStore_disconnectedPeer(pf->nodeStore, addr.path);
  245. // We notify about the node but with max metric so it will be removed soon.
  246. return sendNode(msg, &addr, 0xffffffff, pf);
  247. }
  248. static Iface_DEFUN session(struct Message* msg, struct Pathfinder_pvt* pf)
  249. {
  250. struct Address addr;
  251. addressForNode(&addr, msg);
  252. String* str = Address_toString(&addr, msg->alloc);
  253. Log_debug(pf->log, "Session [%s]", str->bytes);
  254. struct Node_Two* node = NodeStore_nodeForAddr(pf->nodeStore, addr.ip6.bytes);
  255. if (!node) {
  256. SearchRunner_search(addr.ip6.bytes, 20, 3, pf->searchRunner, pf->alloc);
  257. }
  258. return NULL;
  259. }
  260. static Iface_DEFUN sessionEnded(struct Message* msg, struct Pathfinder_pvt* pf)
  261. {
  262. struct Address addr;
  263. addressForNode(&addr, msg);
  264. String* str = Address_toString(&addr, msg->alloc);
  265. Log_debug(pf->log, "Session ended [%s]", str->bytes);
  266. return NULL;
  267. }
  268. static Iface_DEFUN discoveredPath(struct Message* msg, struct Pathfinder_pvt* pf)
  269. {
  270. struct Address addr;
  271. addressForNode(&addr, msg);
  272. // We're somehow aware of this path (even if it's unused)
  273. if (NodeStore_linkForPath(pf->nodeStore, addr.path)) { return NULL; }
  274. // Our best path is "shorter" (label bits which is somewhat representitive of hop count)
  275. // basically this is just to dampen the flood to the RM because otherwise it prevents Janitor
  276. // from getting any actual work done.
  277. struct Node_Two* nn = NodeStore_nodeForAddr(pf->nodeStore, addr.ip6.bytes);
  278. if (nn && nn->address.path < addr.path) { return NULL; }
  279. // In order to avoid making the janitor ping endless nodes which are of no value, we will
  280. // filter out anything which has a longer path than the average of nodes in better or equal
  281. // k-buckets.
  282. if (!nn) {
  283. int ctr = 0;
  284. int totalLog2Path = 0;
  285. uint32_t selfPrefix = Address_getPrefix(&pf->myAddr);
  286. int ourKDistLog = Bits_log2x32(Address_getPrefix(&addr) ^ selfPrefix);
  287. for (;;) {
  288. nn = NodeStore_getNextNode(pf->nodeStore, nn);
  289. if (!nn) { break; }
  290. if (Bits_log2x32(Address_getPrefix(&nn->address) ^ selfPrefix) <= ourKDistLog) {
  291. ctr++;
  292. totalLog2Path += Bits_log2x64(nn->address.path);
  293. }
  294. }
  295. if (ctr && (totalLog2Path / ctr) <= Bits_log2x64(addr.path)) { return NULL; }
  296. }
  297. Log_debug(pf->log, "Discovered path [%s]", Address_toString(&addr, msg->alloc)->bytes);
  298. RumorMill_addNode(pf->rumorMill, &addr);
  299. return NULL;
  300. }
  301. static Iface_DEFUN handlePing(struct Message* msg, struct Pathfinder_pvt* pf)
  302. {
  303. Log_debug(pf->log, "Received ping");
  304. Message_push32(msg, PFChan_Pathfinder_PONG, NULL);
  305. return Iface_next(&pf->eventIf, msg);
  306. }
  307. static Iface_DEFUN handlePong(struct Message* msg, struct Pathfinder_pvt* pf)
  308. {
  309. Log_debug(pf->log, "Received pong");
  310. return NULL;
  311. }
  312. static Iface_DEFUN incomingMsg(struct Message* msg, struct Pathfinder_pvt* pf)
  313. {
  314. struct Address addr;
  315. struct RouteHeader* hdr = (struct RouteHeader*) msg->bytes;
  316. Message_shift(msg, -(RouteHeader_SIZE + DataHeader_SIZE), NULL);
  317. Bits_memcpyConst(addr.ip6.bytes, hdr->ip6, 16);
  318. Bits_memcpyConst(addr.key, hdr->publicKey, 32);
  319. int version = addr.protocolVersion = Endian_bigEndianToHost32(hdr->version_be);
  320. addr.padding = 0;
  321. addr.path = Endian_bigEndianToHost64(hdr->sh.label_be);
  322. //Log_debug(pf->log, "Incoming DHT");
  323. struct DHTMessage dht = {
  324. .address = &addr,
  325. .binMessage = msg,
  326. .allocator = msg->alloc
  327. };
  328. DHTModuleRegistry_handleIncoming(&dht, pf->registry);
  329. if (dht.pleaseRespond) {
  330. // what a beautiful hack, see incomingFromDHT
  331. return Iface_next(&pf->eventIf, msg);
  332. } else if (!version && addr.protocolVersion) {
  333. return sendNode(msg, &addr, 0xfffffff0, pf);
  334. }
  335. return NULL;
  336. }
  337. static Iface_DEFUN incomingFromEventIf(struct Message* msg, struct Iface* eventIf)
  338. {
  339. struct Pathfinder_pvt* pf = Identity_containerOf(eventIf, struct Pathfinder_pvt, eventIf);
  340. enum PFChan_Core ev = Message_pop32(msg, NULL);
  341. if (Pathfinder_pvt_state_INITIALIZING == pf->state) {
  342. Assert_true(ev == PFChan_Core_CONNECT);
  343. return connected(pf, msg);
  344. }
  345. switch (ev) {
  346. case PFChan_Core_SWITCH_ERR: return switchErr(msg, pf);
  347. case PFChan_Core_SEARCH_REQ: return searchReq(msg, pf);
  348. case PFChan_Core_PEER: return peer(msg, pf);
  349. case PFChan_Core_PEER_GONE: return peerGone(msg, pf);
  350. case PFChan_Core_SESSION: return session(msg, pf);
  351. case PFChan_Core_SESSION_ENDED: return sessionEnded(msg, pf);
  352. case PFChan_Core_DISCOVERED_PATH: return discoveredPath(msg, pf);
  353. case PFChan_Core_MSG: return incomingMsg(msg, pf);
  354. case PFChan_Core_PING: return handlePing(msg, pf);
  355. case PFChan_Core_PONG: return handlePong(msg, pf);
  356. default:;
  357. }
  358. Assert_failure("unexpected event [%d]", ev);
  359. }
  360. static void sendEvent(struct Pathfinder_pvt* pf, enum PFChan_Pathfinder ev, void* data, int size)
  361. {
  362. struct Allocator* alloc = Allocator_child(pf->alloc);
  363. struct Message* msg = Message_new(0, 512+size, alloc);
  364. Message_push(msg, data, size, NULL);
  365. Message_push32(msg, ev, NULL);
  366. Iface_send(&pf->eventIf, msg);
  367. Allocator_free(alloc);
  368. }
  369. struct Pathfinder* Pathfinder_register(struct Allocator* alloc,
  370. struct Log* log,
  371. struct EventBase* base,
  372. struct Random* rand,
  373. struct Admin* admin,
  374. struct EventEmitter* ee)
  375. {
  376. struct Pathfinder_pvt* pf = Allocator_calloc(alloc, sizeof(struct Pathfinder_pvt), 1);
  377. pf->alloc = alloc;
  378. pf->log = log;
  379. pf->base = base;
  380. pf->rand = rand;
  381. pf->admin = admin;
  382. pf->ee = ee;
  383. Identity_set(pf);
  384. pf->eventIf.send = incomingFromEventIf;
  385. EventEmitter_regPathfinderIface(ee, &pf->eventIf);
  386. pf->dhtModule.context = pf;
  387. pf->dhtModule.handleOutgoing = incomingFromDHT;
  388. struct PFChan_Pathfinder_Connect conn = {
  389. .superiority_be = Endian_hostToBigEndian32(1),
  390. .version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL)
  391. };
  392. CString_strncpy(conn.userAgent, "Cjdns internal pathfinder", 64);
  393. sendEvent(pf, PFChan_Pathfinder_CONNECT, &conn, PFChan_Pathfinder_Connect_SIZE);
  394. return &pf->pub;
  395. }