SessionManager.c 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. */
  15. #include "memory/Allocator.h"
  16. #include "wire/PFChan.h"
  17. #include "net/SessionManager.h"
  18. #include "crypto/AddressCalc.h"
  19. #include "util/AddrTools.h"
  20. #include "wire/Error.h"
  21. #include "util/events/Time.h"
  22. #include "util/Defined.h"
  23. #include "wire/RouteHeader.h"
  24. #include "util/events/Timeout.h"
  /**
   * Handle numbers 0-3 are reserved for CryptoAuth nonces, so the randomly
   * chosen first handle is never smaller than this.
   */
  #define MIN_FIRST_HANDLE 4

  /** Exclusive upper bound of the randomly chosen first handle. */
  #define MAX_FIRST_HANDLE 100000
  /** A packet which is being held back until a search for its destination completes. */
  struct BufferedMessage
  {
      /** The held packet. */
      struct Message* msg;

      /** Allocator owning this entry; freeing it releases the entry. */
      struct Allocator* alloc;

      /** Time at which the packet was buffered; entries older than 10 seconds are dropped. */
      uint64_t timeSentMilliseconds;
  };
  /** A raw 16 byte IPv6 address, wrapped in a struct so that it can serve as a map key. */
  struct Ip6
  {
      uint8_t bytes[16];
  };
  37. #define Map_KEY_TYPE struct Ip6
  38. #define Map_VALUE_TYPE struct BufferedMessage*
  39. #define Map_NAME BufferedMessages
  40. #include "util/Map.h"
  41. #define Map_KEY_TYPE struct Ip6
  42. #define Map_VALUE_TYPE struct SessionManager_Session_pvt*
  43. #define Map_NAME OfSessionsByIp6
  44. #define Map_ENABLE_HANDLES
  45. #include "util/Map.h"
  46. struct SessionManager_pvt
  47. {
  48. struct SessionManager pub;
  49. struct Iface eventIf;
  50. struct Allocator* alloc;
  51. struct Map_BufferedMessages bufMap;
  52. struct Map_OfSessionsByIp6 ifaceMap;
  53. struct Log* log;
  54. struct CryptoAuth* cryptoAuth;
  55. struct EventBase* eventBase;
  56. uint32_t firstHandle;
  57. Identity
  58. };
  59. struct SessionManager_Session_pvt
  60. {
  61. struct SessionManager_Session pub;
  62. struct SessionManager_pvt* sessionManager;
  63. struct Allocator* alloc;
  64. Identity
  65. };
  /**
   * Write a debug line which is prefixed with the version, send handle, receive handle
   * and address of a session as well as a printed switch label.
   * Nothing is done unless Log_DEBUG is defined.
   *
   * debugHandlesAndLabel() takes a format string followed by at least one argument,
   * debugHandlesAndLabel0() takes a plain message without arguments.
   *
   * Every macro parameter is parenthesized so that arbitrary expressions may be passed.
   * The expansion declares locals called `path` and `ip`, the arguments must therefore
   * not refer to variables of the caller which have these names.
   *
   * @param logger the struct Log* to write to.
   * @param session a struct SessionManager_Session_pvt*.
   * @param label the switch label to print, in host byte order.
   * @param message a string literal; it is appended to the format string.
   */
  #define debugHandlesAndLabel(logger, session, label, message, ...) \
      do { \
          if (!Defined(Log_DEBUG)) { break; } \
          uint8_t path[20]; \
          AddrTools_printPath(path, (label)); \
          uint8_t ip[40]; \
          AddrTools_printIp(ip, (session)->pub.caSession->herIp6); \
          Log_debug((logger), "ver[%u] send[%d] recv[%u] ip[%s] path[%s] " message, \
                    (session)->pub.version, \
                    (session)->pub.sendHandle, \
                    (session)->pub.receiveHandle, \
                    ip, \
                    path, \
                    __VA_ARGS__); \
      } while (0)
  //CHECKFILES_IGNORE expecting a ;
  #define debugHandlesAndLabel0(logger, session, label, message) \
      debugHandlesAndLabel(logger, session, label, "%s", message)
  /**
   * Write a debug line which is prefixed with the send path, the receive path and the
   * address of a session. Nothing is done unless Log_DEBUG is defined.
   *
   * debugSession() takes a format string followed by at least one argument,
   * debugSession0() takes a plain message without arguments.
   *
   * The expansion declares locals called `sendPath`, `recvPath` and `ip`.
   *
   * @param logger the struct Log* to write to.
   * @param session a struct SessionManager_Session_pvt*.
   * @param message a string literal; it is appended to the format string.
   */
  #define debugSession(logger, session, message, ...) \
      do { \
          if (!Defined(Log_DEBUG)) { break; } \
          uint8_t sendPath[20]; \
          uint8_t recvPath[20]; \
          uint8_t ip[40]; \
          AddrTools_printPath(sendPath, (session)->pub.sendSwitchLabel); \
          AddrTools_printPath(recvPath, (session)->pub.recvSwitchLabel); \
          AddrTools_printIp(ip, (session)->pub.caSession->herIp6); \
          Log_debug((logger), \
                    "Session sendPath[%s] recvPath[%s] ip[%s] " message, \
                    sendPath, recvPath, ip, __VA_ARGS__); \
      } while (0)
  //CHECKFILES_IGNORE ;
  #define debugSession0(logger, session, message) \
      debugSession(logger, session, "%s", message)
  102. static void sendSession(struct SessionManager_Session_pvt* sess,
  103. uint64_t path,
  104. uint32_t destPf,
  105. enum PFChan_Core ev)
  106. {
  107. struct PFChan_Node session = {
  108. .path_be = Endian_hostToBigEndian64(path),
  109. .metric_be = 0xffffffff,
  110. .version_be = Endian_hostToBigEndian32(sess->pub.version)
  111. };
  112. Bits_memcpy(session.ip6, sess->pub.caSession->herIp6, 16);
  113. Bits_memcpy(session.publicKey, sess->pub.caSession->herPublicKey, 32);
  114. struct Allocator* alloc = Allocator_child(sess->alloc);
  115. struct Message* msg = Message_new(0, PFChan_Node_SIZE + 512, alloc);
  116. Message_push(msg, &session, PFChan_Node_SIZE, NULL);
  117. Message_push32(msg, destPf, NULL);
  118. Message_push32(msg, ev, NULL);
  119. Iface_send(&sess->sessionManager->eventIf, msg);
  120. Allocator_free(alloc);
  121. }
  122. static inline void check(struct SessionManager_pvt* sm, int mapIndex)
  123. {
  124. Assert_true(sm->ifaceMap.keys[mapIndex].bytes[0] == 0xfc);
  125. uint8_t* herPubKey = sm->ifaceMap.values[mapIndex]->pub.caSession->herPublicKey;
  126. if (!Bits_isZero(herPubKey, 32)) {
  127. uint8_t ip6[16];
  128. AddressCalc_addressForPublicKey(ip6, herPubKey);
  129. Assert_true(!Bits_memcmp(&sm->ifaceMap.keys[mapIndex], ip6, 16));
  130. }
  131. }
  132. static inline struct SessionManager_Session_pvt* sessionForHandle(uint32_t handle,
  133. struct SessionManager_pvt* sm)
  134. {
  135. int index = Map_OfSessionsByIp6_indexForHandle(handle - sm->firstHandle, &sm->ifaceMap);
  136. if (index < 0) { return NULL; }
  137. check(sm, index);
  138. return Identity_check(sm->ifaceMap.values[index]);
  139. }
  /**
   * Public lookup of a session by handle.
   *
   * @param handle the handle as used on the wire.
   * @param manager the session manager.
   * @return the session or NULL if there is none with this handle.
   */
  struct SessionManager_Session* SessionManager_sessionForHandle(uint32_t handle,
                                                                 struct SessionManager* manager)
  {
      struct SessionManager_pvt* sm = Identity_check((struct SessionManager_pvt*) manager);
      struct SessionManager_Session_pvt* found = sessionForHandle(handle, sm);
      return (struct SessionManager_Session*) found;
  }
  146. static inline struct SessionManager_Session_pvt* sessionForIp6(uint8_t ip6[16],
  147. struct SessionManager_pvt* sm)
  148. {
  149. int ifaceIndex = Map_OfSessionsByIp6_indexForKey((struct Ip6*)ip6, &sm->ifaceMap);
  150. if (ifaceIndex == -1) { return NULL; }
  151. check(sm, ifaceIndex);
  152. return Identity_check(sm->ifaceMap.values[ifaceIndex]);
  153. }
  /**
   * Public lookup of a session by the address of the peer.
   *
   * @param ip6 the 16 byte address.
   * @param manager the session manager.
   * @return the session or NULL if there is none for this address.
   */
  struct SessionManager_Session* SessionManager_sessionForIp6(uint8_t* ip6,
                                                              struct SessionManager* manager)
  {
      struct SessionManager_pvt* sm = Identity_check((struct SessionManager_pvt*) manager);
      struct SessionManager_Session_pvt* found = sessionForIp6(ip6, sm);
      return (struct SessionManager_Session*) found;
  }
  160. struct SessionManager_HandleList* SessionManager_getHandleList(struct SessionManager* manager,
  161. struct Allocator* alloc)
  162. {
  163. struct SessionManager_pvt* sm = Identity_check((struct SessionManager_pvt*) manager);
  164. struct SessionManager_HandleList* out =
  165. Allocator_calloc(alloc, sizeof(struct SessionManager_HandleList), 1);
  166. uint32_t* buff = Allocator_calloc(alloc, 4, sm->ifaceMap.count);
  167. out->length = sm->ifaceMap.count;
  168. out->handles = buff;
  169. for (int i = 0; i < out->length; i++) {
  170. buff[i] = sm->ifaceMap.handles[i] + sm->firstHandle;
  171. }
  172. return out;
  173. }
  174. static struct SessionManager_Session_pvt* getSession(struct SessionManager_pvt* sm,
  175. uint8_t ip6[16],
  176. uint8_t pubKey[32],
  177. uint32_t version,
  178. uint64_t label)
  179. {
  180. struct SessionManager_Session_pvt* sess = sessionForIp6(ip6, sm);
  181. if (sess) {
  182. sess->pub.version = (sess->pub.version) ? sess->pub.version : version;
  183. sess->pub.sendSwitchLabel = (sess->pub.sendSwitchLabel) ? sess->pub.sendSwitchLabel : label;
  184. return sess;
  185. }
  186. struct Allocator* alloc = Allocator_child(sm->alloc);
  187. sess = Allocator_calloc(alloc, sizeof(struct SessionManager_Session_pvt), 1);
  188. Identity_set(sess);
  189. sess->pub.caSession = CryptoAuth_newSession(sm->cryptoAuth, alloc, pubKey, false, "inner");
  190. int ifaceIndex = Map_OfSessionsByIp6_put((struct Ip6*)ip6, &sess, &sm->ifaceMap);
  191. sess->pub.receiveHandle = sm->ifaceMap.handles[ifaceIndex] + sm->firstHandle;
  192. sess->alloc = alloc;
  193. sess->sessionManager = sm;
  194. sess->pub.version = version;
  195. sess->pub.timeOfLastIn = Time_currentTimeMilliseconds(sm->eventBase);
  196. sess->pub.timeOfLastOut = Time_currentTimeMilliseconds(sm->eventBase);
  197. sess->pub.sendSwitchLabel = label;
  198. //Allocator_onFree(alloc, sessionCleanup, sess);
  199. sendSession(sess, label, 0xffffffff, PFChan_Core_SESSION);
  200. check(sm, ifaceIndex);
  201. return sess;
  202. }
  203. static Iface_DEFUN incomingFromSwitchIf(struct Message* msg, struct Iface* iface)
  204. {
  205. struct SessionManager_pvt* sm =
  206. Identity_containerOf(iface, struct SessionManager_pvt, pub.switchIf);
  207. // SwitchHeader, handle, small cryptoAuth header
  208. if (msg->length < SwitchHeader_SIZE + 4 + 20) {
  209. // This is triggered by Benchmark.c so we really don't want to print log lines constantly.
  210. //Log_debug(sm->log, "DROP runt");
  211. return NULL;
  212. }
  213. struct SwitchHeader* switchHeader = (struct SwitchHeader*) msg->bytes;
  214. Message_shift(msg, -SwitchHeader_SIZE, NULL);
  215. struct SessionManager_Session_pvt* session;
  216. uint32_t nonceOrHandle = Endian_bigEndianToHost32(((uint32_t*)msg->bytes)[0]);
  217. if (nonceOrHandle > 3) {
  218. // > 3 it's a handle.
  219. session = sessionForHandle(nonceOrHandle, sm);
  220. if (!session) {
  221. Log_debug(sm->log, "DROP message with unrecognized handle");
  222. return NULL;
  223. }
  224. Message_shift(msg, -4, NULL);
  225. } else {
  226. // handle + big cryptoauth header
  227. if (msg->length < CryptoHeader_SIZE + 4) {
  228. Log_debug(sm->log, "DROP runt");
  229. return NULL;
  230. }
  231. struct CryptoHeader* caHeader = (struct CryptoHeader*) msg->bytes;
  232. uint8_t ip6[16];
  233. // a packet which claims to be "from us" causes problems
  234. if (!AddressCalc_addressForPublicKey(ip6, caHeader->publicKey)) {
  235. Log_debug(sm->log, "DROP Handshake with non-fc key");
  236. return NULL;
  237. }
  238. if (!Bits_memcmp(caHeader->publicKey, sm->cryptoAuth->publicKey, 32)) {
  239. Log_debug(sm->log, "DROP Handshake from 'ourselves'");
  240. return NULL;
  241. }
  242. uint64_t label = Endian_bigEndianToHost64(switchHeader->label_be);
  243. session = getSession(sm, ip6, caHeader->publicKey, 0, label);
  244. CryptoAuth_resetIfTimeout(session->pub.caSession);
  245. debugHandlesAndLabel(sm->log, session, label, "new session nonce[%d]", nonceOrHandle);
  246. }
  247. if (CryptoAuth_decrypt(session->pub.caSession, msg)) {
  248. debugHandlesAndLabel(sm->log, session,
  249. Endian_bigEndianToHost64(switchHeader->label_be),
  250. "DROP Failed decrypting message NoH[%d] state[%s]",
  251. nonceOrHandle,
  252. CryptoAuth_stateString(CryptoAuth_getState(session->pub.caSession)));
  253. return NULL;
  254. }
  255. session->pub.timeOfLastIn = Time_currentTimeMilliseconds(sm->eventBase);
  256. session->pub.bytesIn += msg->length;
  257. bool currentMessageSetup = (nonceOrHandle <= 3);
  258. if (currentMessageSetup) {
  259. session->pub.sendHandle = Message_pop32(msg, NULL);
  260. }
  261. Message_shift(msg, RouteHeader_SIZE, NULL);
  262. struct RouteHeader* header = (struct RouteHeader*) msg->bytes;
  263. if (currentMessageSetup) {
  264. Bits_memcpy(&header->sh, switchHeader, SwitchHeader_SIZE);
  265. debugHandlesAndLabel0(sm->log,
  266. session,
  267. Endian_bigEndianToHost64(switchHeader->label_be),
  268. "received start message");
  269. } else {
  270. // RouteHeader is laid out such that no copy of switch header should be needed.
  271. Assert_true(&header->sh == switchHeader);
  272. if (0) { // noisey
  273. debugHandlesAndLabel0(sm->log,
  274. session,
  275. Endian_bigEndianToHost64(switchHeader->label_be),
  276. "received run message");
  277. }
  278. }
  279. header->version_be = Endian_hostToBigEndian32(session->pub.version);
  280. Bits_memcpy(header->ip6, session->pub.caSession->herIp6, 16);
  281. Bits_memcpy(header->publicKey, session->pub.caSession->herPublicKey, 32);
  282. uint64_t path = Endian_bigEndianToHost64(switchHeader->label_be);
  283. if (!session->pub.sendSwitchLabel) {
  284. session->pub.sendSwitchLabel = path;
  285. }
  286. if (path != session->pub.recvSwitchLabel) {
  287. session->pub.recvSwitchLabel = path;
  288. sendSession(session, path, 0xffffffff, PFChan_Core_DISCOVERED_PATH);
  289. }
  290. return Iface_next(&sm->pub.insideIf, msg);
  291. }
  292. static void checkTimedOutBuffers(struct SessionManager_pvt* sm)
  293. {
  294. for (int i = 0; i < (int)sm->bufMap.count; i++) {
  295. struct BufferedMessage* buffered = sm->bufMap.values[i];
  296. int64_t lag = Time_currentTimeMilliseconds(sm->eventBase) - buffered->timeSentMilliseconds;
  297. if (lag < 10000) { continue; }
  298. Map_BufferedMessages_remove(i, &sm->bufMap);
  299. Allocator_free(buffered->alloc);
  300. i--;
  301. }
  302. }
  303. static void triggerSearch(struct SessionManager_pvt* sm, uint8_t target[16])
  304. {
  305. struct Allocator* eventAlloc = Allocator_child(sm->alloc);
  306. struct Message* eventMsg = Message_new(0, 512, eventAlloc);
  307. Message_push(eventMsg, target, 16, NULL);
  308. Message_push32(eventMsg, 0xffffffff, NULL);
  309. Message_push32(eventMsg, PFChan_Core_SEARCH_REQ, NULL);
  310. Iface_send(&sm->eventIf, eventMsg);
  311. Allocator_free(eventAlloc);
  312. }
  313. static void checkTimedOutSessions(struct SessionManager_pvt* sm)
  314. {
  315. bool searchTriggered = false;
  316. for (int i = 0; i < (int)sm->ifaceMap.count; i++) {
  317. struct SessionManager_Session_pvt* sess = sm->ifaceMap.values[i];
  318. int64_t now = Time_currentTimeMilliseconds(sm->eventBase);
  319. if (now - sess->pub.timeOfLastOut >= sm->pub.sessionIdleAfterMilliseconds &&
  320. now - sess->pub.timeOfLastIn >= sm->pub.sessionIdleAfterMilliseconds)
  321. {
  322. // Session is in idle state
  323. } else if (now - sess->pub.lastSearchTime >= sm->pub.sessionSearchAfterMilliseconds) {
  324. // Session is not in idle state and requires a search
  325. // But we're only going to trigger one search per cycle.
  326. if (searchTriggered) { continue; }
  327. debugSession0(sm->log, sess, "triggering search");
  328. triggerSearch(sm, sess->pub.caSession->herIp6);
  329. sess->pub.lastSearchTime = now;
  330. searchTriggered = true;
  331. continue;
  332. }
  333. // Session is in idle state or doesn't need a search right now, check if it's timed out.
  334. if (now - sess->pub.timeOfLastIn > sm->pub.sessionTimeoutMilliseconds) {
  335. debugSession0(sm->log, sess, "ended");
  336. sendSession(sess, sess->pub.sendSwitchLabel, 0xffffffff, PFChan_Core_SESSION_ENDED);
  337. Map_OfSessionsByIp6_remove(i, &sm->ifaceMap);
  338. Allocator_free(sess->alloc);
  339. i--;
  340. }
  341. }
  342. }
  /**
   * Timer callback: expire sessions first, then buffered packets.
   *
   * @param vSessionManager the struct SessionManager_pvt* which was registered with the timer.
   */
  static void periodically(void* vSessionManager)
  {
      struct SessionManager_pvt* sm =
          Identity_check((struct SessionManager_pvt*) vSessionManager);

      checkTimedOutSessions(sm);
      checkTimedOutBuffers(sm);
  }
  349. static void needsLookup(struct SessionManager_pvt* sm, struct Message* msg)
  350. {
  351. struct RouteHeader* header = (struct RouteHeader*) msg->bytes;
  352. if (Defined(Log_DEBUG)) {
  353. uint8_t ipStr[40];
  354. AddrTools_printIp(ipStr, header->ip6);
  355. Log_debug(sm->log, "Buffering a packet to [%s] and beginning a search", ipStr);
  356. }
  357. int index = Map_BufferedMessages_indexForKey((struct Ip6*)header->ip6, &sm->bufMap);
  358. if (index > -1) {
  359. struct BufferedMessage* buffered = sm->bufMap.values[index];
  360. Map_BufferedMessages_remove(index, &sm->bufMap);
  361. Allocator_free(buffered->alloc);
  362. Log_debug(sm->log, "DROP message which needs lookup because new one received");
  363. }
  364. if ((int)sm->bufMap.count >= sm->pub.maxBufferedMessages) {
  365. checkTimedOutBuffers(sm);
  366. if ((int)sm->bufMap.count >= sm->pub.maxBufferedMessages) {
  367. Log_debug(sm->log, "DROP message needing lookup maxBufferedMessages ([%d]) is reached",
  368. sm->pub.maxBufferedMessages);
  369. return;
  370. }
  371. }
  372. struct Allocator* lookupAlloc = Allocator_child(sm->alloc);
  373. struct BufferedMessage* buffered =
  374. Allocator_calloc(lookupAlloc, sizeof(struct BufferedMessage), 1);
  375. buffered->msg = msg;
  376. buffered->alloc = lookupAlloc;
  377. buffered->timeSentMilliseconds = Time_currentTimeMilliseconds(sm->eventBase);
  378. Allocator_adopt(lookupAlloc, msg->alloc);
  379. Assert_true(Map_BufferedMessages_put((struct Ip6*)header->ip6, &buffered, &sm->bufMap) > -1);
  380. triggerSearch(sm, header->ip6);
  381. }
  382. static Iface_DEFUN readyToSend(struct Message* msg,
  383. struct SessionManager_pvt* sm,
  384. struct SessionManager_Session_pvt* sess)
  385. {
  386. struct RouteHeader* header = (struct RouteHeader*) msg->bytes;
  387. Message_shift(msg, -RouteHeader_SIZE, NULL);
  388. struct SwitchHeader* sh;
  389. CryptoAuth_resetIfTimeout(sess->pub.caSession);
  390. if (CryptoAuth_getState(sess->pub.caSession) < CryptoAuth_HANDSHAKE3) {
  391. // Put the handle into the message so that it's authenticated.
  392. Message_push32(msg, sess->pub.receiveHandle, NULL);
  393. // Copy back the SwitchHeader so it is not clobbered.
  394. Message_shift(msg, (CryptoHeader_SIZE + SwitchHeader_SIZE), NULL);
  395. Bits_memcpy(msg->bytes, &header->sh, SwitchHeader_SIZE);
  396. sh = (struct SwitchHeader*) msg->bytes;
  397. Message_shift(msg, -(CryptoHeader_SIZE + SwitchHeader_SIZE), NULL);
  398. } else {
  399. sh = &header->sh;
  400. }
  401. // This pointer ceases to be useful.
  402. header = NULL;
  403. sess->pub.timeOfLastOut = Time_currentTimeMilliseconds(sm->eventBase);
  404. sess->pub.bytesOut += msg->length;
  405. Assert_true(!CryptoAuth_encrypt(sess->pub.caSession, msg));
  406. if (CryptoAuth_getState(sess->pub.caSession) >= CryptoAuth_HANDSHAKE3) {
  407. if (0) { // Noisy
  408. debugHandlesAndLabel0(sm->log,
  409. sess,
  410. Endian_bigEndianToHost64(sh->label_be),
  411. "sending run message");
  412. }
  413. Message_push32(msg, sess->pub.sendHandle, NULL);
  414. } else {
  415. debugHandlesAndLabel0(sm->log,
  416. sess,
  417. Endian_bigEndianToHost64(sh->label_be),
  418. "sending start message");
  419. }
  420. // The SwitchHeader should have been moved to the correct location.
  421. Message_shift(msg, SwitchHeader_SIZE, NULL);
  422. Assert_true((uint8_t*)sh == msg->bytes);
  423. return Iface_next(&sm->pub.switchIf, msg);
  424. }
  425. static Iface_DEFUN incomingFromInsideIf(struct Message* msg, struct Iface* iface)
  426. {
  427. struct SessionManager_pvt* sm =
  428. Identity_containerOf(iface, struct SessionManager_pvt, pub.insideIf);
  429. Assert_true(msg->length >= RouteHeader_SIZE);
  430. struct RouteHeader* header = (struct RouteHeader*) msg->bytes;
  431. struct SessionManager_Session_pvt* sess = sessionForIp6(header->ip6, sm);
  432. if (!sess) {
  433. if (!Bits_isZero(header->publicKey, 32) && header->version_be) {
  434. sess = getSession(sm,
  435. header->ip6,
  436. header->publicKey,
  437. Endian_bigEndianToHost32(header->version_be),
  438. Endian_bigEndianToHost64(header->sh.label_be));
  439. } else {
  440. needsLookup(sm, msg);
  441. return NULL;
  442. }
  443. }
  444. if (header->version_be) { sess->pub.version = Endian_bigEndianToHost32(header->version_be); }
  445. if (!sess->pub.version) {
  446. needsLookup(sm, msg);
  447. return NULL;
  448. }
  449. if (header->sh.label_be) {
  450. // fallthrough
  451. } else if (sess->pub.sendSwitchLabel) {
  452. header->sh.label_be = Endian_hostToBigEndian64(sess->pub.sendSwitchLabel);
  453. } else {
  454. needsLookup(sm, msg);
  455. return NULL;
  456. }
  457. return readyToSend(msg, sm, sess);
  458. }
  459. static Iface_DEFUN sessions(struct SessionManager_pvt* sm,
  460. uint32_t sourcePf,
  461. struct Allocator* tempAlloc)
  462. {
  463. for (int i = 0; i < (int)sm->ifaceMap.count; i++) {
  464. struct SessionManager_Session_pvt* sess = sm->ifaceMap.values[i];
  465. sendSession(sess, sess->pub.sendSwitchLabel, sourcePf, PFChan_Core_SESSION);
  466. }
  467. return NULL;
  468. }
  469. static Iface_DEFUN incomingFromEventIf(struct Message* msg, struct Iface* iface)
  470. {
  471. struct SessionManager_pvt* sm = Identity_containerOf(iface, struct SessionManager_pvt, eventIf);
  472. enum PFChan_Pathfinder ev = Message_pop32(msg, NULL);
  473. uint32_t sourcePf = Message_pop32(msg, NULL);
  474. if (ev == PFChan_Pathfinder_SESSIONS) {
  475. Assert_true(!msg->length);
  476. return sessions(sm, sourcePf, msg->alloc);
  477. }
  478. Assert_true(ev == PFChan_Pathfinder_NODE);
  479. struct PFChan_Node node;
  480. Message_pop(msg, &node, PFChan_Node_SIZE, NULL);
  481. Assert_true(!msg->length);
  482. int index = Map_BufferedMessages_indexForKey((struct Ip6*)node.ip6, &sm->bufMap);
  483. struct SessionManager_Session_pvt* sess;
  484. if (index == -1) {
  485. sess = sessionForIp6(node.ip6, sm);
  486. // If we discovered a node we're not interested in ...
  487. if (!sess) { return NULL; }
  488. if (node.metric_be == 0xffffffff) {
  489. // this is a broken path
  490. if (sess->pub.sendSwitchLabel == Endian_bigEndianToHost64(node.path_be)) {
  491. debugSession0(sm->log, sess, "broken path");
  492. if (sess->pub.sendSwitchLabel == sess->pub.recvSwitchLabel) {
  493. sess->pub.sendSwitchLabel = 0;
  494. } else {
  495. sess->pub.sendSwitchLabel = sess->pub.recvSwitchLabel;
  496. }
  497. }
  498. } else {
  499. sess->pub.sendSwitchLabel = Endian_bigEndianToHost64(node.path_be);
  500. sess->pub.version = Endian_bigEndianToHost32(node.version_be);
  501. debugSession0(sm->log, sess, "discovered path");
  502. }
  503. } else {
  504. sess = getSession(sm,
  505. node.ip6,
  506. node.publicKey,
  507. Endian_bigEndianToHost32(node.version_be),
  508. Endian_bigEndianToHost64(node.path_be));
  509. }
  510. // Send what's on the buffer...
  511. if (index > -1) {
  512. struct BufferedMessage* bm = sm->bufMap.values[index];
  513. Iface_CALL(readyToSend, bm->msg, sm, sess);
  514. Map_BufferedMessages_remove(index, &sm->bufMap);
  515. Allocator_free(bm->alloc);
  516. }
  517. return NULL;
  518. }
  519. struct SessionManager* SessionManager_new(struct Allocator* allocator,
  520. struct EventBase* eventBase,
  521. struct CryptoAuth* cryptoAuth,
  522. struct Random* rand,
  523. struct Log* log,
  524. struct EventEmitter* ee)
  525. {
  526. struct Allocator* alloc = Allocator_child(allocator);
  527. struct SessionManager_pvt* sm = Allocator_calloc(alloc, sizeof(struct SessionManager_pvt), 1);
  528. sm->alloc = alloc;
  529. sm->pub.switchIf.send = incomingFromSwitchIf;
  530. sm->pub.insideIf.send = incomingFromInsideIf;
  531. sm->bufMap.allocator = alloc;
  532. sm->ifaceMap.allocator = alloc;
  533. sm->log = log;
  534. sm->cryptoAuth = cryptoAuth;
  535. sm->eventBase = eventBase;
  536. sm->pub.sessionTimeoutMilliseconds = SessionManager_SESSION_TIMEOUT_MILLISECONDS_DEFAULT;
  537. sm->pub.maxBufferedMessages = SessionManager_MAX_BUFFERED_MESSAGES_DEFAULT;
  538. sm->pub.sessionIdleAfterMilliseconds = SessionManager_SESSION_IDLE_AFTER_MILLISECONDS_DEFAULT;
  539. sm->pub.sessionSearchAfterMilliseconds =
  540. SessionManager_SESSION_SEARCH_AFTER_MILLISECONDS_DEFAULT;
  541. sm->eventIf.send = incomingFromEventIf;
  542. EventEmitter_regCore(ee, &sm->eventIf, PFChan_Pathfinder_NODE);
  543. EventEmitter_regCore(ee, &sm->eventIf, PFChan_Pathfinder_SESSIONS);
  544. sm->firstHandle =
  545. (Random_uint32(rand) % (MAX_FIRST_HANDLE - MIN_FIRST_HANDLE)) + MIN_FIRST_HANDLE;
  546. Timeout_setInterval(periodically, sm, 10000, eventBase, alloc);
  547. Identity_set(sm);
  548. return &sm->pub;
  549. }