SwitchPinger.c 18 KB


  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "benc/String.h"
  16. #include "crypto/AddressCalc.h"
  17. #include "net/SwitchPinger.h"
  18. #include "dht/Address.h"
  19. #include "util/Bits.h"
  20. #include "util/Checksum.h"
  21. #include "util/Endian.h"
  22. #include "util/Pinger.h"
  23. #include "util/version/Version.h"
  24. #include "util/Identity.h"
  25. #include "wire/RouteHeader.h"
  26. #include "wire/Control.h"
  27. #include "wire/Error.h"
  28. struct SwitchPinger_pvt
  29. {
  30. struct SwitchPinger pub;
  31. struct Pinger* pinger;
  32. struct Admin* admin;
  33. struct Log* logger;
  34. struct Allocator* allocator;
  35. struct Address* myAddr;
  36. /**
  37. * The label is stored here while the message is sent through the pinger
  38. * and it decides which ping the incoming message belongs to.
  39. */
  40. uint64_t incomingLabel;
  41. /** The version of the node which sent the message. */
  42. uint32_t incomingVersion;
  43. uint8_t incomingKey[32];
  44. struct Address incomingSnodeAddr;
  45. uint32_t incomingSnodeKbps;
  46. // If it's an rpath message
  47. uint64_t rpath;
  48. /** The error code if an error has been returned (see Error.h) */
  49. int error;
  50. /** Pings which are currently waiting for responses. */
  51. int outstandingPings;
  52. /** Maximum number of pings which can be outstanding at one time. */
  53. int maxConcurrentPings;
  54. Identity
  55. };
  56. struct Ping
  57. {
  58. struct SwitchPinger_Ping pub;
  59. uint64_t label;
  60. String* data;
  61. struct SwitchPinger_pvt* context;
  62. SwitchPinger_ResponseCallback onResponse;
  63. void* onResponseContext;
  64. struct Pinger_Ping* pingerPing;
  65. Identity
  66. };
  67. // incoming message from network, pointing to the beginning of the switch header.
  68. static Iface_DEFUN messageFromControlHandler(struct Message* msg, struct Iface* iface)
  69. {
  70. struct SwitchPinger_pvt* ctx = Identity_check((struct SwitchPinger_pvt*) iface);
  71. struct RouteHeader rh;
  72. Er_assert(Message_epop(msg, &rh, RouteHeader_SIZE));
  73. ctx->incomingLabel = Endian_bigEndianToHost64(rh.sh.label_be);
  74. ctx->incomingVersion = 0;
  75. Bits_memset(&ctx->incomingSnodeAddr, 0, sizeof ctx->incomingSnodeAddr);
  76. Bits_memset(ctx->incomingKey, 0, sizeof ctx->incomingKey);
  77. ctx->incomingSnodeKbps = 0;
  78. ctx->rpath = 0;
  79. struct Control* ctrl = (struct Control*) msg->msgbytes;
  80. if (ctrl->header.type_be == Control_PONG_be) {
  81. Er_assert(Message_eshift(msg, -Control_Header_SIZE));
  82. ctx->error = Error_NONE;
  83. if (Message_getLength(msg) >= Control_Pong_MIN_SIZE) {
  84. struct Control_Ping* pongHeader = (struct Control_Ping*) msg->msgbytes;
  85. ctx->incomingVersion = Endian_bigEndianToHost32(pongHeader->version_be);
  86. if (pongHeader->magic != Control_Pong_MAGIC) {
  87. Log_debug(ctx->logger, "dropped invalid switch pong");
  88. return Error(msg, "INVALID");
  89. }
  90. Er_assert(Message_eshift(msg, -Control_Pong_HEADER_SIZE));
  91. } else {
  92. Log_debug(ctx->logger, "got runt pong message, length: [%d]", Message_getLength(msg));
  93. return Error(msg, "RUNT");
  94. }
  95. } else if (ctrl->header.type_be == Control_KEYPONG_be) {
  96. Er_assert(Message_eshift(msg, -Control_Header_SIZE));
  97. ctx->error = Error_NONE;
  98. if (Message_getLength(msg) >= Control_KeyPong_HEADER_SIZE && Message_getLength(msg) <= Control_KeyPong_MAX_SIZE) {
  99. struct Control_KeyPing* pongHeader = (struct Control_KeyPing*) msg->msgbytes;
  100. ctx->incomingVersion = Endian_bigEndianToHost32(pongHeader->version_be);
  101. if (pongHeader->magic != Control_KeyPong_MAGIC) {
  102. Log_debug(ctx->logger, "dropped invalid switch key-pong");
  103. return Error(msg, "INVALID");
  104. }
  105. Bits_memcpy(ctx->incomingKey, pongHeader->key, 32);
  106. Er_assert(Message_eshift(msg, -Control_KeyPong_HEADER_SIZE));
  107. } else if (Message_getLength(msg) > Control_KeyPong_MAX_SIZE) {
  108. Log_debug(ctx->logger, "got overlong key-pong message, length: [%d]", Message_getLength(msg));
  109. return Error(msg, "INVALID");
  110. } else {
  111. Log_debug(ctx->logger, "got runt key-pong message, length: [%d]", Message_getLength(msg));
  112. return Error(msg, "RUNT");
  113. }
  114. } else if (ctrl->header.type_be == Control_GETSNODE_REPLY_be) {
  115. Er_assert(Message_eshift(msg, -Control_Header_SIZE));
  116. ctx->error = Error_NONE;
  117. if (Message_getLength(msg) < Control_GetSnode_HEADER_SIZE) {
  118. Log_debug(ctx->logger, "got runt GetSnode message, length: [%d]", Message_getLength(msg));
  119. return Error(msg, "RUNT");
  120. }
  121. struct Control_GetSnode* hdr = (struct Control_GetSnode*) msg->msgbytes;
  122. if (hdr->magic != Control_GETSNODE_REPLY_MAGIC) {
  123. Log_debug(ctx->logger, "dropped invalid GetSnode");
  124. return Error(msg, "INVALID");
  125. }
  126. if (Bits_isZero(hdr->snodeKey, 32)) {
  127. Log_debug(ctx->logger, "Peer doesn't have an snode");
  128. return NULL;
  129. }
  130. if (!AddressCalc_addressForPublicKey(ctx->incomingSnodeAddr.ip6.bytes, hdr->snodeKey)) {
  131. Log_debug(ctx->logger, "dropped invalid GetSnode key");
  132. return Error(msg, "INVALID");
  133. }
  134. ctx->incomingVersion = Endian_hostToBigEndian32(hdr->version_be);
  135. Bits_memcpy(ctx->incomingSnodeAddr.key, hdr->snodeKey, 32);
  136. uint64_t pathToSnode_be;
  137. Bits_memcpy(&pathToSnode_be, hdr->pathToSnode_be, 8);
  138. ctx->incomingSnodeAddr.path = Endian_bigEndianToHost64(pathToSnode_be);
  139. ctx->incomingSnodeAddr.protocolVersion = Endian_bigEndianToHost32(hdr->snodeVersion_be);
  140. ctx->incomingSnodeKbps = Endian_bigEndianToHost32(hdr->kbps_be);
  141. Er_assert(Message_eshift(msg, -Control_GetSnode_HEADER_SIZE));
  142. } else if (ctrl->header.type_be == Control_RPATH_REPLY_be) {
  143. Er_assert(Message_eshift(msg, -Control_Header_SIZE));
  144. ctx->error = Error_NONE;
  145. if (Message_getLength(msg) < Control_RPath_HEADER_SIZE) {
  146. Log_debug(ctx->logger, "got runt RPath message, length: [%d]", Message_getLength(msg));
  147. return Error(msg, "RUNT");
  148. }
  149. struct Control_RPath* hdr = (struct Control_RPath*) msg->msgbytes;
  150. if (hdr->magic != Control_RPATH_REPLY_MAGIC) {
  151. Log_debug(ctx->logger, "dropped invalid RPATH (bad magic)");
  152. return Error(msg, "INVALID");
  153. }
  154. ctx->incomingVersion = Endian_hostToBigEndian32(hdr->version_be);
  155. uint64_t rpath_be;
  156. Bits_memcpy(&rpath_be, hdr->rpath_be, 8);
  157. ctx->rpath = Endian_bigEndianToHost64(rpath_be);
  158. Er_assert(Message_eshift(msg, -Control_RPath_HEADER_SIZE));
  159. } else if (ctrl->header.type_be == Control_ERROR_be) {
  160. Er_assert(Message_eshift(msg, -Control_Header_SIZE));
  161. Assert_true((uint8_t*)&ctrl->content.error.errorType_be == msg->msgbytes);
  162. if (Message_getLength(msg) < (Control_Error_HEADER_SIZE + SwitchHeader_SIZE + Control_Header_SIZE)) {
  163. Log_debug(ctx->logger, "runt error packet");
  164. return Error(msg, "RUNT");
  165. }
  166. ctx->error = Er_assert(Message_epop32be(msg));
  167. Er_assert(Message_epush32be(msg, 0));
  168. Er_assert(Message_eshift(msg, -(Control_Error_HEADER_SIZE + SwitchHeader_SIZE)));
  169. struct Control* origCtrl = (struct Control*) msg->msgbytes;
  170. Log_debug(ctx->logger, "error [%d] was caused by our [%s]",
  171. ctx->error,
  172. Control_typeString(origCtrl->header.type_be));
  173. int shift;
  174. if (origCtrl->header.type_be == Control_PING_be) {
  175. shift = -(Control_Header_SIZE + Control_Ping_HEADER_SIZE);
  176. } else if (origCtrl->header.type_be == Control_KEYPING_be) {
  177. shift = -(Control_Header_SIZE + Control_KeyPing_HEADER_SIZE);
  178. } else {
  179. Assert_failure("problem in Ducttape.c");
  180. }
  181. if (Message_getLength(msg) < -shift) {
  182. Log_debug(ctx->logger, "runt error packet");
  183. }
  184. Er_assert(Message_eshift(msg, shift));
  185. } else {
  186. // If it gets here then Ducttape.c is failing.
  187. Assert_true(false);
  188. }
  189. String* msgStr = &(String) { .bytes = (char*) msg->msgbytes, .len = Message_getLength(msg) };
  190. Pinger_pongReceived(msgStr, ctx->pinger);
  191. Bits_memset(ctx->incomingKey, 0, 32);
  192. return NULL;
  193. }
  194. static void onPingResponse(String* data, uint32_t milliseconds, void* vping)
  195. {
  196. struct Ping* p = Identity_check((struct Ping*) vping);
  197. enum SwitchPinger_Result err = SwitchPinger_Result_OK;
  198. uint64_t label = p->context->incomingLabel;
  199. if (data) {
  200. if (label != p->label) {
  201. err = SwitchPinger_Result_LABEL_MISMATCH;
  202. } else if ((p->data || data->len > 0) && !String_equals(data, p->data)) {
  203. err = SwitchPinger_Result_WRONG_DATA;
  204. } else if (p->context->error == Error_LOOP_ROUTE) {
  205. err = SwitchPinger_Result_LOOP_ROUTE;
  206. } else if (p->context->error) {
  207. err = SwitchPinger_Result_ERROR_RESPONSE;
  208. }
  209. } else {
  210. err = SwitchPinger_Result_TIMEOUT;
  211. label = p->label;
  212. }
  213. uint32_t version = p->context->incomingVersion;
  214. struct SwitchPinger_Response* resp =
  215. Allocator_calloc(p->pub.pingAlloc, sizeof(struct SwitchPinger_Response), 1);
  216. resp->version = p->context->incomingVersion;
  217. resp->res = err;
  218. resp->label = label;
  219. resp->data = data;
  220. resp->milliseconds = milliseconds;
  221. resp->version = version;
  222. Bits_memcpy(resp->key, p->context->incomingKey, 32);
  223. Bits_memcpy(&resp->snode, &p->context->incomingSnodeAddr, sizeof(struct Address));
  224. resp->kbpsLimit = p->context->incomingSnodeKbps;
  225. resp->rpath = p->context->rpath;
  226. resp->ping = &p->pub;
  227. p->onResponse(resp, p->pub.onResponseContext);
  228. }
  229. static void sendPing(String* data, void* sendPingContext)
  230. {
  231. struct Ping* p = Identity_check((struct Ping*) sendPingContext);
  232. struct Message* msg = Message_new(0, data->len + 512, p->pub.pingAlloc);
  233. while (((uintptr_t)msg->msgbytes - data->len) % 4) {
  234. Er_assert(Message_epush8(msg, 0));
  235. }
  236. Er_assert(Message_truncate(msg, 0));
  237. Er_assert(Message_epush(msg, data->bytes, data->len));
  238. Assert_true(!((uintptr_t)msg->msgbytes % 4) && "alignment fault");
  239. if (p->pub.type == SwitchPinger_Type_KEYPING) {
  240. Er_assert(Message_epush(msg, NULL, Control_KeyPing_HEADER_SIZE));
  241. struct Control_KeyPing* keyPingHeader = (struct Control_KeyPing*) msg->msgbytes;
  242. keyPingHeader->magic = Control_KeyPing_MAGIC;
  243. keyPingHeader->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
  244. Bits_memcpy(keyPingHeader->key, p->context->myAddr->key, 32);
  245. } else if (p->pub.type == SwitchPinger_Type_PING) {
  246. Er_assert(Message_epush(msg, NULL, Control_Ping_HEADER_SIZE));
  247. struct Control_Ping* pingHeader = (struct Control_Ping*) msg->msgbytes;
  248. pingHeader->magic = Control_Ping_MAGIC;
  249. pingHeader->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
  250. } else if (p->pub.type == SwitchPinger_Type_GETSNODE) {
  251. Er_assert(Message_epush(msg, NULL, Control_GetSnode_HEADER_SIZE));
  252. struct Control_GetSnode* hdr = (struct Control_GetSnode*) msg->msgbytes;
  253. hdr->magic = Control_GETSNODE_QUERY_MAGIC;
  254. hdr->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
  255. hdr->kbps_be = Endian_hostToBigEndian32(p->pub.kbpsLimit);
  256. Bits_memcpy(hdr->snodeKey, p->pub.snode.key, 32);
  257. uint64_t pathToSnode_be = Endian_hostToBigEndian64(p->pub.snode.path);
  258. Bits_memcpy(hdr->pathToSnode_be, &pathToSnode_be, 8);
  259. hdr->snodeVersion_be = Endian_hostToBigEndian32(p->pub.snode.protocolVersion);
  260. } else if (p->pub.type == SwitchPinger_Type_RPATH) {
  261. Er_assert(Message_epush(msg, NULL, Control_RPath_HEADER_SIZE));
  262. struct Control_RPath* hdr = (struct Control_RPath*) msg->msgbytes;
  263. hdr->magic = Control_RPATH_QUERY_MAGIC;
  264. hdr->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
  265. uint64_t path_be = Endian_hostToBigEndian64(p->label);
  266. Bits_memcpy(hdr->rpath_be, &path_be, 8);
  267. } else {
  268. Assert_failure("unexpected ping type");
  269. }
  270. Er_assert(Message_eshift(msg, Control_Header_SIZE));
  271. struct Control* ctrl = (struct Control*) msg->msgbytes;
  272. ctrl->header.checksum_be = 0;
  273. if (p->pub.type == SwitchPinger_Type_PING) {
  274. ctrl->header.type_be = Control_PING_be;
  275. } else if (p->pub.type == SwitchPinger_Type_KEYPING) {
  276. ctrl->header.type_be = Control_KEYPING_be;
  277. } else if (p->pub.type == SwitchPinger_Type_GETSNODE) {
  278. ctrl->header.type_be = Control_GETSNODE_QUERY_be;
  279. } else if (p->pub.type == SwitchPinger_Type_RPATH) {
  280. ctrl->header.type_be = Control_RPATH_QUERY_be;
  281. } else {
  282. Assert_failure("unexpected type");
  283. }
  284. ctrl->header.checksum_be = Checksum_engine_be(msg->msgbytes, Message_getLength(msg));
  285. struct RouteHeader rh;
  286. Bits_memset(&rh, 0, RouteHeader_SIZE);
  287. rh.flags |= RouteHeader_flags_CTRLMSG;
  288. rh.sh.label_be = Endian_hostToBigEndian64(p->label);
  289. SwitchHeader_setVersion(&rh.sh, SwitchHeader_CURRENT_VERSION);
  290. Er_assert(Message_epush(msg, &rh, RouteHeader_SIZE));
  291. Iface_send(&p->context->pub.controlHandlerIf, msg);
  292. }
  293. static String* RESULT_STRING_OK = String_CONST_SO("pong");
  294. static String* RESULT_STRING_LABEL_MISMATCH = String_CONST_SO("diff_label");
  295. static String* RESULT_STRING_WRONG_DATA = String_CONST_SO("diff_data");
  296. static String* RESULT_STRING_ERROR_RESPONSE = String_CONST_SO("err_switch");
  297. static String* RESULT_STRING_TIMEOUT = String_CONST_SO("timeout");
  298. static String* RESULT_STRING_UNKNOWN = String_CONST_SO("err_unknown");
  299. static String* RESULT_STRING_LOOP = String_CONST_SO("err_loop");
  300. String* SwitchPinger_resultString(enum SwitchPinger_Result result)
  301. {
  302. switch (result) {
  303. case SwitchPinger_Result_OK:
  304. return RESULT_STRING_OK;
  305. case SwitchPinger_Result_LABEL_MISMATCH:
  306. return RESULT_STRING_LABEL_MISMATCH;
  307. case SwitchPinger_Result_WRONG_DATA:
  308. return RESULT_STRING_WRONG_DATA;
  309. case SwitchPinger_Result_ERROR_RESPONSE:
  310. return RESULT_STRING_ERROR_RESPONSE;
  311. case SwitchPinger_Result_TIMEOUT:
  312. return RESULT_STRING_TIMEOUT;
  313. case SwitchPinger_Result_LOOP_ROUTE:
  314. return RESULT_STRING_LOOP;
  315. default:
  316. return RESULT_STRING_UNKNOWN;
  317. };
  318. }
  319. static int onPingFree(struct Allocator_OnFreeJob* job)
  320. {
  321. struct Ping* ping = Identity_check((struct Ping*)job->userData);
  322. struct SwitchPinger_pvt* ctx = Identity_check(ping->context);
  323. ctx->outstandingPings--;
  324. Assert_true(ctx->outstandingPings >= 0);
  325. return 0;
  326. }
  327. struct SwitchPinger_Ping* SwitchPinger_newPing(uint64_t label,
  328. String* data,
  329. uint32_t timeoutMilliseconds,
  330. SwitchPinger_ResponseCallback onResponse,
  331. struct Allocator* alloc,
  332. struct SwitchPinger* context)
  333. {
  334. struct SwitchPinger_pvt* ctx = Identity_check((struct SwitchPinger_pvt*)context);
  335. if (data && data->len > Control_Ping_MAX_SIZE) {
  336. return NULL;
  337. }
  338. if (ctx->outstandingPings > ctx->maxConcurrentPings) {
  339. Log_debug(ctx->logger, "Skipping switch ping because there are already [%d] outstanding",
  340. ctx->outstandingPings);
  341. return NULL;
  342. }
  343. struct Pinger_Ping* pp =
  344. Pinger_newPing(data, onPingResponse, sendPing, timeoutMilliseconds, alloc, ctx->pinger);
  345. struct Ping* ping = Allocator_clone(pp->pingAlloc, (&(struct Ping) {
  346. .pub = {
  347. .pingAlloc = pp->pingAlloc
  348. },
  349. .label = label,
  350. .data = String_clone(data, pp->pingAlloc),
  351. .context = ctx,
  352. .onResponse = onResponse,
  353. .pingerPing = pp
  354. }));
  355. Identity_set(ping);
  356. Allocator_onFree(pp->pingAlloc, onPingFree, ping);
  357. pp->context = ping;
  358. ctx->outstandingPings++;
  359. return &ping->pub;
  360. }
  361. struct SwitchPinger* SwitchPinger_new(struct EventBase* eventBase,
  362. struct Random* rand,
  363. struct Log* logger,
  364. struct Address* myAddr,
  365. struct Allocator* allocator)
  366. {
  367. struct Allocator* alloc = Allocator_child(allocator);
  368. struct SwitchPinger_pvt* sp = Allocator_malloc(alloc, sizeof(struct SwitchPinger_pvt));
  369. Bits_memcpy(sp, (&(struct SwitchPinger_pvt) {
  370. .pub = {
  371. .controlHandlerIf = {
  372. .send = messageFromControlHandler
  373. }
  374. },
  375. .pinger = Pinger_new(eventBase, rand, logger, alloc),
  376. .logger = logger,
  377. .allocator = alloc,
  378. .myAddr = myAddr,
  379. .maxConcurrentPings = SwitchPinger_DEFAULT_MAX_CONCURRENT_PINGS,
  380. }), sizeof(struct SwitchPinger_pvt));
  381. Identity_set(sp);
  382. return &sp->pub;
  383. }