1
0

UDPAddrIface.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "util/events/libuv/UvWrapper.h"
  16. #include "exception/Except.h"
  17. #include "interface/Iface.h"
  18. #include "util/events/UDPAddrIface.h"
  19. #include "memory/Allocator.h"
  20. #include "util/events/libuv/EventBase_pvt.h"
  21. #include "util/platform/Sockaddr.h"
  22. #include "util/Assert.h"
  23. #include "util/Identity.h"
  24. #include "wire/Message.h"
  25. #include "wire/Error.h"
  26. #include "util/Hex.h"
  27. struct UDPAddrIface_pvt
  28. {
  29. struct UDPAddrIface pub;
  30. struct Allocator* allocator;
  31. struct Log* logger;
  32. /** Job to close the handle when the allocator is freed */
  33. struct Allocator_OnFreeJob* closeHandleOnFree;
  34. /** Job which blocks the freeing until the callback completes */
  35. struct Allocator_OnFreeJob* blockFreeInsideCallback;
  36. uv_udp_t uvHandle;
  37. int queueLen;
  38. /** true if we are inside of the callback, used by blockFreeInsideCallback */
  39. int inCallback;
  40. Identity
  41. };
  42. struct UDPAddrIface_WriteRequest_pvt {
  43. uv_udp_send_t uvReq;
  44. int32_t length;
  45. struct UDPAddrIface_pvt* udp;
  46. struct Message* msg;
  47. struct Allocator* alloc;
  48. Identity
  49. };
  50. static struct UDPAddrIface_pvt* ifaceForHandle(uv_udp_t* handle)
  51. {
  52. char* hp = ((char*)handle) - offsetof(struct UDPAddrIface_pvt, uvHandle);
  53. return Identity_check((struct UDPAddrIface_pvt*) hp);
  54. }
  55. static void sendComplete(uv_udp_send_t* uvReq, int error)
  56. {
  57. struct UDPAddrIface_WriteRequest_pvt* req =
  58. Identity_check((struct UDPAddrIface_WriteRequest_pvt*) uvReq);
  59. if (error) {
  60. Log_debug(req->udp->logger, "DROP Failed to write to UDPAddrIface [%s]",
  61. uv_strerror(error) );
  62. }
  63. Assert_true(req->msg->length == req->length);
  64. req->udp->queueLen -= req->msg->length;
  65. Assert_true(req->udp->queueLen >= 0);
  66. Allocator_free(req->alloc);
  67. }
  68. static Iface_DEFUN incomingFromIface(struct Message* m, struct Iface* iface)
  69. {
  70. struct UDPAddrIface_pvt* context = Identity_check((struct UDPAddrIface_pvt*) iface);
  71. Assert_true(m->length >= Sockaddr_OVERHEAD);
  72. if (((struct Sockaddr*)m->bytes)->flags & Sockaddr_flags_BCAST) {
  73. Log_debug(context->logger, "Attempted bcast, bcast unsupported");
  74. // bcast not supported.
  75. return Error(UNHANDLED);
  76. }
  77. if (context->queueLen > UDPAddrIface_MAX_QUEUE) {
  78. Log_warn(context->logger, "DROP msg length [%d] to [%s] maximum queue length reached",
  79. m->length, Sockaddr_print(context->pub.generic.addr, m->alloc));
  80. return Error(OVERFLOW);
  81. }
  82. // This allocator will hold the message allocator in existance after it is freed.
  83. struct Allocator* reqAlloc = Allocator_child(context->allocator);
  84. if (m->alloc) {
  85. Allocator_adopt(reqAlloc, m->alloc);
  86. } else {
  87. m = Message_clone(m, reqAlloc);
  88. }
  89. struct UDPAddrIface_WriteRequest_pvt* req =
  90. Allocator_clone(reqAlloc, (&(struct UDPAddrIface_WriteRequest_pvt) {
  91. .udp = context,
  92. .msg = m,
  93. .alloc = reqAlloc
  94. }));
  95. Identity_set(req);
  96. struct Sockaddr_storage ss;
  97. Er_assert(Message_epop(m, &ss, context->pub.generic.addr->addrLen));
  98. Assert_true(ss.addr.addrLen == context->pub.generic.addr->addrLen);
  99. req->length = m->length;
  100. uv_buf_t buffers[] = {
  101. { .base = (char*)m->bytes, .len = m->length }
  102. };
  103. int ret = uv_udp_send(&req->uvReq, &context->uvHandle, buffers, 1,
  104. (const struct sockaddr*)ss.nativeAddr, (uv_udp_send_cb)&sendComplete);
  105. if (ret) {
  106. Log_info(context->logger, "DROP Failed writing to UDPAddrIface [%s]",
  107. uv_strerror(ret));
  108. Allocator_free(req->alloc);
  109. return Error(UNHANDLED);
  110. }
  111. context->queueLen += m->length;
  112. return Error(NONE);
  113. }
  114. #if UDPAddrIface_PADDING_AMOUNT < 8
  115. #error
  116. #endif
  117. #define ALLOC(buff) (((struct Allocator**) &(buff[-(8 + (((uintptr_t)buff) % 8))]))[0])
  118. static void incoming(uv_udp_t* handle,
  119. ssize_t nread,
  120. const uv_buf_t* buf,
  121. const struct sockaddr* addr,
  122. unsigned flags)
  123. {
  124. struct UDPAddrIface_pvt* context = ifaceForHandle(handle);
  125. context->inCallback = 1;
  126. // Grab out the allocator which was placed there by allocate()
  127. struct Allocator* alloc = buf->base ? ALLOC(buf->base) : NULL;
  128. // if nread < 0, we used to log uv_last_error, which doesn't exist anymore.
  129. if (nread == 0) {
  130. // Happens constantly
  131. //Log_debug(context->logger, "0 length read");
  132. } else {
  133. struct Message* m = Allocator_calloc(alloc, sizeof(struct Message), 1);
  134. m->length = nread;
  135. m->padding = UDPAddrIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;
  136. m->capacity = buf->len;
  137. m->bytes = (uint8_t*)buf->base;
  138. m->alloc = alloc;
  139. Er_assert(Message_epush(m, addr, context->pub.generic.addr->addrLen - Sockaddr_OVERHEAD));
  140. // make sure the sockaddr doesn't have crap in it which will
  141. // prevent it from being used as a lookup key
  142. Sockaddr_normalizeNative((struct sockaddr*) m->bytes);
  143. Er_assert(Message_epush(m, context->pub.generic.addr, Sockaddr_OVERHEAD));
  144. /*uint8_t buff[256] = {0};
  145. Assert_true(Hex_encode(buff, 255, m->bytes, context->pub.generic.addr->addrLen));
  146. Log_debug(context->logger, "Message from [%s]", buff);*/
  147. Iface_send(&context->pub.generic.iface, m);
  148. }
  149. if (alloc) {
  150. Allocator_free(alloc);
  151. }
  152. context->inCallback = 0;
  153. if (context->blockFreeInsideCallback) {
  154. Allocator_onFreeComplete((struct Allocator_OnFreeJob*) context->blockFreeInsideCallback);
  155. }
  156. }
  157. static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
  158. {
  159. struct UDPAddrIface_pvt* context = ifaceForHandle((uv_udp_t*)handle);
  160. size = UDPAddrIface_BUFFER_CAP;
  161. size_t fullSize = size + UDPAddrIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;
  162. struct Allocator* child = Allocator_child(context->allocator);
  163. char* buff = Allocator_malloc(child, fullSize);
  164. buff += UDPAddrIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;
  165. ALLOC(buff) = child;
  166. buf->base = buff;
  167. buf->len = size;
  168. }
  169. static void onClosed(uv_handle_t* wasClosed)
  170. {
  171. struct UDPAddrIface_pvt* context =
  172. Identity_check((struct UDPAddrIface_pvt*) wasClosed->data);
  173. Allocator_onFreeComplete((struct Allocator_OnFreeJob*) context->closeHandleOnFree);
  174. }
  175. static int closeHandleOnFree(struct Allocator_OnFreeJob* job)
  176. {
  177. struct UDPAddrIface_pvt* context =
  178. Identity_check((struct UDPAddrIface_pvt*) job->userData);
  179. context->closeHandleOnFree = job;
  180. uv_close((uv_handle_t*)&context->uvHandle, onClosed);
  181. return Allocator_ONFREE_ASYNC;
  182. }
  183. static int blockFreeInsideCallback(struct Allocator_OnFreeJob* job)
  184. {
  185. struct UDPAddrIface_pvt* context =
  186. Identity_check((struct UDPAddrIface_pvt*) job->userData);
  187. if (!context->inCallback) {
  188. return 0;
  189. }
  190. context->blockFreeInsideCallback = job;
  191. return Allocator_ONFREE_ASYNC;
  192. }
  193. int UDPAddrIface_setDSCP(struct UDPAddrIface* iface, uint8_t dscp)
  194. {
  195. int res = 0;
  196. /* For win32 setsockopt is unable to mark the TOS field in IP header, do not support it now */
  197. #ifndef win32
  198. struct UDPAddrIface_pvt* context = Identity_check((struct UDPAddrIface_pvt*) iface);
  199. /* 6-bit DSCP, 2-bit ENC(useless for UDP) */
  200. int tos = dscp << 2;
  201. if (Sockaddr_getFamily(context->pub.generic.addr) == Sockaddr_AF_INET) {
  202. res = setsockopt(context->uvHandle.io_watcher.fd, IPPROTO_IP, IP_TOS,
  203. &tos, sizeof(tos));
  204. } else if (Sockaddr_getFamily(context->pub.generic.addr) == Sockaddr_AF_INET6) {
  205. res = setsockopt(context->uvHandle.io_watcher.fd, IPPROTO_IPV6, IPV6_TCLASS,
  206. &tos, sizeof(tos));
  207. }
  208. #endif
  209. return res;
  210. }
  211. int UDPAddrIface_getFd(struct UDPAddrIface* iface)
  212. {
  213. int out = -1;
  214. #ifndef win32
  215. struct UDPAddrIface_pvt* context = Identity_check((struct UDPAddrIface_pvt*) iface);
  216. out = context->uvHandle.io_watcher.fd;
  217. #endif
  218. return out;
  219. }
  220. int UDPAddrIface_setBroadcast(struct UDPAddrIface* iface, bool enable)
  221. {
  222. struct UDPAddrIface_pvt* context = Identity_check((struct UDPAddrIface_pvt*) iface);
  223. return uv_udp_set_broadcast(&context->uvHandle, enable ? 1 : 0);
  224. }
  225. Er_DEFUN(struct UDPAddrIface* UDPAddrIface_new(struct EventBase* eventBase,
  226. struct Sockaddr* addr,
  227. struct Allocator* alloc,
  228. struct Log* logger))
  229. {
  230. struct EventBase_pvt* base = EventBase_privatize(eventBase);
  231. struct UDPAddrIface_pvt* context =
  232. Allocator_clone(alloc, (&(struct UDPAddrIface_pvt) {
  233. .logger = logger,
  234. .allocator = alloc
  235. }));
  236. context->pub.generic.alloc = alloc;
  237. context->pub.generic.iface.send = incomingFromIface;
  238. Identity_set(context);
  239. if (addr) {
  240. Log_debug(logger, "Binding to address [%s]", Sockaddr_print(addr, alloc));
  241. }
  242. struct Sockaddr_storage ss;
  243. if (!addr) {
  244. Sockaddr_parse("0.0.0.0:0", &ss);
  245. addr = &ss.addr;
  246. }
  247. uv_udp_init(base->loop, &context->uvHandle);
  248. context->uvHandle.data = context;
  249. int ret;
  250. void* native = Sockaddr_asNative(addr);
  251. ret = uv_udp_bind(&context->uvHandle, (const struct sockaddr*)native, 0);
  252. if (ret) {
  253. Er_raise(alloc, "call to uv_udp_bind() failed [%s]", uv_strerror(ret));
  254. }
  255. ret = uv_udp_recv_start(&context->uvHandle, allocate, incoming);
  256. if (ret) {
  257. const char* err = uv_strerror(ret);
  258. uv_close((uv_handle_t*) &context->uvHandle, NULL);
  259. Er_raise(alloc, "uv_udp_recv_start() failed [%s]", err);
  260. }
  261. int nameLen = sizeof(struct Sockaddr_storage);
  262. Bits_memset(&ss, 0, sizeof(struct Sockaddr_storage));
  263. ret = uv_udp_getsockname(&context->uvHandle, (void*)ss.nativeAddr, &nameLen);
  264. if (ret) {
  265. const char* err = uv_strerror(ret);
  266. uv_close((uv_handle_t*) &context->uvHandle, NULL);
  267. Er_raise(alloc, "uv_udp_getsockname() failed [%s]", err);
  268. }
  269. ss.addr.addrLen = nameLen + 8;
  270. context->pub.generic.addr = Sockaddr_clone(&ss.addr, alloc);
  271. Log_debug(logger, "Bound to address [%s]", Sockaddr_print(context->pub.generic.addr, alloc));
  272. Allocator_onFree(alloc, closeHandleOnFree, context);
  273. Allocator_onFree(alloc, blockFreeInsideCallback, context);
  274. Er_ret(&context->pub);
  275. }