ReachabilityCollector.c 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "crypto/AddressCalc.h"
  16. #include "dht/dhtcore/ReplySerializer.h"
  17. #include "subnode/ReachabilityCollector.h"
  18. #include "util/log/Log.h"
  19. #include "util/Identity.h"
  20. #include "util/events/Timeout.h"
  21. #include "util/AddrTools.h"
  22. #define TIMEOUT_MILLISECONDS 10000
  23. struct PeerInfo_pvt
  24. {
  25. struct ReachabilityCollector_PeerInfo pub;
  26. // Next path to check when sending getPeers requests to our peer looking for ourselves.
  27. uint64_t pathToCheck;
  28. // This peer is waiting for response
  29. bool waitForResponse;
  30. struct Allocator* alloc;
  31. Identity
  32. };
  33. #define ArrayList_NAME OfPeerInfo_pvt
  34. #define ArrayList_TYPE struct PeerInfo_pvt
  35. #include "util/ArrayList.h"
  36. struct ReachabilityCollector_pvt
  37. {
  38. struct ReachabilityCollector pub;
  39. struct MsgCore* msgCore;
  40. struct Allocator* alloc;
  41. struct Log* log;
  42. struct BoilerplateResponder* br;
  43. struct Address* myAddr;
  44. struct ArrayList_OfPeerInfo_pvt* piList;
  45. Identity
  46. };
  47. static void mkNextRequest(struct ReachabilityCollector_pvt* rcp);
  48. static void change0(struct ReachabilityCollector_pvt* rcp,
  49. struct Address* nodeAddr,
  50. struct Allocator* tempAlloc)
  51. {
  52. for (int i = 0; i < rcp->piList->length; i++) {
  53. struct PeerInfo_pvt* pi = ArrayList_OfPeerInfo_pvt_get(rcp->piList, i);
  54. if (Address_isSameIp(nodeAddr, &pi->pub.addr)) {
  55. if (nodeAddr->path == 0) {
  56. Log_debug(rcp->log, "Peer [%s] dropped",
  57. Address_toString(&pi->pub.addr, tempAlloc)->bytes);
  58. ArrayList_OfPeerInfo_pvt_remove(rcp->piList, i);
  59. Allocator_free(pi->alloc);
  60. } else if (nodeAddr->path != pi->pub.addr.path) {
  61. Log_debug(rcp->log, "Peer [%s] changed path",
  62. Address_toString(&pi->pub.addr, tempAlloc)->bytes);
  63. pi->pub.pathThemToUs = -1;
  64. pi->pathToCheck = 1;
  65. pi->pub.querying = true;
  66. pi->pub.addr.path = nodeAddr->path;
  67. }
  68. if (rcp->pub.onChange) {
  69. rcp->pub.onChange(&rcp->pub, nodeAddr->ip6.bytes, 0, 0, 0, 0xffff, 0xffff, 0xffff);
  70. }
  71. return;
  72. }
  73. }
  74. if (nodeAddr->path == 0) {
  75. Log_debug(rcp->log, "Nonexistant peer [%s] dropped",
  76. Address_toString(nodeAddr, tempAlloc)->bytes);
  77. return;
  78. }
  79. struct Allocator* piAlloc = Allocator_child(rcp->alloc);
  80. struct PeerInfo_pvt* pi = Allocator_calloc(piAlloc, sizeof(struct PeerInfo_pvt), 1);
  81. Identity_set(pi);
  82. Bits_memcpy(&pi->pub.addr, nodeAddr, Address_SIZE);
  83. pi->alloc = piAlloc;
  84. pi->pub.querying = true;
  85. pi->pathToCheck = 1;
  86. pi->pub.pathThemToUs = -1;
  87. pi->waitForResponse = false;
  88. ArrayList_OfPeerInfo_pvt_add(rcp->piList, pi);
  89. Log_debug(rcp->log, "Peer [%s] added", Address_toString(&pi->pub.addr, tempAlloc)->bytes);
  90. mkNextRequest(rcp);
  91. }
  92. void ReachabilityCollector_change(struct ReachabilityCollector* rc, struct Address* nodeAddr)
  93. {
  94. struct ReachabilityCollector_pvt* rcp = Identity_check((struct ReachabilityCollector_pvt*) rc);
  95. struct Allocator* tempAlloc = Allocator_child(rcp->alloc);
  96. change0(rcp, nodeAddr, tempAlloc);
  97. Allocator_free(tempAlloc);
  98. }
  99. struct Query {
  100. struct ReachabilityCollector_pvt* rcp;
  101. String* addr;
  102. uint8_t targetPath[20];
  103. };
  104. static void onReplyTimeout(struct MsgCore_Promise* prom)
  105. {
  106. struct Query* q = (struct Query*) prom->userData;
  107. struct ReachabilityCollector_pvt* rcp =
  108. Identity_check((struct ReachabilityCollector_pvt*) q->rcp);
  109. struct PeerInfo_pvt* pi = NULL;
  110. for (int j = 0; j < rcp->piList->length; j++) {
  111. pi = ArrayList_OfPeerInfo_pvt_get(rcp->piList, j);
  112. if (Address_isSameIp(&pi->pub.addr, prom->target)) {
  113. pi->waitForResponse = false;
  114. return;
  115. }
  116. }
  117. }
  118. static void onReply(Dict* msg, struct Address* src, struct MsgCore_Promise* prom)
  119. {
  120. struct Query* q = (struct Query*) prom->userData;
  121. struct ReachabilityCollector_pvt* rcp =
  122. Identity_check((struct ReachabilityCollector_pvt*) q->rcp);
  123. if (!src) {
  124. onReplyTimeout(prom);
  125. mkNextRequest(rcp);
  126. return;
  127. }
  128. struct PeerInfo_pvt* pi = NULL;
  129. for (int j = 0; j < rcp->piList->length; j++) {
  130. struct PeerInfo_pvt* pi0 = ArrayList_OfPeerInfo_pvt_get(rcp->piList, j);
  131. if (Address_isSameIp(&pi0->pub.addr, src)) {
  132. pi = pi0;
  133. break;
  134. }
  135. }
  136. if (!pi) {
  137. Log_debug(rcp->log, "Got message from peer which is gone from list");
  138. return;
  139. }
  140. pi->waitForResponse = false;
  141. struct Address_List* results = ReplySerializer_parse(src, msg, rcp->log, false, prom->alloc);
  142. uint64_t path = 1;
  143. if (!results) {
  144. Log_debug(rcp->log, "Got invalid getPeers reply from [%s]",
  145. Address_toString(src, prom->alloc)->bytes);
  146. return;
  147. }
  148. for (int i = results->length - 1; i >= 0; i--) {
  149. path = results->elems[i].path;
  150. Log_debug(rcp->log, "getPeers result [%s] [%s][%s]",
  151. Address_toString(&results->elems[i], prom->alloc)->bytes,
  152. q->addr->bytes, q->targetPath);
  153. if (Bits_memcmp(results->elems[i].ip6.bytes, rcp->myAddr->ip6.bytes, 16)) { continue; }
  154. if (pi->pub.pathThemToUs != path) {
  155. Log_debug(rcp->log, "Found back-route for [%s]",
  156. Address_toString(src, prom->alloc)->bytes);
  157. pi->pub.pathThemToUs = path;
  158. if (rcp->pub.onChange) {
  159. rcp->pub.onChange(
  160. &rcp->pub, src->ip6.bytes, path, src->path, 0, 0xffff, 0xffff, 0xffff);
  161. }
  162. }
  163. pi->pub.querying = false;
  164. mkNextRequest(rcp);
  165. return;
  166. }
  167. if (results->length < 8) {
  168. // Peer's gp response does not include my addr, meaning the peer might not know us yet.
  169. // should wait peer sendPing (see InterfaceControl.c).
  170. pi->pathToCheck = 1;
  171. return;
  172. } else {
  173. pi->pathToCheck = path;
  174. }
  175. mkNextRequest(rcp);
  176. }
  177. static void mkNextRequest(struct ReachabilityCollector_pvt* rcp)
  178. {
  179. struct PeerInfo_pvt* pi = NULL;
  180. for (int i = 0; i < rcp->piList->length; i++) {
  181. pi = ArrayList_OfPeerInfo_pvt_get(rcp->piList, i);
  182. if (pi->pub.querying && !pi->waitForResponse) { break; }
  183. }
  184. if (!pi || !pi->pub.querying) {
  185. Log_debug(rcp->log, "All [%u] peers have been queried", rcp->piList->length);
  186. return;
  187. }
  188. if (pi->waitForResponse) {
  189. Log_debug(rcp->log, "Peer is waiting for response.");
  190. return;
  191. }
  192. struct MsgCore_Promise* query =
  193. MsgCore_createQuery(rcp->msgCore, TIMEOUT_MILLISECONDS, rcp->alloc);
  194. struct Query* q = Allocator_calloc(query->alloc, sizeof(struct Query), 1);
  195. q->rcp = rcp;
  196. q->addr = Address_toString(&pi->pub.addr, query->alloc);
  197. query->userData = q;
  198. query->cb = onReply;
  199. Assert_true(AddressCalc_validAddress(pi->pub.addr.ip6.bytes));
  200. query->target = Address_clone(&pi->pub.addr, query->alloc);
  201. Dict* d = query->msg = Dict_new(query->alloc);
  202. Dict_putStringCC(d, "q", "gp", query->alloc);
  203. uint64_t label_be = Endian_hostToBigEndian64(pi->pathToCheck);
  204. uint8_t nearbyLabelBytes[8];
  205. Bits_memcpy(nearbyLabelBytes, &label_be, 8);
  206. AddrTools_printPath(q->targetPath, pi->pathToCheck);
  207. Log_debug(rcp->log, "Getting peers for peer [%s] tar [%s]", q->addr->bytes, q->targetPath);
  208. Dict_putStringC(d, "tar",
  209. String_newBinary(nearbyLabelBytes, 8, query->alloc), query->alloc);
  210. BoilerplateResponder_addBoilerplate(rcp->br, d, &pi->pub.addr, query->alloc);
  211. pi->waitForResponse = true;
  212. }
  213. static void cycle(void* vrc)
  214. {
  215. struct ReachabilityCollector_pvt* rcp = Identity_check((struct ReachabilityCollector_pvt*) vrc);
  216. mkNextRequest(rcp);
  217. }
  218. struct ReachabilityCollector_PeerInfo*
  219. ReachabilityCollector_getPeerInfo(struct ReachabilityCollector* rc, int peerNum)
  220. {
  221. struct ReachabilityCollector_pvt* rcp = Identity_check((struct ReachabilityCollector_pvt*) rc);
  222. struct PeerInfo_pvt* pi = ArrayList_OfPeerInfo_pvt_get(rcp->piList, peerNum);
  223. return pi ? &pi->pub : NULL;
  224. }
  225. struct ReachabilityCollector* ReachabilityCollector_new(struct Allocator* allocator,
  226. struct MsgCore* msgCore,
  227. struct Log* log,
  228. struct EventBase* base,
  229. struct BoilerplateResponder* br,
  230. struct Address* myAddr)
  231. {
  232. struct Allocator* alloc = Allocator_child(allocator);
  233. struct ReachabilityCollector_pvt* rcp =
  234. Allocator_calloc(alloc, sizeof(struct ReachabilityCollector_pvt), 1);
  235. rcp->myAddr = myAddr;
  236. rcp->msgCore = msgCore;
  237. rcp->alloc = alloc;
  238. rcp->piList = ArrayList_OfPeerInfo_pvt_new(alloc);
  239. rcp->log = log;
  240. rcp->br = br;
  241. Identity_set(rcp);
  242. Timeout_setInterval(cycle, rcp, 2000, base, alloc);
  243. return &rcp->pub;
  244. }