Pathfinder.c 18 KB

  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "crypto/AddressCalc.h"
  16. #include "dht/Pathfinder.h"
  17. #include "dht/DHTModule.h"
  18. #include "dht/Address.h"
  19. #include "wire/DataHeader.h"
  20. #include "wire/RouteHeader.h"
  21. #include "dht/ReplyModule.h"
  22. #include "dht/EncodingSchemeModule.h"
  23. #include "dht/Pathfinder_pvt.h"
  24. #include "dht/SerializationModule.h"
  25. #include "dht/dhtcore/RouterModule.h"
  26. #include "dht/dhtcore/RouterModule_admin.h"
  27. #include "dht/dhtcore/RumorMill.h"
  28. #include "dht/dhtcore/SearchRunner.h"
  29. #include "dht/dhtcore/SearchRunner_admin.h"
  30. #include "dht/dhtcore/NodeStore_admin.h"
  31. #include "dht/dhtcore/Janitor_admin.h"
  32. #include "dht/dhtcore/Janitor.h"
  33. #include "dht/dhtcore/Router_new.h"
  34. #include "util/AddrTools.h"
  35. #include "util/events/Timeout.h"
  36. #include "wire/Error.h"
  37. #include "wire/PFChan.h"
  38. #include "util/CString.h"
  39. #include "wire/Metric.h"
  40. ///////////////////// [ Address ][ content... ]
  41. #define RUMORMILL_CAPACITY 64
  42. struct Pathfinder_pvt
  43. {
  44. struct Pathfinder pub;
  45. struct DHTModule dhtModule;
  46. struct Allocator* alloc;
  47. struct Log* log;
  48. struct EventBase* base;
  49. struct Random* rand;
  50. struct Admin* admin;
  51. #define Pathfinder_pvt_state_INITIALIZING 0
  52. #define Pathfinder_pvt_state_RUNNING 1
  53. int state;
  54. int bestPathChanges;
  55. // After begin connected, these fields will be filled.
  56. struct Address myAddr;
  57. struct DHTModuleRegistry* registry;
  58. struct NodeStore* nodeStore;
  59. struct Router* router;
  60. struct SearchRunner* searchRunner;
  61. struct RumorMill* rumorMill;
  62. struct Janitor* janitor;
  63. Identity
  64. };
  65. struct NodeStore* Pathfinder_getNodeStore(struct Pathfinder* pathfinder)
  66. {
  67. struct Pathfinder_pvt* pf = Identity_check((struct Pathfinder_pvt*) pathfinder);
  68. return pf->nodeStore;
  69. }
  70. static int incomingFromDHT(struct DHTMessage* dmessage, void* vpf)
  71. {
  72. struct Pathfinder_pvt* pf = Identity_check((struct Pathfinder_pvt*) vpf);
  73. struct Message* msg = dmessage->binMessage;
  74. struct Address* addr = dmessage->address;
  75. if (addr->path == 1) {
  76. // Message to myself, can't handle this later because encrypting a message to yourself
  77. // causes problems.
  78. DHTModuleRegistry_handleIncoming(dmessage, pf->registry);
  79. return 0;
  80. }
  81. // This seems to be happening, this whole section of the code is deprecated so lets not
  82. // try to debug it too much and just squash the message.
  83. if (addr->path == 0xffffffffffffffffull) {
  84. return 0;
  85. }
  86. // Sanity check (make sure the addr was actually calculated)
  87. Assert_true(AddressCalc_validAddress(addr->ip6.bytes));
  88. Er_assert(Message_eshift(msg, PFChan_Msg_MIN_SIZE));
  89. struct PFChan_Msg* emsg = (struct PFChan_Msg*) msg->msgbytes;
  90. Bits_memset(emsg, 0, PFChan_Msg_MIN_SIZE);
  91. DataHeader_setVersion(&emsg->data, DataHeader_CURRENT_VERSION);
  92. DataHeader_setContentType(&emsg->data, ContentType_CJDHT);
  93. Bits_memcpy(emsg->route.ip6, addr->ip6.bytes, 16);
  94. emsg->route.version_be = Endian_hostToBigEndian32(addr->protocolVersion);
  95. emsg->route.sh.label_be = Endian_hostToBigEndian64(addr->path);
  96. emsg->route.flags |= RouteHeader_flags_PATHFINDER;
  97. SwitchHeader_setVersion(&emsg->route.sh, SwitchHeader_CURRENT_VERSION);
  98. Bits_memcpy(emsg->route.publicKey, addr->key, 32);
  99. Assert_true(!Bits_isZero(emsg->route.publicKey, 32));
  100. Assert_true(emsg->route.sh.label_be);
  101. Assert_true(emsg->route.sh.label_be != 0xffffffffffffffffull);
  102. Assert_true(emsg->route.version_be);
  103. Er_assert(Message_epush32be(msg, PFChan_Pathfinder_SENDMSG));
  104. if (dmessage->replyTo) {
  105. // see incomingMsg
  106. dmessage->replyTo->pleaseRespond = true;
  107. //Log_debug(pf->log, "send DHT reply");
  108. return 0;
  109. }
  110. //Log_debug(pf->log, "send DHT request");
  111. Iface_send(&pf->pub.eventIf, msg);
  112. return 0;
  113. }
  114. static void nodeForAddress(struct PFChan_Node* nodeOut, struct Address* addr, uint32_t metric)
  115. {
  116. Bits_memset(nodeOut, 0, PFChan_Node_SIZE);
  117. nodeOut->version_be = Endian_hostToBigEndian32(addr->protocolVersion);
  118. nodeOut->metric_be = Endian_hostToBigEndian32(metric);
  119. nodeOut->path_be = Endian_hostToBigEndian64(addr->path);
  120. Bits_memcpy(nodeOut->publicKey, addr->key, 32);
  121. Bits_memcpy(nodeOut->ip6, addr->ip6.bytes, 16);
  122. }
  123. static Iface_DEFUN sendNode(struct Message* msg,
  124. struct Address* addr,
  125. uint32_t metric,
  126. struct Pathfinder_pvt* pf)
  127. {
  128. Message_reset(msg);
  129. Er_assert(Message_eshift(msg, PFChan_Node_SIZE));
  130. nodeForAddress((struct PFChan_Node*) msg->msgbytes, addr, metric);
  131. if (addr->path == UINT64_MAX) {
  132. ((struct PFChan_Node*) msg->msgbytes)->path_be = 0;
  133. }
  134. Er_assert(Message_epush32be(msg, PFChan_Pathfinder_NODE));
  135. return Iface_next(&pf->pub.eventIf, msg);
  136. }
  137. static void onBestPathChange(void* vPathfinder, struct Node_Two* node)
  138. {
  139. struct Pathfinder_pvt* pf = Identity_check((struct Pathfinder_pvt*) vPathfinder);
  140. struct Allocator* alloc = Allocator_child(pf->alloc);
  141. if (pf->bestPathChanges > 128) {
  142. String* addrPrinted = Address_toString(&node->address, alloc);
  143. Log_debug(pf->log, "Ignore best path change from NodeStore [%s]", addrPrinted->bytes);
  144. } else {
  145. pf->bestPathChanges++;
  146. struct Message* msg = Message_new(0, 256, alloc);
  147. Iface_CALL(sendNode, msg, &node->address,
  148. (Node_getCost(node) & Metric_DHT_MASK) | Metric_DHT,
  149. pf);
  150. }
  151. Allocator_free(alloc);
  152. }
  153. static Iface_DEFUN connected(struct Pathfinder_pvt* pf, struct Message* msg)
  154. {
  155. Log_debug(pf->log, "INIT");
  156. struct PFChan_Core_Connect conn;
  157. Er_assert(Message_epop(msg, &conn, PFChan_Core_Connect_SIZE));
  158. Assert_true(!Message_getLength(msg));
  159. Bits_memcpy(pf->myAddr.key, conn.publicKey, 32);
  160. Address_getPrefix(&pf->myAddr);
  161. pf->myAddr.path = 1;
  162. // begin
  163. pf->registry = DHTModuleRegistry_new(pf->alloc, pf->log);
  164. ReplyModule_register(pf->registry, pf->alloc);
  165. pf->rumorMill = RumorMill_new(pf->alloc, &pf->myAddr, RUMORMILL_CAPACITY, pf->log, "extern");
  166. pf->nodeStore = NodeStore_new(&pf->myAddr, pf->alloc, pf->base, pf->log, pf->rumorMill);
  167. if (pf->pub.fullVerify) {
  168. NodeStore_setFullVerify(pf->nodeStore, true);
  169. }
  170. pf->nodeStore->onBestPathChange = onBestPathChange;
  171. pf->nodeStore->onBestPathChangeCtx = pf;
  172. struct RouterModule* routerModule = RouterModule_register(pf->registry,
  173. pf->alloc,
  174. pf->myAddr.key,
  175. pf->base,
  176. pf->log,
  177. pf->rand,
  178. pf->nodeStore);
  179. pf->searchRunner = SearchRunner_new(pf->nodeStore,
  180. pf->log,
  181. pf->base,
  182. routerModule,
  183. pf->myAddr.ip6.bytes,
  184. pf->rumorMill,
  185. pf->alloc);
  186. pf->janitor = Janitor_new(routerModule,
  187. pf->nodeStore,
  188. pf->searchRunner,
  189. pf->rumorMill,
  190. pf->log,
  191. pf->alloc,
  192. pf->base,
  193. pf->rand);
  194. EncodingSchemeModule_register(pf->registry, pf->log, pf->alloc);
  195. SerializationModule_register(pf->registry, pf->log, pf->alloc);
  196. DHTModuleRegistry_register(&pf->dhtModule, pf->registry);
  197. pf->router = Router_new(routerModule, pf->nodeStore, pf->searchRunner, pf->alloc);
  198. // Now the admin stuff...
  199. if (pf->admin) {
  200. NodeStore_admin_register(pf->nodeStore, pf->admin, pf->alloc);
  201. RouterModule_admin_register(routerModule, pf->router, pf->admin, pf->alloc);
  202. SearchRunner_admin_register(pf->searchRunner, pf->admin, pf->alloc);
  203. Janitor_admin_register(pf->janitor, pf->admin, pf->alloc);
  204. }
  205. pf->state = Pathfinder_pvt_state_RUNNING;
  206. return NULL;
  207. }
  208. static void addressForNode(struct Address* addrOut, struct Message* msg)
  209. {
  210. struct PFChan_Node node;
  211. Er_assert(Message_epop(msg, &node, PFChan_Node_SIZE));
  212. Assert_true(!Message_getLength(msg));
  213. addrOut->protocolVersion = Endian_bigEndianToHost32(node.version_be);
  214. addrOut->path = Endian_bigEndianToHost64(node.path_be);
  215. Bits_memcpy(addrOut->key, node.publicKey, 32);
  216. Bits_memcpy(addrOut->ip6.bytes, node.ip6, 16);
  217. }
  218. static Iface_DEFUN switchErr(struct Message* msg, struct Pathfinder_pvt* pf)
  219. {
  220. struct PFChan_Core_SwitchErr switchErr;
  221. Er_assert(Message_epop(msg, &switchErr, PFChan_Core_SwitchErr_MIN_SIZE));
  222. uint64_t path = Endian_bigEndianToHost64(switchErr.sh.label_be);
  223. uint64_t pathAtErrorHop = Endian_bigEndianToHost64(switchErr.ctrlErr.cause.label_be);
  224. uint8_t pathStr[20];
  225. AddrTools_printPath(pathStr, path);
  226. int err = Endian_bigEndianToHost32(switchErr.ctrlErr.errorType_be);
  227. Log_debug(pf->log, "switch err from [%s] type [%d]", pathStr, err);
  228. struct Node_Link* link = NodeStore_linkForPath(pf->nodeStore, path);
  229. uint8_t nodeAddr[16];
  230. if (link) {
  231. Bits_memcpy(nodeAddr, link->child->address.ip6.bytes, 16);
  232. }
  233. NodeStore_brokenLink(pf->nodeStore, path, pathAtErrorHop);
  234. if (link) {
  235. // Don't touch the node again, it might be a dangling pointer
  236. SearchRunner_search(nodeAddr, 20, 3, pf->searchRunner, pf->alloc);
  237. }
  238. return NULL;
  239. }
  240. static Iface_DEFUN searchReq(struct Message* msg, struct Pathfinder_pvt* pf)
  241. {
  242. uint8_t addr[16];
  243. Er_assert(Message_epop(msg, addr, 16));
  244. Er_assert(Message_epop32be(msg));
  245. uint32_t version = Er_assert(Message_epop32be(msg));
  246. if (version && version >= 20) { return NULL; }
  247. Assert_true(!Message_getLength(msg));
  248. uint8_t printedAddr[40];
  249. AddrTools_printIp(printedAddr, addr);
  250. Log_debug(pf->log, "Search req [%s]", printedAddr);
  251. struct Node_Two* node = NodeStore_nodeForAddr(pf->nodeStore, addr);
  252. if (node) {
  253. onBestPathChange(pf, node);
  254. } else {
  255. SearchRunner_search(addr, 20, 3, pf->searchRunner, pf->alloc);
  256. }
  257. return NULL;
  258. }
  259. static Iface_DEFUN peer(struct Message* msg, struct Pathfinder_pvt* pf)
  260. {
  261. struct Address addr = {0};
  262. addressForNode(&addr, msg);
  263. String* str = Address_toString(&addr, Message_getAlloc(msg));
  264. Log_debug(pf->log, "Peer [%s]", str->bytes);
  265. struct Node_Link* link = NodeStore_linkForPath(pf->nodeStore, addr.path);
  266. // It exists, it's parent is the self-node, and it's label is equal to the switchLabel.
  267. if (link
  268. && Node_getBestParent(link->child)
  269. && Node_getBestParent(link->child)->parent->address.path == 1
  270. && Node_getBestParent(link->child)->cannonicalLabel == addr.path)
  271. {
  272. return NULL;
  273. }
  274. //RumorMill_addNode(pf->rumorMill, &addr);
  275. Router_sendGetPeers(pf->router, &addr, 0, 0, pf->alloc);
  276. return sendNode(msg, &addr, Metric_DHT_PEER, pf);
  277. }
  278. static Iface_DEFUN peerGone(struct Message* msg, struct Pathfinder_pvt* pf)
  279. {
  280. struct Address addr = {0};
  281. addressForNode(&addr, msg);
  282. String* str = Address_toString(&addr, Message_getAlloc(msg));
  283. Log_debug(pf->log, "Peer gone [%s]", str->bytes);
  284. NodeStore_disconnectedPeer(pf->nodeStore, addr.path);
  285. // We notify about the node but with max metric so it will be removed soon.
  286. return sendNode(msg, &addr, Metric_DEAD_LINK, pf);
  287. }
  288. static Iface_DEFUN session(struct Message* msg, struct Pathfinder_pvt* pf)
  289. {
  290. struct Address addr = {0};
  291. addressForNode(&addr, msg);
  292. String* str = Address_toString(&addr, Message_getAlloc(msg));
  293. Log_debug(pf->log, "Session [%s]", str->bytes);
  294. /* This triggers for every little ping we send to some random node out there which
  295. * sucks too much to ever get into the nodeStore.
  296. struct Node_Two* node = NodeStore_nodeForAddr(pf->nodeStore, addr.ip6.bytes);
  297. if (!node) {
  298. SearchRunner_search(addr.ip6.bytes, 20, 3, pf->searchRunner, pf->alloc);
  299. }*/
  300. return NULL;
  301. }
  302. static Iface_DEFUN sessionEnded(struct Message* msg, struct Pathfinder_pvt* pf)
  303. {
  304. struct Address addr = {0};
  305. addressForNode(&addr, msg);
  306. String* str = Address_toString(&addr, Message_getAlloc(msg));
  307. Log_debug(pf->log, "Session ended [%s]", str->bytes);
  308. return NULL;
  309. }
  310. static Iface_DEFUN discoveredPath(struct Message* msg, struct Pathfinder_pvt* pf)
  311. {
  312. struct Address addr = {0};
  313. addressForNode(&addr, msg);
  314. // We're somehow aware of this path (even if it's unused)
  315. if (NodeStore_linkForPath(pf->nodeStore, addr.path)) { return NULL; }
  316. // If we don't already care about the destination, then don't do anything.
  317. struct Node_Two* nn = NodeStore_nodeForAddr(pf->nodeStore, addr.ip6.bytes);
  318. if (!nn) { return NULL; }
  319. // Our best path is "shorter" (label bits which is somewhat representitive of hop count)
  320. // basically this is just to dampen the flood to the RM because otherwise it prevents Janitor
  321. // from getting any actual work done.
  322. if (nn->address.path < addr.path) { return NULL; }
  323. addr.protocolVersion = nn->address.protocolVersion;
  324. Log_debug(pf->log, "Discovered path [%s]", Address_toString(&addr, Message_getAlloc(msg))->bytes);
  325. RumorMill_addNode(pf->rumorMill, &addr);
  326. return NULL;
  327. }
  328. static Iface_DEFUN handlePing(struct Message* msg, struct Pathfinder_pvt* pf)
  329. {
  330. Log_debug(pf->log, "Received ping");
  331. Er_assert(Message_epush32be(msg, PFChan_Pathfinder_PONG));
  332. return Iface_next(&pf->pub.eventIf, msg);
  333. }
  334. static Iface_DEFUN handlePong(struct Message* msg, struct Pathfinder_pvt* pf)
  335. {
  336. Log_debug(pf->log, "Received pong");
  337. return NULL;
  338. }
  339. static Iface_DEFUN incomingMsg(struct Message* msg, struct Pathfinder_pvt* pf)
  340. {
  341. struct Address addr = {0};
  342. struct RouteHeader* hdr = (struct RouteHeader*) msg->msgbytes;
  343. Er_assert(Message_eshift(msg, -(RouteHeader_SIZE + DataHeader_SIZE)));
  344. Bits_memcpy(addr.ip6.bytes, hdr->ip6, 16);
  345. Bits_memcpy(addr.key, hdr->publicKey, 32);
  346. addr.protocolVersion = Endian_bigEndianToHost32(hdr->version_be);
  347. addr.padding = 0;
  348. addr.path = Endian_bigEndianToHost64(hdr->sh.label_be);
  349. //Log_debug(pf->log, "Incoming DHT");
  350. struct DHTMessage dht = {
  351. .address = &addr,
  352. .binMessage = msg,
  353. .allocator = Message_getAlloc(msg)
  354. };
  355. DHTModuleRegistry_handleIncoming(&dht, pf->registry);
  356. struct Message* nodeMsg = Message_new(0, 256, Message_getAlloc(msg));
  357. Iface_CALL(sendNode, nodeMsg, &addr, Metric_DHT_INCOMING, pf);
  358. if (dht.pleaseRespond) {
  359. // what a beautiful hack, see incomingFromDHT
  360. return Iface_next(&pf->pub.eventIf, msg);
  361. }
  362. return NULL;
  363. }
  364. static Iface_DEFUN incomingFromEventIf(struct Message* msg, struct Iface* eventIf)
  365. {
  366. struct Pathfinder_pvt* pf = Identity_containerOf(eventIf, struct Pathfinder_pvt, pub.eventIf);
  367. enum PFChan_Core ev = Er_assert(Message_epop32be(msg));
  368. if (Pathfinder_pvt_state_INITIALIZING == pf->state) {
  369. Assert_true(ev == PFChan_Core_CONNECT);
  370. return connected(pf, msg);
  371. }
  372. // Let the PF send another 128 path changes again because it's basically a new tick.
  373. pf->bestPathChanges = 0;
  374. switch (ev) {
  375. case PFChan_Core_SWITCH_ERR: return switchErr(msg, pf);
  376. case PFChan_Core_SEARCH_REQ: return searchReq(msg, pf);
  377. case PFChan_Core_PEER: return peer(msg, pf);
  378. case PFChan_Core_PEER_GONE: return peerGone(msg, pf);
  379. case PFChan_Core_SESSION: return session(msg, pf);
  380. case PFChan_Core_SESSION_ENDED: return sessionEnded(msg, pf);
  381. case PFChan_Core_DISCOVERED_PATH: return discoveredPath(msg, pf);
  382. case PFChan_Core_MSG: return incomingMsg(msg, pf);
  383. case PFChan_Core_PING: return handlePing(msg, pf);
  384. case PFChan_Core_PONG: return handlePong(msg, pf);
  385. case PFChan_Core_UNSETUP_SESSION:
  386. case PFChan_Core_LINK_STATE:
  387. case PFChan_Core_CTRL_MSG: return NULL;
  388. default:;
  389. }
  390. Assert_failure("unexpected event [%d]", ev);
  391. }
  392. static void sendEvent(struct Pathfinder_pvt* pf, enum PFChan_Pathfinder ev, void* data, int size)
  393. {
  394. struct Allocator* alloc = Allocator_child(pf->alloc);
  395. struct Message* msg = Message_new(0, 512+size, alloc);
  396. Er_assert(Message_epush(msg, data, size));
  397. Er_assert(Message_epush32be(msg, ev));
  398. Iface_send(&pf->pub.eventIf, msg);
  399. Allocator_free(alloc);
  400. }
  401. static void init(void* vpf)
  402. {
  403. struct Pathfinder_pvt* pf = Identity_check((struct Pathfinder_pvt*) vpf);
  404. struct PFChan_Pathfinder_Connect conn = {
  405. .superiority_be = Endian_hostToBigEndian32(1),
  406. .version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL)
  407. };
  408. CString_safeStrncpy(conn.userAgent, "Cjdns internal pathfinder", 64);
  409. sendEvent(pf, PFChan_Pathfinder_CONNECT, &conn, PFChan_Pathfinder_Connect_SIZE);
  410. }
  411. struct Pathfinder* Pathfinder_register(struct Allocator* allocator,
  412. struct Log* log,
  413. struct EventBase* base,
  414. struct Random* rand,
  415. struct Admin* admin)
  416. {
  417. struct Allocator* alloc = Allocator_child(allocator);
  418. struct Pathfinder_pvt* pf = Allocator_calloc(alloc, sizeof(struct Pathfinder_pvt), 1);
  419. Identity_set(pf);
  420. pf->alloc = alloc;
  421. pf->log = log;
  422. pf->base = base;
  423. pf->rand = rand;
  424. pf->admin = admin;
  425. pf->pub.eventIf.send = incomingFromEventIf;
  426. pf->dhtModule.context = pf;
  427. pf->dhtModule.handleOutgoing = incomingFromDHT;
  428. // This needs to be done asynchronously so the pf can be plumbed to the core
  429. Timeout_setTimeout(init, pf, 0, base, alloc);
  430. return &pf->pub;
  431. }