SwitchPinger.c 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. */
  15. #include "benc/String.h"
  16. #include "net/SwitchPinger.h"
  17. #include "dht/Address.h"
  18. #include "util/Bits.h"
  19. #include "util/Checksum.h"
  20. #include "util/Endian.h"
  21. #include "util/Pinger.h"
  22. #include "util/version/Version.h"
  23. #include "util/Identity.h"
  24. #include "wire/Headers.h"
  25. #include "wire/Control.h"
  26. #include "wire/Error.h"
  27. struct SwitchPinger
  28. {
  29. struct Interface* iface;
  30. struct Pinger* pinger;
  31. struct Admin* admin;
  32. struct Log* logger;
  33. struct Allocator* allocator;
  34. /**
  35. * The label is stored here while the message is sent through the pinger
  36. * and it decides which ping the incoming message belongs to.
  37. */
  38. uint64_t incomingLabel;
  39. /** The version of the node which sent the message. */
  40. uint32_t incomingVersion;
  41. /** The error code if an error has been returned (see Error.h) */
  42. int error;
  43. /** Pings which are currently waiting for responses. */
  44. int outstandingPings;
  45. /** Maximum number of pings which can be outstanding at one time. */
  46. int maxConcurrentPings;
  47. Identity
  48. };
  49. struct Ping
  50. {
  51. struct SwitchPinger_Ping public;
  52. uint64_t label;
  53. String* data;
  54. struct SwitchPinger* context;
  55. SwitchPinger_ResponseCallback onResponse;
  56. void* onResponseContext;
  57. struct Pinger_Ping* pingerPing;
  58. Identity
  59. };
  60. // incoming message from network, pointing to the beginning of the switch header.
  61. static uint8_t receiveMessage(struct Message* msg, struct Interface* iface)
  62. {
  63. struct SwitchPinger* ctx = Identity_check((struct SwitchPinger*) iface->receiverContext);
  64. struct Headers_SwitchHeader* switchHeader = (struct Headers_SwitchHeader*) msg->bytes;
  65. ctx->incomingLabel = Endian_bigEndianToHost64(switchHeader->label_be);
  66. ctx->incomingVersion = 0;
  67. Assert_true(Headers_getMessageType(switchHeader) == Headers_SwitchHeader_TYPE_CONTROL);
  68. Message_shift(msg, -Headers_SwitchHeader_SIZE, NULL);
  69. struct Control* ctrl = (struct Control*) msg->bytes;
  70. if (ctrl->type_be == Control_PONG_be) {
  71. Message_shift(msg, -Control_HEADER_SIZE, NULL);
  72. ctx->error = Error_NONE;
  73. struct Control_Ping* pongHeader = (struct Control_Ping*) msg->bytes;
  74. if (msg->length >= Control_Pong_MIN_SIZE) {
  75. ctx->incomingVersion = Endian_bigEndianToHost32(pongHeader->version_be);
  76. Message_shift(msg, -Control_Pong_HEADER_SIZE, NULL);
  77. if (pongHeader->magic != Control_Pong_MAGIC) {
  78. Log_debug(ctx->logger, "dropped invalid switch pong");
  79. return Error_INVALID;
  80. }
  81. } else {
  82. Log_debug(ctx->logger, "got runt pong message, length: [%d]", msg->length);
  83. return Error_INVALID;
  84. }
  85. } else if (ctrl->type_be == Control_ERROR_be) {
  86. Message_shift(msg, -Control_HEADER_SIZE, NULL);
  87. Assert_true((uint8_t*)&ctrl->content.error.errorType_be == msg->bytes);
  88. ctx->error = Message_pop32(msg, NULL);
  89. Log_debug(ctx->logger, "error was caused by our ping [%s]", Error_strerror(ctx->error));
  90. Message_push32(msg, 0, NULL);
  91. Message_shift(msg, -(
  92. Control_Error_HEADER_SIZE
  93. + Headers_SwitchHeader_SIZE
  94. + Control_HEADER_SIZE
  95. + Control_Ping_HEADER_SIZE
  96. ), NULL);
  97. } else {
  98. // If it gets here then Ducttape.c is failing.
  99. Assert_always(false);
  100. }
  101. String* msgStr = &(String) { .bytes = (char*) msg->bytes, .len = msg->length };
  102. Pinger_pongReceived(msgStr, ctx->pinger);
  103. return Error_NONE;
  104. }
  105. static void onPingResponse(String* data, uint32_t milliseconds, void* vping)
  106. {
  107. struct Ping* p = Identity_check((struct Ping*) vping);
  108. enum SwitchPinger_Result err = SwitchPinger_Result_OK;
  109. uint64_t label = p->context->incomingLabel;
  110. if (data) {
  111. if (label != p->label) {
  112. err = SwitchPinger_Result_LABEL_MISMATCH;
  113. } else if ((p->data || data->len > 0) && !String_equals(data, p->data)) {
  114. err = SwitchPinger_Result_WRONG_DATA;
  115. } else if (p->context->error == Error_LOOP_ROUTE) {
  116. err = SwitchPinger_Result_LOOP_ROUTE;
  117. } else if (p->context->error) {
  118. err = SwitchPinger_Result_ERROR_RESPONSE;
  119. }
  120. } else {
  121. err = SwitchPinger_Result_TIMEOUT;
  122. }
  123. uint32_t version = p->context->incomingVersion;
  124. p->onResponse(err, label, data, milliseconds, version, p->public.onResponseContext);
  125. }
  126. static void sendPing(String* data, void* sendPingContext)
  127. {
  128. struct Ping* p = Identity_check((struct Ping*) sendPingContext);
  129. #define BUFFER_SZ 4096
  130. uint8_t buffer[BUFFER_SZ];
  131. struct Message msg = {
  132. .length = data->len,
  133. // Make it aligned along an 8 byte boundry (assuming the buffer is)
  134. .bytes = &buffer[BUFFER_SZ - (data->len + 8 - (data->len % 8))]
  135. };
  136. msg.padding = msg.bytes - buffer;
  137. Assert_true(data->len < (BUFFER_SZ / 2));
  138. Bits_memcpy(msg.bytes, data->bytes, data->len);
  139. Message_shift(&msg, Control_Ping_HEADER_SIZE, NULL);
  140. struct Control_Ping* pingHeader = (struct Control_Ping*) msg.bytes;
  141. pingHeader->magic = Control_Ping_MAGIC;
  142. pingHeader->version_be = Endian_hostToBigEndian32(Version_CURRENT_PROTOCOL);
  143. Message_shift(&msg, Control_HEADER_SIZE, NULL);
  144. struct Control* ctrl = (struct Control*) msg.bytes;
  145. ctrl->checksum_be = 0;
  146. ctrl->type_be = Control_PING_be;
  147. ctrl->checksum_be = Checksum_engine(msg.bytes, msg.length);
  148. Message_shift(&msg, Headers_SwitchHeader_SIZE, NULL);
  149. struct Headers_SwitchHeader* switchHeader = (struct Headers_SwitchHeader*) msg.bytes;
  150. switchHeader->label_be = Endian_hostToBigEndian64(p->label);
  151. Headers_setPriorityAndMessageType(switchHeader, 0, Headers_SwitchHeader_TYPE_CONTROL);
  152. p->context->iface->sendMessage(&msg, p->context->iface);
  153. }
  154. static String* RESULT_STRING_OK = String_CONST_SO("pong");
  155. static String* RESULT_STRING_LABEL_MISMATCH = String_CONST_SO("diff_label");
  156. static String* RESULT_STRING_WRONG_DATA = String_CONST_SO("diff_data");
  157. static String* RESULT_STRING_ERROR_RESPONSE = String_CONST_SO("err_switch");
  158. static String* RESULT_STRING_TIMEOUT = String_CONST_SO("timeout");
  159. static String* RESULT_STRING_UNKNOWN = String_CONST_SO("err_unknown");
  160. static String* RESULT_STRING_LOOP = String_CONST_SO("err_loop");
  161. String* SwitchPinger_resultString(enum SwitchPinger_Result result)
  162. {
  163. switch (result) {
  164. case SwitchPinger_Result_OK:
  165. return RESULT_STRING_OK;
  166. case SwitchPinger_Result_LABEL_MISMATCH:
  167. return RESULT_STRING_LABEL_MISMATCH;
  168. case SwitchPinger_Result_WRONG_DATA:
  169. return RESULT_STRING_WRONG_DATA;
  170. case SwitchPinger_Result_ERROR_RESPONSE:
  171. return RESULT_STRING_ERROR_RESPONSE;
  172. case SwitchPinger_Result_TIMEOUT:
  173. return RESULT_STRING_TIMEOUT;
  174. case SwitchPinger_Result_LOOP_ROUTE:
  175. return RESULT_STRING_LOOP;
  176. default:
  177. return RESULT_STRING_UNKNOWN;
  178. };
  179. }
  180. static int onPingFree(struct Allocator_OnFreeJob* job)
  181. {
  182. struct Ping* ping = Identity_check((struct Ping*)job->userData);
  183. struct SwitchPinger* ctx = Identity_check(ping->context);
  184. ctx->outstandingPings--;
  185. Assert_true(ctx->outstandingPings >= 0);
  186. return 0;
  187. }
  188. struct SwitchPinger_Ping* SwitchPinger_newPing(uint64_t label,
  189. String* data,
  190. uint32_t timeoutMilliseconds,
  191. SwitchPinger_ResponseCallback onResponse,
  192. struct Allocator* alloc,
  193. struct SwitchPinger* ctx)
  194. {
  195. if (data && data->len > Control_Ping_MAX_SIZE) {
  196. return NULL;
  197. }
  198. if (ctx->outstandingPings > ctx->maxConcurrentPings) {
  199. Log_debug(ctx->logger, "Skipping switch ping because there are already [%d] outstanding",
  200. ctx->outstandingPings);
  201. return NULL;
  202. }
  203. struct Pinger_Ping* pp =
  204. Pinger_newPing(data, onPingResponse, sendPing, timeoutMilliseconds, alloc, ctx->pinger);
  205. struct Ping* ping = Allocator_clone(pp->pingAlloc, (&(struct Ping) {
  206. .public = {
  207. .pingAlloc = pp->pingAlloc
  208. },
  209. .label = label,
  210. .data = String_clone(data, pp->pingAlloc),
  211. .context = ctx,
  212. .onResponse = onResponse,
  213. .pingerPing = pp
  214. }));
  215. Identity_set(ping);
  216. Allocator_onFree(pp->pingAlloc, onPingFree, ping);
  217. pp->context = ping;
  218. ctx->outstandingPings++;
  219. return &ping->public;
  220. }
  221. struct SwitchPinger* SwitchPinger_new(struct Interface* iface,
  222. struct EventBase* eventBase,
  223. struct Random* rand,
  224. struct Log* logger,
  225. struct Address* myAddr,
  226. struct Allocator* alloc)
  227. {
  228. struct SwitchPinger* sp = Allocator_malloc(alloc, sizeof(struct SwitchPinger));
  229. Bits_memcpyConst(sp, (&(struct SwitchPinger) {
  230. .iface = iface,
  231. .pinger = Pinger_new(eventBase, rand, logger, alloc),
  232. .logger = logger,
  233. .allocator = alloc,
  234. .maxConcurrentPings = SwitchPinger_DEFAULT_MAX_CONCURRENT_PINGS,
  235. }), sizeof(struct SwitchPinger));
  236. iface->receiveMessage = receiveMessage;
  237. iface->receiverContext = sp;
  238. Identity_set(sp);
  239. return sp;
  240. }