SwitchPinger.c 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "benc/String.h"
  16. #include "crypto/AddressCalc.h"
  17. #include "net/SwitchPinger.h"
  18. #include "dht/Address.h"
  19. #include "util/Bits.h"
  20. #include "util/Checksum.h"
  21. #include "util/Endian.h"
  22. #include "util/Pinger.h"
  23. #include "util/version/Version.h"
  24. #include "util/Identity.h"
  25. #include "wire/RouteHeader.h"
  26. #include "wire/Control.h"
  27. #include "wire/Error.h"
  28. struct SwitchPinger_pvt
  29. {
  30. struct SwitchPinger pub;
  31. struct Pinger* pinger;
  32. struct Admin* admin;
  33. struct Log* logger;
  34. struct Allocator* allocator;
  35. struct Address* myAddr;
  36. /**
  37. * The label is stored here while the message is sent through the pinger
  38. * and it decides which ping the incoming message belongs to.
  39. */
  40. uint64_t incomingLabel;
  41. /** The version of the node which sent the message. */
  42. uint32_t incomingVersion;
  43. uint8_t incomingKey[32];
  44. struct Address incomingSnodeAddr;
  45. uint32_t incomingSnodeKbps;
  46. // If it's an rpath message
  47. uint64_t rpath;
  48. struct Control_LlAddr lladdrMsg;
  49. /** The error code if an error has been returned (see Error.h) */
  50. int error;
  51. /** Pings which are currently waiting for responses. */
  52. int outstandingPings;
  53. /** Maximum number of pings which can be outstanding at one time. */
  54. int maxConcurrentPings;
  55. Identity
  56. };
  57. struct Ping
  58. {
  59. struct SwitchPinger_Ping pub;
  60. uint64_t label;
  61. String* data;
  62. struct SwitchPinger_pvt* context;
  63. SwitchPinger_ResponseCallback onResponse;
  64. void* onResponseContext;
  65. struct Pinger_Ping* pingerPing;
  66. Identity
  67. };
  68. // incoming message from network, pointing to the beginning of the switch header.
  69. static Iface_DEFUN messageFromControlHandler(Message_t* msg, struct Iface* iface)
  70. {
  71. struct SwitchPinger_pvt* ctx = Identity_check((struct SwitchPinger_pvt*) iface);
  72. struct RouteHeader rh;
  73. Er_assert(Message_epop(msg, &rh, RouteHeader_SIZE));
  74. ctx->incomingLabel = Endian_bigEndianToHost64(rh.sh.label_be);
  75. ctx->incomingVersion = 0;
  76. Bits_memset(&ctx->incomingSnodeAddr, 0, sizeof ctx->incomingSnodeAddr);
  77. Bits_memset(ctx->incomingKey, 0, sizeof ctx->incomingKey);
  78. ctx->incomingSnodeKbps = 0;
  79. ctx->rpath = 0;
  80. Bits_memset(&ctx->lladdrMsg, 0, sizeof ctx->lladdrMsg);
  81. struct Control* ctrl = (struct Control*) Message_bytes(msg);
  82. if (ctrl->header.type_be == Control_PONG_be) {
  83. Er_assert(Message_eshift(msg, -Control_Header_SIZE));
  84. ctx->error = Error_NONE;
  85. if (Message_getLength(msg) >= Control_Pong_MIN_SIZE) {
  86. struct Control_Ping* pongHeader = (struct Control_Ping*) Message_bytes(msg);
  87. ctx->incomingVersion = Endian_bigEndianToHost32(pongHeader->version_be);
  88. if (pongHeader->magic != Control_Pong_MAGIC) {
  89. Log_debug(ctx->logger, "dropped invalid switch pong");
  90. return Error(msg, "INVALID");
  91. }
  92. Er_assert(Message_eshift(msg, -Control_Pong_HEADER_SIZE));
  93. } else {
  94. Log_debug(ctx->logger, "got runt pong message, length: [%d]", Message_getLength(msg));
  95. return Error(msg, "RUNT");
  96. }
  97. } else if (ctrl->header.type_be == Control_KEYPONG_be) {
  98. Er_assert(Message_eshift(msg, -Control_Header_SIZE));
  99. ctx->error = Error_NONE;
  100. if (Message_getLength(msg) >= Control_KeyPong_HEADER_SIZE && Message_getLength(msg) <= Control_KeyPong_MAX_SIZE) {
  101. struct Control_KeyPing* pongHeader = (struct Control_KeyPing*) Message_bytes(msg);
  102. ctx->incomingVersion = Endian_bigEndianToHost32(pongHeader->version_be);
  103. if (pongHeader->magic != Control_KeyPong_MAGIC) {
  104. Log_debug(ctx->logger, "dropped invalid switch key-pong");
  105. return Error(msg, "INVALID");
  106. }
  107. Bits_memcpy(ctx->incomingKey, pongHeader->key, 32);
  108. Er_assert(Message_eshift(msg, -Control_KeyPong_HEADER_SIZE));
  109. } else if (Message_getLength(msg) > Control_KeyPong_MAX_SIZE) {
  110. Log_debug(ctx->logger, "got overlong key-pong message, length: [%d]", Message_getLength(msg));
  111. return Error(msg, "INVALID");
  112. } else {
  113. Log_debug(ctx->logger, "got runt key-pong message, length: [%d]", Message_getLength(msg));
  114. return Error(msg, "RUNT");
  115. }
  116. } else if (ctrl->header.type_be == Control_GETSNODE_REPLY_be) {
  117. Er_assert(Message_eshift(msg, -Control_Header_SIZE));
  118. ctx->error = Error_NONE;
  119. if (Message_getLength(msg) < Control_GetSnode_HEADER_SIZE) {
  120. Log_debug(ctx->logger, "got runt GetSnode message, length: [%d]", Message_getLength(msg));
  121. return Error(msg, "RUNT");
  122. }
  123. struct Control_GetSnode* hdr = (struct Control_GetSnode*) Message_bytes(msg);
  124. if (hdr->magic != Control_GETSNODE_REPLY_MAGIC) {
  125. Log_debug(ctx->logger, "dropped invalid GetSnode");
  126. return Error(msg, "INVALID");
  127. }
  128. if (Bits_isZero(hdr->snodeKey, 32)) {
  129. Log_debug(ctx->logger, "Peer doesn't have an snode");
  130. return NULL;
  131. }
  132. if (!AddressCalc_addressForPublicKey(ctx->incomingSnodeAddr.ip6.bytes, hdr->snodeKey)) {
  133. Log_debug(ctx->logger, "dropped invalid GetSnode key");
  134. return Error(msg, "INVALID");
  135. }
  136. ctx->incomingVersion = Endian_hostToBigEndian32(hdr->version_be);
  137. Bits_memcpy(ctx->incomingSnodeAddr.key, hdr->snodeKey, 32);
  138. uint64_t pathToSnode_be;
  139. Bits_memcpy(&pathToSnode_be, hdr->pathToSnode_be, 8);
  140. ctx->incomingSnodeAddr.path = Endian_bigEndianToHost64(pathToSnode_be);
  141. ctx->incomingSnodeAddr.protocolVersion = Endian_bigEndianToHost32(hdr->snodeVersion_be);
  142. ctx->incomingSnodeKbps = Endian_bigEndianToHost32(hdr->kbps_be);
  143. Er_assert(Message_eshift(msg, -Control_GetSnode_HEADER_SIZE));
  144. } else if (ctrl->header.type_be == Control_RPATH_REPLY_be) {
  145. Er_assert(Message_eshift(msg, -Control_Header_SIZE));
  146. ctx->error = Error_NONE;
  147. if (Message_getLength(msg) < Control_RPath_HEADER_SIZE) {
  148. Log_debug(ctx->logger, "got runt RPath message, length: [%d]", Message_getLength(msg));
  149. return Error(msg, "RUNT");
  150. }
  151. struct Control_RPath* hdr = (struct Control_RPath*) Message_bytes(msg);
  152. if (hdr->magic != Control_RPATH_REPLY_MAGIC) {
  153. Log_debug(ctx->logger, "dropped invalid RPATH (bad magic)");
  154. return Error(msg, "INVALID");
  155. }
  156. ctx->incomingVersion = Endian_hostToBigEndian32(hdr->version_be);
  157. uint64_t rpath_be;
  158. Bits_memcpy(&rpath_be, hdr->rpath_be, 8);
  159. ctx->rpath = Endian_bigEndianToHost64(rpath_be);
  160. Er_assert(Message_eshift(msg, -Control_RPath_HEADER_SIZE));
  161. } else if (ctrl->header.type_be == Control_ERROR_be) {
  162. Er_assert(Message_eshift(msg, -Control_Header_SIZE));
  163. Assert_true((uint8_t*)&ctrl->content.error.errorType_be == Message_bytes(msg));
  164. if (Message_getLength(msg) < (Control_Error_HEADER_SIZE + SwitchHeader_SIZE + Control_Header_SIZE)) {
  165. Log_debug(ctx->logger, "runt error packet");
  166. return Error(msg, "RUNT");
  167. }
  168. ctx->error = Er_assert(Message_epop32be(msg));
  169. Er_assert(Message_epush32be(msg, 0));
  170. Er_assert(Message_eshift(msg, -(Control_Error_HEADER_SIZE + SwitchHeader_SIZE)));
  171. struct Control* origCtrl = (struct Control*) Message_bytes(msg);
  172. Log_debug(ctx->logger, "error [%d] was caused by our [%s]",
  173. ctx->error,
  174. Control_typeString(origCtrl->header.type_be));
  175. int shift;
  176. if (origCtrl->header.type_be == Control_PING_be) {
  177. shift = -(Control_Header_SIZE + Control_Ping_HEADER_SIZE);
  178. } else if (origCtrl->header.type_be == Control_KEYPING_be) {
  179. shift = -(Control_Header_SIZE + Control_KeyPing_HEADER_SIZE);
  180. } else {
  181. Assert_failure("problem in Ducttape.c");
  182. }
  183. if (Message_getLength(msg) < -shift) {
  184. Log_debug(ctx->logger, "runt error packet");
  185. }
  186. Er_assert(Message_eshift(msg, shift));
  187. } else if (ctrl->header.type_be == Control_LlAddr_REPLY_be) {
  188. Er_assert(Message_eshift(msg, -Control_Header_SIZE));
  189. ctx->error = Error_NONE;
  190. if (Message_getLength(msg) < Control_LlAddr_HEADER_SIZE) {
  191. Log_debug(ctx->logger, "got runt LlAddr message, length: [%d]", Message_getLength(msg));
  192. return Error(msg, "RUNT");
  193. }
  194. Er_assert(Message_epop(msg, &ctx->lladdrMsg, sizeof ctx->lladdrMsg));
  195. if (ctx->lladdrMsg.magic != Control_LlAddr_REPLY_MAGIC) {
  196. Log_debug(ctx->logger, "dropped invalid LLADDR reply (bad magic)");
  197. return Error(msg, "INVALID");
  198. }
  199. } else {
  200. // If it gets here then Ducttape.c is failing.
  201. Assert_true(false);
  202. }
  203. String* msgStr = &(String) { .bytes = (char*) Message_bytes(msg), .len = Message_getLength(msg) };
  204. Pinger_pongReceived(msgStr, ctx->pinger);
  205. Bits_memset(ctx->incomingKey, 0, 32);
  206. return NULL;
  207. }
  208. static void onPingResponse(String* data, uint32_t milliseconds, void* vping)
  209. {
  210. struct Ping* p = Identity_check((struct Ping*) vping);
  211. enum SwitchPinger_Result err = SwitchPinger_Result_OK;
  212. uint64_t label = p->context->incomingLabel;
  213. if (data) {
  214. if (label != p->label) {
  215. err = SwitchPinger_Result_LABEL_MISMATCH;
  216. } else if ((p->data || data->len > 0) && !String_equals(data, p->data)) {
  217. err = SwitchPinger_Result_WRONG_DATA;
  218. } else if (p->context->error == Error_LOOP_ROUTE) {
  219. err = SwitchPinger_Result_LOOP_ROUTE;
  220. } else if (p->context->error) {
  221. err = SwitchPinger_Result_ERROR_RESPONSE;
  222. }
  223. } else {
  224. err = SwitchPinger_Result_TIMEOUT;
  225. label = p->label;
  226. }
  227. uint32_t version = p->context->incomingVersion;
  228. struct SwitchPinger_Response* resp =
  229. Allocator_calloc(p->pub.pingAlloc, sizeof(struct SwitchPinger_Response), 1);
  230. resp->version = p->context->incomingVersion;
  231. resp->res = err;
  232. resp->label = label;
  233. resp->data = data;
  234. resp->milliseconds = milliseconds;
  235. resp->version = version;
  236. Bits_memcpy(resp->key, p->context->incomingKey, 32);
  237. Bits_memcpy(&resp->snode, &p->context->incomingSnodeAddr, sizeof(struct Address));
  238. resp->kbpsLimit = p->context->incomingSnodeKbps;
  239. resp->rpath = p->context->rpath;
  240. resp->ping = &p->pub;
  241. Bits_memcpy(&resp->lladdr, &p->context->lladdrMsg, sizeof p->context->lladdrMsg);
  242. p->onResponse(resp, p->pub.onResponseContext);
  243. }
  244. static void sendPing(String* data, void* sendPingContext)
  245. {
  246. struct Ping* p = Identity_check((struct Ping*) sendPingContext);
  247. Message_t* msg = Message_new(0, data->len + 512, p->pub.pingAlloc);
  248. while (((uintptr_t)Message_bytes(msg) - data->len) % 4) {
  249. Er_assert(Message_epush8(msg, 0));
  250. }
  251. Er_assert(Message_truncate(msg, 0));
  252. Er_assert(Message_epush(msg, data->bytes, data->len));
  253. Assert_true(!((uintptr_t)Message_bytes(msg) % 4) && "alignment fault");
  254. if (p->pub.type == SwitchPinger_Type_KEYPING) {
  255. Er_assert(Message_epush(msg, NULL, Control_KeyPing_HEADER_SIZE));
  256. struct Control_KeyPing* keyPingHeader = (struct Control_KeyPing*) Message_bytes(msg);
  257. keyPingHeader->magic = Control_KeyPing_MAGIC;
  258. keyPingHeader->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
  259. Bits_memcpy(keyPingHeader->key, p->context->myAddr->key, 32);
  260. } else if (p->pub.type == SwitchPinger_Type_PING) {
  261. Er_assert(Message_epush(msg, NULL, Control_Ping_HEADER_SIZE));
  262. struct Control_Ping* pingHeader = (struct Control_Ping*) Message_bytes(msg);
  263. pingHeader->magic = Control_Ping_MAGIC;
  264. pingHeader->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
  265. } else if (p->pub.type == SwitchPinger_Type_GETSNODE) {
  266. Er_assert(Message_epush(msg, NULL, Control_GetSnode_HEADER_SIZE));
  267. struct Control_GetSnode* hdr = (struct Control_GetSnode*) Message_bytes(msg);
  268. hdr->magic = Control_GETSNODE_QUERY_MAGIC;
  269. hdr->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
  270. hdr->kbps_be = Endian_hostToBigEndian32(p->pub.kbpsLimit);
  271. Bits_memcpy(hdr->snodeKey, p->pub.snode.key, 32);
  272. uint64_t pathToSnode_be = Endian_hostToBigEndian64(p->pub.snode.path);
  273. Bits_memcpy(hdr->pathToSnode_be, &pathToSnode_be, 8);
  274. hdr->snodeVersion_be = Endian_hostToBigEndian32(p->pub.snode.protocolVersion);
  275. } else if (p->pub.type == SwitchPinger_Type_RPATH) {
  276. Er_assert(Message_epush(msg, NULL, Control_RPath_HEADER_SIZE));
  277. struct Control_RPath* hdr = (struct Control_RPath*) Message_bytes(msg);
  278. hdr->magic = Control_RPATH_QUERY_MAGIC;
  279. hdr->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
  280. uint64_t path_be = Endian_hostToBigEndian64(p->label);
  281. Bits_memcpy(hdr->rpath_be, &path_be, 8);
  282. } else if (p->pub.type == SwitchPinger_Type_LLADDR) {
  283. struct Control_LlAddr addr = {
  284. .magic = Control_LlAddr_QUERY_MAGIC,
  285. .version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL),
  286. // Lazy
  287. .addr.payload = { .type = 0, .len = 2, }
  288. };
  289. Er_assert(Message_epush(msg, &addr, sizeof addr));
  290. } else {
  291. Assert_failure("unexpected ping type");
  292. }
  293. Er_assert(Message_eshift(msg, Control_Header_SIZE));
  294. struct Control* ctrl = (struct Control*) Message_bytes(msg);
  295. ctrl->header.checksum_be = 0;
  296. if (p->pub.type == SwitchPinger_Type_PING) {
  297. ctrl->header.type_be = Control_PING_be;
  298. } else if (p->pub.type == SwitchPinger_Type_KEYPING) {
  299. ctrl->header.type_be = Control_KEYPING_be;
  300. } else if (p->pub.type == SwitchPinger_Type_GETSNODE) {
  301. ctrl->header.type_be = Control_GETSNODE_QUERY_be;
  302. } else if (p->pub.type == SwitchPinger_Type_RPATH) {
  303. ctrl->header.type_be = Control_RPATH_QUERY_be;
  304. } else if (p->pub.type == SwitchPinger_Type_LLADDR) {
  305. ctrl->header.type_be = Control_LlAddr_QUERY_be;
  306. } else {
  307. Assert_failure("unexpected type");
  308. }
  309. ctrl->header.checksum_be = Checksum_engine_be(Message_bytes(msg), Message_getLength(msg));
  310. struct RouteHeader rh;
  311. Bits_memset(&rh, 0, RouteHeader_SIZE);
  312. rh.flags |= RouteHeader_flags_CTRLMSG;
  313. rh.sh.label_be = Endian_hostToBigEndian64(p->label);
  314. SwitchHeader_setVersion(&rh.sh, SwitchHeader_CURRENT_VERSION);
  315. Er_assert(Message_epush(msg, &rh, RouteHeader_SIZE));
  316. Iface_send(&p->context->pub.controlHandlerIf, msg);
  317. }
  318. static String* RESULT_STRING_OK = String_CONST_SO("pong");
  319. static String* RESULT_STRING_LABEL_MISMATCH = String_CONST_SO("diff_label");
  320. static String* RESULT_STRING_WRONG_DATA = String_CONST_SO("diff_data");
  321. static String* RESULT_STRING_ERROR_RESPONSE = String_CONST_SO("err_switch");
  322. static String* RESULT_STRING_TIMEOUT = String_CONST_SO("timeout");
  323. static String* RESULT_STRING_UNKNOWN = String_CONST_SO("err_unknown");
  324. static String* RESULT_STRING_LOOP = String_CONST_SO("err_loop");
  325. String* SwitchPinger_resultString(enum SwitchPinger_Result result)
  326. {
  327. switch (result) {
  328. case SwitchPinger_Result_OK:
  329. return RESULT_STRING_OK;
  330. case SwitchPinger_Result_LABEL_MISMATCH:
  331. return RESULT_STRING_LABEL_MISMATCH;
  332. case SwitchPinger_Result_WRONG_DATA:
  333. return RESULT_STRING_WRONG_DATA;
  334. case SwitchPinger_Result_ERROR_RESPONSE:
  335. return RESULT_STRING_ERROR_RESPONSE;
  336. case SwitchPinger_Result_TIMEOUT:
  337. return RESULT_STRING_TIMEOUT;
  338. case SwitchPinger_Result_LOOP_ROUTE:
  339. return RESULT_STRING_LOOP;
  340. default:
  341. return RESULT_STRING_UNKNOWN;
  342. };
  343. }
  344. static void onPingFree(struct Allocator_OnFreeJob* job)
  345. {
  346. struct Ping* ping = Identity_check((struct Ping*)job->userData);
  347. struct SwitchPinger_pvt* ctx = Identity_check(ping->context);
  348. ctx->outstandingPings--;
  349. Assert_true(ctx->outstandingPings >= 0);
  350. }
  351. struct SwitchPinger_Ping* SwitchPinger_newPing(uint64_t label,
  352. String* data,
  353. uint32_t timeoutMilliseconds,
  354. SwitchPinger_ResponseCallback onResponse,
  355. struct Allocator* alloc,
  356. struct SwitchPinger* context)
  357. {
  358. struct SwitchPinger_pvt* ctx = Identity_check((struct SwitchPinger_pvt*)context);
  359. if (data && data->len > Control_Ping_MAX_SIZE) {
  360. return NULL;
  361. }
  362. if (ctx->outstandingPings > ctx->maxConcurrentPings) {
  363. Log_debug(ctx->logger, "Skipping switch ping because there are already [%d] outstanding",
  364. ctx->outstandingPings);
  365. return NULL;
  366. }
  367. struct Pinger_Ping* pp =
  368. Pinger_newPing(data, onPingResponse, sendPing, timeoutMilliseconds, alloc, ctx->pinger);
  369. struct Ping* ping = Allocator_clone(pp->pingAlloc, (&(struct Ping) {
  370. .pub = {
  371. .pingAlloc = pp->pingAlloc
  372. },
  373. .label = label,
  374. .data = String_clone(data, pp->pingAlloc),
  375. .context = ctx,
  376. .onResponse = onResponse,
  377. .pingerPing = pp
  378. }));
  379. Identity_set(ping);
  380. Allocator_onFree(pp->pingAlloc, onPingFree, ping);
  381. pp->context = ping;
  382. ctx->outstandingPings++;
  383. return &ping->pub;
  384. }
  385. struct SwitchPinger* SwitchPinger_new(EventBase_t* eventBase,
  386. struct Random* rand,
  387. struct Log* logger,
  388. struct Address* myAddr,
  389. struct Allocator* allocator)
  390. {
  391. struct Allocator* alloc = Allocator_child(allocator);
  392. struct SwitchPinger_pvt* sp = Allocator_malloc(alloc, sizeof(struct SwitchPinger_pvt));
  393. Bits_memcpy(sp, (&(struct SwitchPinger_pvt) {
  394. .pub = {
  395. .controlHandlerIf = {
  396. .send = messageFromControlHandler
  397. }
  398. },
  399. .pinger = Pinger_new(eventBase, rand, logger, alloc),
  400. .logger = logger,
  401. .allocator = alloc,
  402. .myAddr = myAddr,
  403. .maxConcurrentPings = SwitchPinger_DEFAULT_MAX_CONCURRENT_PINGS,
  404. }), sizeof(struct SwitchPinger_pvt));
  405. Identity_set(sp);
  406. return &sp->pub;
  407. }