1
0

SwitchPinger.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. */
  15. #include "benc/String.h"
  16. #include "net/SwitchPinger.h"
  17. #include "dht/Address.h"
  18. #include "util/Bits.h"
  19. #include "util/Checksum.h"
  20. #include "util/Endian.h"
  21. #include "util/Pinger.h"
  22. #include "util/version/Version.h"
  23. #include "util/Identity.h"
  24. #include "wire/SwitchHeader.h"
  25. #include "wire/Control.h"
  26. #include "wire/Error.h"
  27. struct SwitchPinger_pvt
  28. {
  29. struct SwitchPinger pub;
  30. struct Pinger* pinger;
  31. struct Admin* admin;
  32. struct Log* logger;
  33. struct Allocator* allocator;
  34. struct Address* myAddr;
  35. /**
  36. * The label is stored here while the message is sent through the pinger
  37. * and it decides which ping the incoming message belongs to.
  38. */
  39. uint64_t incomingLabel;
  40. /** The version of the node which sent the message. */
  41. uint32_t incomingVersion;
  42. uint8_t incomingKey[32];
  43. /** The error code if an error has been returned (see Error.h) */
  44. int error;
  45. /** Pings which are currently waiting for responses. */
  46. int outstandingPings;
  47. /** Maximum number of pings which can be outstanding at one time. */
  48. int maxConcurrentPings;
  49. Identity
  50. };
  51. struct Ping
  52. {
  53. struct SwitchPinger_Ping pub;
  54. uint64_t label;
  55. String* data;
  56. struct SwitchPinger_pvt* context;
  57. SwitchPinger_ResponseCallback onResponse;
  58. void* onResponseContext;
  59. struct Pinger_Ping* pingerPing;
  60. Identity
  61. };
  62. // incoming message from network, pointing to the beginning of the switch header.
  63. static Iface_DEFUN messageFromControlHandler(struct Message* msg, struct Iface* iface)
  64. {
  65. struct SwitchPinger_pvt* ctx = Identity_check((struct SwitchPinger_pvt*) iface);
  66. struct SwitchHeader* switchHeader = (struct SwitchHeader*) msg->bytes;
  67. ctx->incomingLabel = Endian_bigEndianToHost64(switchHeader->label_be);
  68. ctx->incomingVersion = 0;
  69. Message_shift(msg, -SwitchHeader_SIZE, NULL);
  70. uint32_t handle = Message_pop32(msg, NULL);
  71. #ifdef Version_7_COMPAT
  72. if (handle != 0xffffffff) {
  73. Message_push32(msg, handle, NULL);
  74. handle = 0xffffffff;
  75. Assert_true(SwitchHeader_isV7Ctrl(switchHeader));
  76. }
  77. #endif
  78. Assert_true(handle == 0xffffffff);
  79. struct Control* ctrl = (struct Control*) msg->bytes;
  80. if (ctrl->header.type_be == Control_PONG_be) {
  81. Message_shift(msg, -Control_Header_SIZE, NULL);
  82. ctx->error = Error_NONE;
  83. if (msg->length >= Control_Pong_MIN_SIZE) {
  84. struct Control_Ping* pongHeader = (struct Control_Ping*) msg->bytes;
  85. ctx->incomingVersion = Endian_bigEndianToHost32(pongHeader->version_be);
  86. if (pongHeader->magic != Control_Pong_MAGIC) {
  87. Log_debug(ctx->logger, "dropped invalid switch pong");
  88. return NULL;
  89. }
  90. Message_shift(msg, -Control_Pong_HEADER_SIZE, NULL);
  91. } else {
  92. Log_debug(ctx->logger, "got runt pong message, length: [%d]", msg->length);
  93. return NULL;
  94. }
  95. } else if (ctrl->header.type_be == Control_KEYPONG_be) {
  96. Message_shift(msg, -Control_Header_SIZE, NULL);
  97. ctx->error = Error_NONE;
  98. if (msg->length >= Control_KeyPong_HEADER_SIZE && msg->length <= Control_KeyPong_MAX_SIZE) {
  99. struct Control_KeyPing* pongHeader = (struct Control_KeyPing*) msg->bytes;
  100. ctx->incomingVersion = Endian_bigEndianToHost32(pongHeader->version_be);
  101. if (pongHeader->magic != Control_KeyPong_MAGIC) {
  102. Log_debug(ctx->logger, "dropped invalid switch key-pong");
  103. return NULL;
  104. }
  105. Bits_memcpyConst(ctx->incomingKey, pongHeader->key, 32);
  106. Message_shift(msg, -Control_KeyPong_HEADER_SIZE, NULL);
  107. } else if (msg->length > Control_KeyPong_MAX_SIZE) {
  108. Log_debug(ctx->logger, "got overlong key-pong message, length: [%d]", msg->length);
  109. return NULL;
  110. } else {
  111. Log_debug(ctx->logger, "got runt key-pong message, length: [%d]", msg->length);
  112. return NULL;
  113. }
  114. } else if (ctrl->header.type_be == Control_ERROR_be) {
  115. Message_shift(msg, -Control_Header_SIZE, NULL);
  116. Assert_true((uint8_t*)&ctrl->content.error.errorType_be == msg->bytes);
  117. if (msg->length < (Control_Error_HEADER_SIZE + SwitchHeader_SIZE + Control_Header_SIZE)) {
  118. Log_debug(ctx->logger, "runt error packet");
  119. return NULL;
  120. }
  121. ctx->error = Message_pop32(msg, NULL);
  122. Message_push32(msg, 0, NULL);
  123. Message_shift(msg, -(Control_Error_HEADER_SIZE + SwitchHeader_SIZE), NULL);
  124. struct Control* origCtrl = (struct Control*) msg->bytes;
  125. Log_debug(ctx->logger, "error [%s] was caused by our [%s]",
  126. Error_strerror(ctx->error),
  127. Control_typeString(origCtrl->header.type_be));
  128. int shift;
  129. if (origCtrl->header.type_be == Control_PING_be) {
  130. shift = -(Control_Header_SIZE + Control_Ping_HEADER_SIZE);
  131. } else if (origCtrl->header.type_be == Control_KEYPING_be) {
  132. shift = -(Control_Header_SIZE + Control_KeyPing_HEADER_SIZE);
  133. } else {
  134. Assert_failure("problem in Ducttape.c");
  135. }
  136. if (msg->length < -shift) {
  137. Log_debug(ctx->logger, "runt error packet");
  138. }
  139. Message_shift(msg, shift, NULL);
  140. } else {
  141. // If it gets here then Ducttape.c is failing.
  142. Assert_true(false);
  143. }
  144. String* msgStr = &(String) { .bytes = (char*) msg->bytes, .len = msg->length };
  145. Pinger_pongReceived(msgStr, ctx->pinger);
  146. Bits_memset(ctx->incomingKey, 0, 32);
  147. return NULL;
  148. }
  149. static void onPingResponse(String* data, uint32_t milliseconds, void* vping)
  150. {
  151. struct Ping* p = Identity_check((struct Ping*) vping);
  152. enum SwitchPinger_Result err = SwitchPinger_Result_OK;
  153. uint64_t label = p->context->incomingLabel;
  154. if (data) {
  155. if (label != p->label) {
  156. err = SwitchPinger_Result_LABEL_MISMATCH;
  157. } else if ((p->data || data->len > 0) && !String_equals(data, p->data)) {
  158. err = SwitchPinger_Result_WRONG_DATA;
  159. } else if (p->context->error == Error_LOOP_ROUTE) {
  160. err = SwitchPinger_Result_LOOP_ROUTE;
  161. } else if (p->context->error) {
  162. err = SwitchPinger_Result_ERROR_RESPONSE;
  163. }
  164. } else {
  165. err = SwitchPinger_Result_TIMEOUT;
  166. }
  167. uint32_t version = p->context->incomingVersion;
  168. struct SwitchPinger_Response* resp =
  169. Allocator_calloc(p->pub.pingAlloc, sizeof(struct SwitchPinger_Response), 1);
  170. resp->version = p->context->incomingVersion;
  171. resp->res = err;
  172. resp->label = label;
  173. resp->data = data;
  174. resp->milliseconds = milliseconds;
  175. resp->version = version;
  176. Bits_memcpyConst(resp->key, p->context->incomingKey, 32);
  177. resp->ping = &p->pub;
  178. p->onResponse(resp, p->pub.onResponseContext);
  179. }
  180. static void sendPing(String* data, void* sendPingContext)
  181. {
  182. struct Ping* p = Identity_check((struct Ping*) sendPingContext);
  183. struct Message* msg = Message_new(0, data->len + 512, p->pub.pingAlloc);
  184. while (((uintptr_t)msg->bytes - data->len) % 4) {
  185. Message_push8(msg, 0, NULL);
  186. }
  187. msg->length = 0;
  188. Message_push(msg, data->bytes, data->len, NULL);
  189. Assert_true(!((uintptr_t)msg->bytes % 4) && "alignment fault");
  190. if (p->pub.keyPing) {
  191. Message_shift(msg, Control_KeyPing_HEADER_SIZE, NULL);
  192. struct Control_KeyPing* keyPingHeader = (struct Control_KeyPing*) msg->bytes;
  193. keyPingHeader->magic = Control_KeyPing_MAGIC;
  194. keyPingHeader->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
  195. Bits_memcpyConst(keyPingHeader->key, p->context->myAddr->key, 32);
  196. } else {
  197. Message_shift(msg, Control_Ping_HEADER_SIZE, NULL);
  198. struct Control_Ping* pingHeader = (struct Control_Ping*) msg->bytes;
  199. pingHeader->magic = Control_Ping_MAGIC;
  200. pingHeader->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
  201. }
  202. Message_shift(msg, Control_Header_SIZE, NULL);
  203. struct Control* ctrl = (struct Control*) msg->bytes;
  204. ctrl->header.checksum_be = 0;
  205. ctrl->header.type_be = (p->pub.keyPing) ? Control_KEYPING_be : Control_PING_be;
  206. ctrl->header.checksum_be = Checksum_engine(msg->bytes, msg->length);
  207. #ifdef Version_7_COMPAT
  208. if (0) {
  209. #endif
  210. Message_push32(msg, 0xffffffff, NULL);
  211. #ifdef Version_7_COMPAT
  212. }
  213. #endif
  214. Message_shift(msg, SwitchHeader_SIZE, NULL);
  215. struct SwitchHeader* switchHeader = (struct SwitchHeader*) msg->bytes;
  216. Bits_memset(switchHeader, 0, SwitchHeader_SIZE);
  217. switchHeader->label_be = Endian_hostToBigEndian64(p->label);
  218. SwitchHeader_setVersion(switchHeader, SwitchHeader_CURRENT_VERSION);
  219. #ifdef Version_7_COMPAT
  220. // v7 detects ctrl packets by the bit which has been
  221. // re-appropriated for suppression of errors.
  222. switchHeader->congestAndSuppressErrors = 1;
  223. SwitchHeader_setVersion(switchHeader, 0);
  224. #endif
  225. Iface_send(&p->context->pub.controlHandlerIf, msg);
  226. }
  227. static String* RESULT_STRING_OK = String_CONST_SO("pong");
  228. static String* RESULT_STRING_LABEL_MISMATCH = String_CONST_SO("diff_label");
  229. static String* RESULT_STRING_WRONG_DATA = String_CONST_SO("diff_data");
  230. static String* RESULT_STRING_ERROR_RESPONSE = String_CONST_SO("err_switch");
  231. static String* RESULT_STRING_TIMEOUT = String_CONST_SO("timeout");
  232. static String* RESULT_STRING_UNKNOWN = String_CONST_SO("err_unknown");
  233. static String* RESULT_STRING_LOOP = String_CONST_SO("err_loop");
  234. String* SwitchPinger_resultString(enum SwitchPinger_Result result)
  235. {
  236. switch (result) {
  237. case SwitchPinger_Result_OK:
  238. return RESULT_STRING_OK;
  239. case SwitchPinger_Result_LABEL_MISMATCH:
  240. return RESULT_STRING_LABEL_MISMATCH;
  241. case SwitchPinger_Result_WRONG_DATA:
  242. return RESULT_STRING_WRONG_DATA;
  243. case SwitchPinger_Result_ERROR_RESPONSE:
  244. return RESULT_STRING_ERROR_RESPONSE;
  245. case SwitchPinger_Result_TIMEOUT:
  246. return RESULT_STRING_TIMEOUT;
  247. case SwitchPinger_Result_LOOP_ROUTE:
  248. return RESULT_STRING_LOOP;
  249. default:
  250. return RESULT_STRING_UNKNOWN;
  251. };
  252. }
  253. static int onPingFree(struct Allocator_OnFreeJob* job)
  254. {
  255. struct Ping* ping = Identity_check((struct Ping*)job->userData);
  256. struct SwitchPinger_pvt* ctx = Identity_check(ping->context);
  257. ctx->outstandingPings--;
  258. Assert_true(ctx->outstandingPings >= 0);
  259. return 0;
  260. }
  261. struct SwitchPinger_Ping* SwitchPinger_newPing(uint64_t label,
  262. String* data,
  263. uint32_t timeoutMilliseconds,
  264. SwitchPinger_ResponseCallback onResponse,
  265. struct Allocator* alloc,
  266. struct SwitchPinger* context)
  267. {
  268. struct SwitchPinger_pvt* ctx = Identity_check((struct SwitchPinger_pvt*)context);
  269. if (data && data->len > Control_Ping_MAX_SIZE) {
  270. return NULL;
  271. }
  272. if (ctx->outstandingPings > ctx->maxConcurrentPings) {
  273. Log_debug(ctx->logger, "Skipping switch ping because there are already [%d] outstanding",
  274. ctx->outstandingPings);
  275. return NULL;
  276. }
  277. struct Pinger_Ping* pp =
  278. Pinger_newPing(data, onPingResponse, sendPing, timeoutMilliseconds, alloc, ctx->pinger);
  279. struct Ping* ping = Allocator_clone(pp->pingAlloc, (&(struct Ping) {
  280. .pub = {
  281. .pingAlloc = pp->pingAlloc
  282. },
  283. .label = label,
  284. .data = String_clone(data, pp->pingAlloc),
  285. .context = ctx,
  286. .onResponse = onResponse,
  287. .pingerPing = pp
  288. }));
  289. Identity_set(ping);
  290. Allocator_onFree(pp->pingAlloc, onPingFree, ping);
  291. pp->context = ping;
  292. ctx->outstandingPings++;
  293. return &ping->pub;
  294. }
  295. struct SwitchPinger* SwitchPinger_new(struct EventBase* eventBase,
  296. struct Random* rand,
  297. struct Log* logger,
  298. struct Address* myAddr,
  299. struct Allocator* allocator)
  300. {
  301. struct Allocator* alloc = Allocator_child(allocator);
  302. struct SwitchPinger_pvt* sp = Allocator_malloc(alloc, sizeof(struct SwitchPinger_pvt));
  303. Bits_memcpyConst(sp, (&(struct SwitchPinger_pvt) {
  304. .pub = {
  305. .controlHandlerIf = {
  306. .send = messageFromControlHandler
  307. }
  308. },
  309. .pinger = Pinger_new(eventBase, rand, logger, alloc),
  310. .logger = logger,
  311. .allocator = alloc,
  312. .myAddr = myAddr,
  313. .maxConcurrentPings = SwitchPinger_DEFAULT_MAX_CONCURRENT_PINGS,
  314. }), sizeof(struct SwitchPinger_pvt));
  315. Identity_set(sp);
  316. return &sp->pub;
  317. }