1
0

ReachabilityAnnouncer.c 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "subnode/ReachabilityAnnouncer.h"
  16. #include "util/events/Timeout.h"
  17. #include "util/Identity.h"
  18. #include "util/events/Time.h"
  19. #include "wire/Announce.h"
  20. #include "crypto/AddressCalc.h"
  21. #include "crypto/Sign.h"
  22. #include "util/AddrTools.h"
  23. #include "crypto_hash_sha512.h"
  24. // This is the time between the timestamp of the newest message and the point where
  25. // snode and subnode agree to drop messages from the snode state.
  26. #define AGREED_TIMEOUT_MS (1000 * 60 * 60 * 20)
  27. #define ArrayList_TYPE struct Message
  28. #define ArrayList_NAME OfMessages
  29. #include "util/ArrayList.h"
  30. #define ArrayList_TYPE struct Announce_Peer
  31. #define ArrayList_NAME OfPeers
  32. #include "util/ArrayList.h"
  33. // -- Generic Functions -- //
  34. static struct Announce_Peer* peerFromMsg(struct Message* msg, uint8_t ip[16])
  35. {
  36. if (!msg) { return NULL; }
  37. struct Announce_Peer* p = NULL;
  38. do {
  39. p = Announce_Peer_next(msg, p);
  40. if (p && !Bits_memcmp(p->ipv6, ip, 16)) { return p; }
  41. } while (p);
  42. return NULL;
  43. }
  44. static struct Announce_Peer* peerFromLocalState(struct ArrayList_OfPeers* localState,
  45. uint8_t addr[16])
  46. {
  47. for (int i = 0; i < localState->length; i++) {
  48. struct Announce_Peer* peer = ArrayList_OfPeers_get(localState, i);
  49. if (!Bits_memcmp(peer->ipv6, addr, 16)) { return peer; }
  50. }
  51. return NULL;
  52. }
  53. static int64_t timestampFromMsg(struct Message* msg)
  54. {
  55. struct Announce_Header* hdr = (struct Announce_Header*) msg->bytes;
  56. Assert_true(msg->length >= Announce_Header_SIZE);
  57. return Announce_Header_getTimestamp(hdr);
  58. }
  59. static struct Announce_Peer* peerFromSnodeState(struct ArrayList_OfMessages* snodeState,
  60. uint8_t ip[16],
  61. int64_t sinceTime)
  62. {
  63. for (int i = snodeState->length - 1; i >= 0; i--) {
  64. struct Message* msg = ArrayList_OfMessages_get(snodeState, i);
  65. if (sinceTime && sinceTime <= timestampFromMsg(msg)) { return NULL; }
  66. struct Announce_Peer* p = peerFromMsg(msg, ip);
  67. if (p) { return p; }
  68. }
  69. return NULL;
  70. }
  71. // Calculate the sha512 of a message list where a given set of signed messages will corrispond
  72. // to a given hash.
  73. static void hashMsgList(struct ArrayList_OfMessages* msgList, uint8_t out[64])
  74. {
  75. uint8_t hash[64] = {0};
  76. for (int i = 0; i < msgList->length; i++) {
  77. struct Message* msg = ArrayList_OfMessages_get(msgList, i);
  78. Message_push(msg, hash, 64, NULL);
  79. crypto_hash_sha512(hash, msg->bytes, msg->length);
  80. Message_pop(msg, NULL, 64, NULL);
  81. }
  82. Bits_memcpy(out, hash, 64);
  83. }
  // Estimate how far our clock is ahead of the supernode's clock, in the same unit as the
  // arguments.
  //
  // sentTime:      when we sent the message, by our clock.
  // snodeRecvTime: when the snode says it received the message, by its clock.
  // now:           when the reply came back, by our clock.
  //
  // We assume the snode received the message half way through the round trip, the result
  // is that midpoint (by our clock) minus snodeRecvTime.
  static int64_t estimateClockSkew(int64_t sentTime, int64_t snodeRecvTime, int64_t now)
  {
      int64_t roundTrip = now - sentTime;
      int64_t arrivalByOurClock = sentTime + (roundTrip / 2);
      return arrivalByOurClock - snodeRecvTime;
  }
  // Refine a previous clock skew estimate using one more round trip.
  //
  // The new estimate is moved half of the way from lastSkew toward the skew measured on
  // this round trip, so on average it should converge on the exact skew. A rolling
  // average, which would make one screwy RTT matter less, would be ideal but is more work.
  //
  // sentTime, snodeRecvTime, now: as for estimateClockSkew().
  // lastSkew: the estimate which was in use before this round trip.
  static int64_t estimateImprovedClockSkew(int64_t sentTime,
                                           int64_t snodeRecvTime,
                                           int64_t now,
                                           int64_t lastSkew)
  {
      int64_t measured = estimateClockSkew(sentTime, snodeRecvTime, now);
      return lastSkew + ((measured - lastSkew) / 2);
  }
  102. // -- Context -- //
  // Depending on what news we have learned, we adopt one of a set of possible states
  // which inform how often we contact our supernode. The numeric value of each state is
  // the number of milliseconds to wait between messages sent to our supernode, so a
  // numerically smaller state is a more urgent one.
  enum ReachabilityAnnouncer_State
  {
      // The message we build up from our local state is full, send it asap so that we
      // can finish informing the snode of our peers.
      ReachabilityAnnouncer_State_MSGFULL = 500,

      // We know how to reach the snode but we have no announced reachability (so we are
      // effectively offline), announce quickly in order to be online.
      ReachabilityAnnouncer_State_FIRSTPEER = 1000,

      // We have just dropped a peer, announce quickly in order to help the snode know
      // that our link is dead.
      ReachabilityAnnouncer_State_PEERGONE = 6000,

      // We have picked up a new peer, announce moderately fast in order to make sure
      // that the snode picks the best path out of the possible options.
      ReachabilityAnnouncer_State_NEWPEER = 12000,

      // No new peers or dropped peers, just send announcements at a low interval to
      // keep our snode up to date on latencies and drop percentages of different links.
      ReachabilityAnnouncer_State_NORMAL = 60000
  };
  126. struct ReachabilityAnnouncer_pvt
  127. {
  128. struct ReachabilityAnnouncer pub;
  129. struct Timeout* announceCycle;
  130. struct Allocator* alloc;
  131. struct Log* log;
  132. struct EventBase* base;
  133. struct MsgCore* msgCore;
  134. struct Random* rand;
  135. struct SupernodeHunter* snh;
  136. struct EncodingScheme* myScheme;
  137. String* encodingSchemeStr;
  138. uint8_t signingKeypair[64];
  139. uint8_t pubSigningKey[32];
  140. struct ArrayList_OfPeers* localState;
  141. int64_t timeOfLastReply;
  142. // The cjdns clock is monotonic and is calibrated once on launch so clockSkew
  143. // will be reliable even if the machine also has NTP and NTP also changes the clock
  144. // clockSkew is literally the number of milliseconds which we believe our clock is ahead of
  145. // our supernode's clock.
  146. int64_t clockSkew;
  147. struct Address snode;
  148. // This is effectively a log which means we add messages to it as time goes but we remove
  149. // messages which are more than AGREED_TIMEOUT_MS (20 minutes) older than the most recent
  150. // message in the list (the one at the highest index). We also identify messages in the list
  151. // which update only peers that have been updated again since and we remove those as well.
  152. // IMPORTANT: The removal of messages from this list is using the same algorithm that is used
  153. // on the supernode and if it changes then they will desync and go into a reset
  154. // loop.
  155. struct ArrayList_OfMessages* snodeState;
  156. // This message has a head pad of size Announce_Header_SIZE but that pad is garbage
  157. // the point of the pad is so that it will work correctly with peerFromMsg()
  158. struct Message* nextMsg;
  159. struct Message* msgOnWire;
  160. // this is by our clock, not skewed to the snode time.
  161. int64_t msgOnWireSentTime;
  162. // If true then when we send nextMsg, it will be a state reset of the node.
  163. bool resetState;
  164. enum ReachabilityAnnouncer_State state;
  165. Identity
  166. };
  167. // -- "Methods" -- //
  168. static int64_t ourTime(struct ReachabilityAnnouncer_pvt* rap)
  169. {
  170. uint64_t now = Time_currentTimeMilliseconds(rap->base);
  171. Assert_true(!(now >> 63));
  172. return (int64_t) now;
  173. }
  174. static int64_t snTime(struct ReachabilityAnnouncer_pvt* rap)
  175. {
  176. return ourTime(rap) - rap->clockSkew;
  177. }
  178. // Insert or update the state information for a peer in a msgList
  179. #define updatePeer_NOOP 0
  180. #define updatePeer_ADD 1
  181. #define updatePeer_UPDATE 2
  182. #define updatePeer_ENOSPACE -1
  183. static int updatePeer(struct ReachabilityAnnouncer_pvt* rap,
  184. struct Announce_Peer* refPeer,
  185. int64_t sinceTime)
  186. {
  187. if (Defined(Log_DEBUG)) {
  188. uint8_t peerIpPrinted[40];
  189. AddrTools_printIp(peerIpPrinted, refPeer->ipv6);
  190. Log_debug(rap->log, "updatePeer [%s]", peerIpPrinted);
  191. }
  192. struct Announce_Peer* peer = peerFromMsg(rap->nextMsg, refPeer->ipv6);
  193. if (!peer) {
  194. // not in nextMsg
  195. } else if (Bits_memcmp(peer, refPeer, Announce_Peer_SIZE)) {
  196. // Already exists in the nextMsg but is different, update it.
  197. Bits_memcpy(peer, refPeer, Announce_Peer_SIZE);
  198. return updatePeer_UPDATE;
  199. } else {
  200. return updatePeer_NOOP;
  201. }
  202. peer = peerFromMsg(rap->msgOnWire, refPeer->ipv6);
  203. if (!peer) {
  204. peer = peerFromSnodeState(rap->snodeState, refPeer->ipv6, sinceTime);
  205. if (peer && !Bits_memcmp(peer, refPeer, Announce_Peer_SIZE)) {
  206. Log_debug(rap->log, "Peer found in snodeState, noop");
  207. return updatePeer_NOOP;
  208. } else if (peer) {
  209. Log_debug(rap->log, "Peer found in snodeState but needs update");
  210. } else {
  211. Log_debug(rap->log, "Peer not found in snodeState");
  212. }
  213. } else if (!Bits_memcmp(peer, refPeer, Announce_Peer_SIZE)) {
  214. Log_debug(rap->log, "Peer found in msgOnWire, noop");
  215. return updatePeer_NOOP;
  216. } else {
  217. Log_debug(rap->log, "Peer found in msgOnWire but needs update");
  218. }
  219. if (rap->nextMsg->length > 700) {
  220. Log_debug(rap->log, "nextMsg is too big to [%s] peer",
  221. peer ? "UPDATE" : "INSERT");
  222. return updatePeer_ENOSPACE;
  223. }
  224. Message_pop(rap->nextMsg, NULL, Announce_Header_SIZE, NULL);
  225. Message_push(rap->nextMsg, refPeer, Announce_Peer_SIZE, NULL);
  226. Message_push(rap->nextMsg, NULL, Announce_Header_SIZE, NULL);
  227. return (peer) ? updatePeer_UPDATE : updatePeer_ADD;
  228. }
  229. static void stateUpdate(struct ReachabilityAnnouncer_pvt* rap, enum ReachabilityAnnouncer_State st)
  230. {
  231. if (rap->state < st) { return; }
  232. rap->state = st;
  233. }
  234. static void removeLocalStatePeer(struct ReachabilityAnnouncer_pvt* rap, int i)
  235. {
  236. struct Announce_Peer* peer = ArrayList_OfPeers_remove(rap->localState, i);
  237. Allocator_realloc(rap->alloc, peer, 0);
  238. }
  239. static struct Announce_Peer* addLocalStatePeer(struct ReachabilityAnnouncer_pvt* rap,
  240. struct Announce_Peer* p)
  241. {
  242. struct Announce_Peer* peer = Allocator_clone(rap->alloc, p);
  243. ArrayList_OfPeers_add(rap->localState, peer);
  244. Log_debug(rap->log, "addLocalStatePeer() now [%u] peers", rap->localState->length);
  245. return peer;
  246. }
  247. static void loadAllState(struct ReachabilityAnnouncer_pvt* rap, bool assertNoChange)
  248. {
  249. Log_debug(rap->log, "loadAllState() [%u] peers", rap->localState->length);
  250. for (int i = rap->localState->length - 1; i >= 0; i--) {
  251. struct Announce_Peer* peer = ArrayList_OfPeers_get(rap->localState, i);
  252. int ret = updatePeer(rap, peer, 0);
  253. Assert_true(!assertNoChange || !ret);
  254. if (updatePeer_ENOSPACE == ret) {
  255. stateUpdate(rap, ReachabilityAnnouncer_State_MSGFULL);
  256. return;
  257. }
  258. }
  259. }
  260. static void setupNextMsg(struct ReachabilityAnnouncer_pvt* rap)
  261. {
  262. struct Allocator* msgAlloc = Allocator_child(rap->alloc);
  263. struct Message* nextMsg = Message_new(0, 1024, msgAlloc);
  264. Message_push(nextMsg, NULL, Announce_Header_SIZE, NULL);
  265. rap->nextMsg = nextMsg;
  266. }
  267. static void stateReset(struct ReachabilityAnnouncer_pvt* rap)
  268. {
  269. for (int i = rap->snodeState->length - 1; i >= 0; i--) {
  270. struct Message* msg = ArrayList_OfMessages_remove(rap->snodeState, i);
  271. Allocator_free(msg->alloc);
  272. }
  273. if (rap->nextMsg) {
  274. struct Message* nm = rap->nextMsg;
  275. setupNextMsg(rap);
  276. Assert_true(nm != rap->nextMsg);
  277. Allocator_free(nm->alloc);
  278. }
  279. if (rap->msgOnWire) {
  280. Allocator_free(rap->msgOnWire->alloc);
  281. rap->msgOnWire = NULL;
  282. }
  283. for (int i = rap->localState->length - 1; i >= 0; i--) {
  284. struct Announce_Peer* peer = ArrayList_OfPeers_get(rap->localState, i);
  285. if (!peer->label_be) { removeLocalStatePeer(rap, i); }
  286. }
  287. // we must force the state to FIRSTPEER
  288. //stateUpdate(rap, ReachabilityAnnouncer_State_FIRSTPEER);
  289. rap->state = ReachabilityAnnouncer_State_FIRSTPEER;
  290. loadAllState(rap, false);
  291. rap->resetState = true;
  292. }
  293. static void addServerStateMsg(struct ReachabilityAnnouncer_pvt* rap, struct Message* msg)
  294. {
  295. Assert_true(msg->length >= Announce_Header_SIZE);
  296. int64_t mostRecentTime = timestampFromMsg(msg);
  297. int64_t sinceTime = mostRecentTime - AGREED_TIMEOUT_MS;
  298. ArrayList_OfMessages_add(rap->snodeState, msg);
  299. // Filter completely redundant messages and messages older than sinceTime
  300. struct Allocator* tempAlloc = Allocator_child(rap->alloc);
  301. struct ArrayList_OfPeers* peerList = ArrayList_OfPeers_new(tempAlloc);
  302. for (int i = rap->snodeState->length - 1; i >= 0; i--) {
  303. bool redundant = true;
  304. struct Message* m = ArrayList_OfMessages_get(rap->snodeState, i);
  305. struct Announce_Peer* p;
  306. for (p = Announce_Peer_next(m, NULL); p; p = Announce_Peer_next(m, p)) {
  307. bool inList = false;
  308. for (int j = 0; j < peerList->length; j++) {
  309. struct Announce_Peer* knownPeer = ArrayList_OfPeers_get(peerList, j);
  310. if (!Bits_memcmp(knownPeer->ipv6, p->ipv6, 16)) {
  311. inList = true;
  312. break;
  313. }
  314. }
  315. if (!inList) {
  316. ArrayList_OfPeers_add(peerList, p);
  317. redundant = false;
  318. break;
  319. }
  320. }
  321. if (redundant) {
  322. ArrayList_OfMessages_remove(rap->snodeState, i);
  323. Allocator_free(m->alloc);
  324. // TODO(cjd): else if the peer is dropped...
  325. } else {
  326. // We should be adding back all of the peers necessary to make redundant anything older
  327. // than the most recent message, make sure that is the case.
  328. Assert_true(timestampFromMsg(m) >= sinceTime);
  329. }
  330. }
  331. Allocator_free(tempAlloc);
  332. }
  333. // -- Public -- //
  334. void ReachabilityAnnouncer_updatePeer(struct ReachabilityAnnouncer* ra,
  335. uint8_t ipv6[16],
  336. uint64_t pathThemToUs,
  337. uint64_t pathUsToThem,
  338. uint32_t mtu,
  339. uint16_t drops,
  340. uint16_t latency,
  341. uint16_t penalty)
  342. {
  343. struct ReachabilityAnnouncer_pvt* rap = Identity_check((struct ReachabilityAnnouncer_pvt*) ra);
  344. uint8_t ipPrinted[40];
  345. AddrTools_printIp(ipPrinted, ipv6);
  346. Log_debug(rap->log, "Update peer [%s] [%08llx]", ipPrinted, (long long) pathThemToUs);
  347. if (pathThemToUs > 0xffffffff) {
  348. Log_warn(rap->log, "oversize path for [%08llx]", (long long) pathThemToUs);
  349. return;
  350. }
  351. struct Announce_Peer refPeer;
  352. Announce_Peer_init(&refPeer);
  353. refPeer.label_be = Endian_hostToBigEndian32(pathThemToUs);
  354. refPeer.mtu8_be = Endian_hostToBigEndian16((mtu / 8));
  355. refPeer.drops_be = Endian_hostToBigEndian16(drops);
  356. refPeer.latency_be = Endian_hostToBigEndian16(latency);
  357. refPeer.penalty_be = Endian_hostToBigEndian16(penalty);
  358. refPeer.encodingFormNum = EncodingScheme_getFormNum(rap->myScheme, pathUsToThem);
  359. Bits_memcpy(refPeer.ipv6, ipv6, 16);
  360. struct Announce_Peer* peer = NULL;
  361. bool unreachable = true;
  362. for (int i = 0; i < rap->localState->length; i++) {
  363. peer = ArrayList_OfPeers_get(rap->localState, i);
  364. if (peer->label_be) { unreachable = false; }
  365. if (Bits_memcmp(refPeer.ipv6, peer->ipv6, 16)) {
  366. peer = NULL;
  367. continue;
  368. }
  369. if (!Bits_memcmp(&refPeer, peer, Announce_Peer_SIZE)) {
  370. Log_debug(rap->log, "Update peer [%s] peer exists and needs no update", ipPrinted);
  371. return;
  372. }
  373. Bits_memcpy(peer, &refPeer, Announce_Peer_SIZE);
  374. break;
  375. }
  376. if (!peer) {
  377. if (!pathThemToUs) {
  378. Log_debug(rap->log, "[%s] didnt exist before and is now unreachable", ipPrinted);
  379. return;
  380. }
  381. peer = addLocalStatePeer(rap, &refPeer);
  382. }
  383. switch (updatePeer(rap, &refPeer, 0)) {
  384. case updatePeer_NOOP: {
  385. Log_debug(rap->log, "noop");
  386. return;
  387. }
  388. case updatePeer_UPDATE: {
  389. if (drops == 0xffff) {
  390. Log_debug(rap->log, "update (peergone)");
  391. stateUpdate(rap, ReachabilityAnnouncer_State_PEERGONE);
  392. } else {
  393. Log_debug(rap->log, "update");
  394. }
  395. return;
  396. }
  397. case updatePeer_ADD: {
  398. if (unreachable) {
  399. Log_debug(rap->log, "first peer");
  400. stateUpdate(rap, ReachabilityAnnouncer_State_FIRSTPEER);
  401. } else {
  402. Log_debug(rap->log, "new peer");
  403. stateUpdate(rap, ReachabilityAnnouncer_State_NEWPEER);
  404. }
  405. return;
  406. }
  407. case updatePeer_ENOSPACE: {
  408. Log_debug(rap->log, "msgfull");
  409. stateUpdate(rap, ReachabilityAnnouncer_State_MSGFULL);
  410. return;
  411. }
  412. default: break;
  413. }
  414. Assert_failure("wut");
  415. }
  416. // -- Event Callbacks -- //
  417. static void onReplyTimeout(struct ReachabilityAnnouncer_pvt* rap, struct Address* snodeAddr)
  418. {
  419. // time out -> re-integrate the content of the message onWire into unsent
  420. struct Message* mow = rap->msgOnWire;
  421. rap->msgOnWire = NULL;
  422. struct Announce_Peer* p;
  423. for (p = Announce_Peer_next(mow, NULL); p; p = Announce_Peer_next(mow, p)) {
  424. struct Announce_Peer* lPeer = peerFromLocalState(rap->localState, p->ipv6);
  425. if (!lPeer) { continue; }
  426. int ret = updatePeer(rap, lPeer, 0);
  427. if (updatePeer_ENOSPACE == ret) {
  428. stateUpdate(rap, ReachabilityAnnouncer_State_MSGFULL);
  429. break;
  430. }
  431. }
  432. Allocator_free(mow->alloc);
  433. if (!Bits_memcmp(snodeAddr, &rap->snode, Address_SIZE)) {
  434. rap->snh->snodeIsReachable = false;
  435. if (rap->snh->onSnodeUnreachable) {
  436. rap->snh->onSnodeUnreachable(rap->snh, 0, 0);
  437. }
  438. }
  439. }
  440. struct Query {
  441. struct Address target;
  442. struct ReachabilityAnnouncer_pvt* rap;
  443. };
  444. static void onReply(Dict* msg, struct Address* src, struct MsgCore_Promise* prom)
  445. {
  446. struct Query* q = (struct Query*) prom->userData;
  447. struct ReachabilityAnnouncer_pvt* rap = Identity_check(q->rap);
  448. if (!rap->msgOnWire) {
  449. Log_debug(rap->log,"local reset but not send the peers out");
  450. Log_warn(rap->log,"Drop the snode response before ann cycle deal the reset");
  451. return;
  452. }
  453. if (!src) {
  454. onReplyTimeout(rap, &q->target);
  455. return;
  456. }
  457. int64_t* snodeRecvTime = Dict_getIntC(msg, "recvTime");
  458. if (!snodeRecvTime) {
  459. Log_warn(rap->log, "snode did not send back recvTime");
  460. onReplyTimeout(rap, &q->target);
  461. return;
  462. }
  463. int64_t sentTime = rap->msgOnWireSentTime;
  464. addServerStateMsg(rap, rap->msgOnWire);
  465. rap->msgOnWire = NULL;
  466. rap->resetState = false;
  467. int64_t now = rap->timeOfLastReply = ourTime(rap);
  468. int64_t oldClockSkew = rap->clockSkew;
  469. Log_debug(rap->log, "sentTime [%lld]", (long long int) sentTime);
  470. Log_debug(rap->log, "snodeRecvTime [%lld]", (long long int) *snodeRecvTime);
  471. Log_debug(rap->log, "now [%lld]", (long long int) now);
  472. Log_debug(rap->log, "oldClockSkew [%lld]", (long long int) oldClockSkew);
  473. rap->clockSkew = estimateImprovedClockSkew(sentTime, *snodeRecvTime, now, oldClockSkew);
  474. Log_debug(rap->log, "Adjusting clock skew by [%lld]",
  475. (long long int) (rap->clockSkew - oldClockSkew));
  476. // We reset the state to NORMAL unless the synchronization of state took more space than
  477. // the last message could hold, however if the state was MSGFULL but then another message
  478. // was sent and now all state is synced (nothing new to send), we set to NORMAL.
  479. // TODO(cjd): This implies a risk of oscillation wherein there is always a tiny bit of
  480. // additional state keeps being added (bouncing link?)
  481. if (ReachabilityAnnouncer_State_MSGFULL != rap->state ||
  482. Announce_Header_SIZE == rap->nextMsg->length)
  483. {
  484. rap->state = ReachabilityAnnouncer_State_NORMAL;
  485. }
  486. String* snodeStateHash = Dict_getStringC(msg, "stateHash");
  487. uint8_t ourStateHash[64];
  488. hashMsgList(rap->snodeState, ourStateHash);
  489. if (!snodeStateHash) {
  490. Log_warn(rap->log, "no stateHash in reply from snode");
  491. } else if (snodeStateHash->len != 64) {
  492. Log_warn(rap->log, "bad stateHash in reply from snode");
  493. } else if (Bits_memcmp(snodeStateHash->bytes, ourStateHash, 64)) {
  494. Log_warn(rap->log, "state mismatch with snode, [%u] announces", rap->snodeState->length);
  495. } else {
  496. return;
  497. }
  498. Log_warn(rap->log, "desynchronized with snode, resetting state");
  499. stateReset(rap);
  500. }
  501. static void onAnnounceCycle(void* vRap)
  502. {
  503. struct ReachabilityAnnouncer_pvt* rap =
  504. Identity_check((struct ReachabilityAnnouncer_pvt*) vRap);
  505. // Message out on the wire...
  506. if (rap->msgOnWire) { return; }
  507. if (!rap->snode.path) { return; }
  508. int64_t now = ourTime(rap);
  509. int64_t snNow = snTime(rap);
  510. // Not time to send yet?
  511. if (now < rap->timeOfLastReply + rap->state) { return; }
  512. struct Message* msg = rap->msgOnWire = rap->nextMsg;
  513. rap->msgOnWireSentTime = now;
  514. // re-announce any peer which is older than AGREED_TIMEOUT_MS
  515. int64_t sinceTime = snNow - AGREED_TIMEOUT_MS;
  516. for (int i = 0; i < rap->snodeState->length; i++) {
  517. struct Message* snm = ArrayList_OfMessages_get(rap->snodeState, i);
  518. int64_t msgTime = timestampFromMsg(snm);
  519. if (msgTime < sinceTime) { break; }
  520. struct Announce_Peer* p;
  521. int ret = updatePeer_NOOP;
  522. for (p = Announce_Peer_next(msg, NULL); p; p = Announce_Peer_next(msg, p)) {
  523. struct Announce_Peer* lPeer = peerFromLocalState(rap->localState, p->ipv6);
  524. if (!lPeer) { continue; }
  525. ret = updatePeer(rap, lPeer, sinceTime);
  526. if (updatePeer_ENOSPACE == ret) {
  527. stateUpdate(rap, ReachabilityAnnouncer_State_MSGFULL);
  528. break;
  529. }
  530. }
  531. if (updatePeer_ENOSPACE == ret) {
  532. break;
  533. }
  534. }
  535. setupNextMsg(rap);
  536. if (ReachabilityAnnouncer_State_MSGFULL == rap->state) {
  537. // there was lost state, load everything we can into the next message...
  538. loadAllState(rap, false);
  539. } else if (Defined(PARANOIA)) {
  540. // Purely a test, this will blow up if anything is changed by loading all peers in.
  541. loadAllState(rap, true);
  542. }
  543. if (rap->resetState) {
  544. Message_pop(msg, NULL, Announce_Header_SIZE, NULL);
  545. Announce_EncodingScheme_push(msg, rap->encodingSchemeStr);
  546. struct Announce_Version version;
  547. Announce_Version_init(&version);
  548. Message_push(msg, &version, Announce_Version_SIZE, NULL);
  549. Message_push(msg, NULL, Announce_Header_SIZE, NULL);
  550. }
  551. struct Announce_Header* hdr = (struct Announce_Header*) msg->bytes;
  552. Bits_memset(hdr, 0, Announce_Header_SIZE);
  553. Announce_Header_setVersion(hdr, Announce_Header_CURRENT_VERSION);
  554. Announce_Header_setReset(hdr, rap->resetState);
  555. Assert_true(Announce_Header_isReset(hdr) == rap->resetState);
  556. Announce_Header_setTimestamp(hdr, snNow);
  557. Bits_memcpy(hdr->pubSigningKey, rap->pubSigningKey, 32);
  558. Bits_memcpy(hdr->snodeIp, rap->snode.ip6.bytes, 16);
  559. Message_pop(msg, NULL, 64, NULL);
  560. Sign_signMsg(rap->signingKeypair, msg, rap->rand);
  561. struct MsgCore_Promise* qp = MsgCore_createQuery(rap->msgCore, 0, rap->alloc);
  562. Dict* dict = qp->msg = Dict_new(qp->alloc);
  563. qp->cb = onReply;
  564. struct Query* q = Allocator_calloc(qp->alloc, sizeof(struct Query), 1);
  565. q->rap = rap;
  566. Assert_true(AddressCalc_validAddress(rap->snode.ip6.bytes));
  567. Bits_memcpy(&q->target, &rap->snode, Address_SIZE);
  568. qp->userData = q;
  569. qp->target = &q->target;
  570. Dict_putStringCC(dict, "sq", "ann", qp->alloc);
  571. String* annString = String_newBinary(msg->bytes, msg->length, qp->alloc);
  572. Dict_putStringC(dict, "ann", annString, qp->alloc);
  573. }
  574. static void onSnodeChange(struct SupernodeHunter* sh,
  575. int64_t sendTime,
  576. int64_t snodeRecvTime)
  577. {
  578. struct ReachabilityAnnouncer_pvt* rap =
  579. Identity_check((struct ReachabilityAnnouncer_pvt*) sh->userData);
  580. int64_t clockSkew = estimateClockSkew(sendTime, snodeRecvTime, ourTime(rap));
  581. uint64_t clockSkewDiff = (clockSkew - rap->clockSkew) & ~(((uint64_t)1)<<63);
  582. // If the node is the same and the clock skew difference is less than 10 seconds,
  583. // just change path and continue.
  584. if (!Bits_memcmp(rap->snode.key, sh->snodeAddr.key, 32) && clockSkewDiff < 5000) {
  585. Log_debug(rap->log, "Change Supernode (path only)");
  586. Bits_memcpy(&rap->snode, &sh->snodeAddr, Address_SIZE);
  587. return;
  588. }
  589. Log_debug(rap->log, "Change Supernode");
  590. Bits_memcpy(&rap->snode, &sh->snodeAddr, Address_SIZE);
  591. rap->clockSkew = estimateClockSkew(sendTime, snodeRecvTime, ourTime(rap));
  592. stateReset(rap);
  593. }
  594. struct ReachabilityAnnouncer* ReachabilityAnnouncer_new(struct Allocator* allocator,
  595. struct Log* log,
  596. struct EventBase* base,
  597. struct Random* rand,
  598. struct MsgCore* msgCore,
  599. struct SupernodeHunter* snh,
  600. uint8_t* privateKey,
  601. struct EncodingScheme* myScheme)
  602. {
  603. struct Allocator* alloc = Allocator_child(allocator);
  604. struct ReachabilityAnnouncer_pvt* rap =
  605. Allocator_calloc(alloc, sizeof(struct ReachabilityAnnouncer_pvt), 1);
  606. Identity_set(rap);
  607. rap->alloc = alloc;
  608. rap->log = log;
  609. rap->base = base;
  610. rap->msgCore = msgCore;
  611. rap->announceCycle = Timeout_setInterval(onAnnounceCycle, rap, 1000, base, alloc);
  612. rap->rand = rand;
  613. rap->snodeState = ArrayList_OfMessages_new(alloc);
  614. rap->localState = ArrayList_OfPeers_new(alloc);
  615. rap->myScheme = myScheme;
  616. rap->encodingSchemeStr = EncodingScheme_serialize(myScheme, alloc);
  617. rap->snh = snh;
  618. snh->onSnodeChange = onSnodeChange;
  619. snh->userData = rap;
  620. setupNextMsg(rap);
  621. Sign_signingKeyPairFromCurve25519(rap->signingKeypair, privateKey);
  622. Sign_publicKeyFromKeyPair(rap->pubSigningKey, rap->signingKeypair);
  623. return &rap->pub;
  624. }