UDPBcastIface.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. /* vim: set expandtab ts=4 sw=4: */
  2. /*
  3. * You may redistribute this program and/or modify it under the terms of
  4. * the GNU General Public License as published by the Free Software Foundation,
  5. * either version 3 of the License, or (at your option) any later version.
  6. *
  7. * This program is distributed in the hope that it will be useful,
  8. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. * GNU General Public License for more details.
  11. *
  12. * You should have received a copy of the GNU General Public License
  13. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. */
  15. #include "util/events/libuv/UvWrapper.h"
  16. #include "exception/Except.h"
  17. #include "interface/Iface.h"
  18. #include "util/events/UDPBcastIface.h"
  19. #include "memory/Allocator.h"
  20. #include "util/events/libuv/EventBase_pvt.h"
  21. #include "util/platform/Sockaddr.h"
  22. #include "util/Assert.h"
  23. #include "util/Identity.h"
  24. #include "wire/Message.h"
  25. #include "wire/Error.h"
  26. #include "util/Hex.h"
  27. #include "benc/String.h"
  28. struct UDPBcastIface_pvt
  29. {
  30. struct UDPBcastIface pub;
  31. struct Allocator* allocator;
  32. struct Log* logger;
  33. /** Job to close the handle when the allocator is freed */
  34. struct Allocator_OnFreeJob* closeHandleOnFree;
  35. /** Job which blocks the freeing until the callback completes */
  36. struct Allocator_OnFreeJob* blockFreeInsideCallback;
  37. uv_udp_t uvHandle;
  38. int queueLen;
  39. bool bcast;
  40. List* devices;
  41. /** true if we are inside of the callback, used by blockFreeInsideCallback */
  42. int inCallback;
  43. Identity
  44. };
  45. struct UDPBcastIface_WriteRequest_pvt {
  46. uv_udp_send_t uvReq;
  47. int32_t length;
  48. struct UDPBcastIface_pvt* udp;
  49. struct Message* msg;
  50. struct Allocator* alloc;
  51. Identity
  52. };
  53. static struct UDPBcastIface_pvt* ifaceForHandle(uv_udp_t* handle)
  54. {
  55. char* hp = ((char*)handle) - offsetof(struct UDPBcastIface_pvt, uvHandle);
  56. return Identity_check((struct UDPBcastIface_pvt*) hp);
  57. }
  58. static void sendComplete(uv_udp_send_t* uvReq, int error)
  59. {
  60. struct UDPBcastIface_WriteRequest_pvt* req =
  61. Identity_check((struct UDPBcastIface_WriteRequest_pvt*) uvReq);
  62. if (error) {
  63. Log_debug(req->udp->logger, "DROP Failed to write to UDPBcastIface [%s]",
  64. uv_strerror(error) );
  65. }
  66. Assert_true(req->msg->length == req->length);
  67. req->udp->queueLen -= req->msg->length;
  68. Assert_true(req->udp->queueLen >= 0);
  69. Allocator_free(req->alloc);
  70. }
  71. static void sendPacket(struct Message* m, struct sockaddr* addr, struct Iface* iface)
  72. {
  73. struct Message* msg;
  74. struct UDPBcastIface_pvt* context = Identity_check((struct UDPBcastIface_pvt*) iface);
  75. // This allocator will hold the message allocator in existance after it is freed.
  76. struct Allocator* reqAlloc = Allocator_child(context->allocator);
  77. msg = Message_clone(m, reqAlloc);
  78. struct UDPBcastIface_WriteRequest_pvt* req =
  79. Allocator_clone(reqAlloc, (&(struct UDPBcastIface_WriteRequest_pvt) {
  80. .udp = context,
  81. .msg = msg,
  82. .alloc = reqAlloc
  83. }));
  84. Identity_set(req);
  85. req->length = msg->length;
  86. uv_buf_t buffers[] = {
  87. { .base = (char*)msg->bytes, .len = msg->length }
  88. };
  89. int ret = uv_udp_send(&req->uvReq, &context->uvHandle, buffers, 1,
  90. addr, (uv_udp_send_cb)&sendComplete);
  91. if (ret) {
  92. Log_info(context->logger, "DROP Failed writing to UDPBcastIface [%s]",
  93. uv_strerror(ret));
  94. Allocator_free(req->alloc);
  95. return;
  96. }
  97. context->queueLen += msg->length;
  98. }
  99. static Iface_DEFUN incomingFromIface(struct Message* m, struct Iface* iface)
  100. {
  101. struct UDPBcastIface_pvt* context = Identity_check((struct UDPBcastIface_pvt*) iface);
  102. Assert_true(m->length >= Sockaddr_OVERHEAD);
  103. if ((((struct Sockaddr*)m->bytes)->flags & Sockaddr_flags_BCAST) && !context->bcast) {
  104. Log_debug(context->logger, "Attempted bcast with bcast disabled");
  105. return NULL;
  106. }
  107. if (context->queueLen > UDPBcastIface_MAX_QUEUE) {
  108. Log_warn(context->logger, "DROP Maximum queue length reached");
  109. return NULL;
  110. }
  111. struct Sockaddr* sa = (struct Sockaddr*) m->bytes;
  112. struct Sockaddr_storage ss;
  113. Message_pop(m, &ss, sa->addrLen, NULL);
  114. struct UDPBcastIface_Header hdr = {
  115. .version = UDPBcastIface_CURRENT_VERSION,
  116. .zero = 0,
  117. .length_be = Endian_hostToBigEndian16(m->length + UDPBcastIface_Header_SIZE),
  118. .bcast = 0,
  119. .reversed = 0,
  120. .magic_be= Endian_hostToBigEndian16(0xfc00),
  121. };
  122. if (sa->flags & Sockaddr_flags_BCAST) {
  123. // We will send the message to bcast addr of all selected interfaces
  124. uv_interface_address_t* interfaces;
  125. int i, j, count;
  126. int res = uv_interface_addresses(&interfaces, &count);
  127. if (res) {
  128. Log_warn(context->logger, "DROP message for none interface available");
  129. return NULL;
  130. }
  131. struct Allocator* tmpAlloc = Allocator_child(context->allocator);
  132. int32_t bcastCount = List_size(context->devices);
  133. hdr.bcast = 1;
  134. Message_push(m, &hdr, UDPBcastIface_Header_SIZE, NULL);
  135. for (i = 0; i < count; i++) {
  136. if (interfaces[i].is_internal) { continue; }
  137. if (interfaces[i].address.address4.sin_family != AF_INET) { continue; }
  138. for (j = 0; j < bcastCount; j++) {
  139. String* device = List_getString(context->devices, j);
  140. if (!CString_strcmp(interfaces[i].name, device->bytes)) { break; }
  141. }
  142. if (j == bcastCount) { continue; }
  143. // calculate the broadcast address
  144. struct sockaddr_in bcast4 = {
  145. .sin_family = AF_INET,
  146. .sin_port = htons(Sockaddr_getPort(context->pub.generic.addr)),
  147. .sin_addr = { .s_addr =
  148. (interfaces[i].address.address4.sin_addr.s_addr &
  149. interfaces[i].netmask.netmask4.sin_addr.s_addr) |
  150. ~interfaces[i].netmask.netmask4.sin_addr.s_addr}
  151. };
  152. sendPacket(m, (struct sockaddr*)&bcast4, iface);
  153. }
  154. uv_free_interface_addresses(interfaces, count);
  155. Allocator_free(tmpAlloc);
  156. } else {
  157. Message_push(m, &hdr, UDPBcastIface_Header_SIZE, NULL);
  158. sendPacket(m, (struct sockaddr*)ss.nativeAddr, iface);
  159. }
  160. return NULL;
  161. }
  162. #if UDPBcastIface_PADDING_AMOUNT < 8
  163. #error
  164. #endif
  165. #define ALLOC(buff) (((struct Allocator**) &(buff[-(8 + (((uintptr_t)buff) % 8))]))[0])
  166. static void incoming(uv_udp_t* handle,
  167. ssize_t nread,
  168. const uv_buf_t* buf,
  169. const struct sockaddr* addr,
  170. unsigned flags)
  171. {
  172. struct UDPBcastIface_pvt* context = ifaceForHandle(handle);
  173. context->inCallback = 1;
  174. // Grab out the allocator which was placed there by allocate()
  175. struct Allocator* alloc = buf->base ? ALLOC(buf->base) : NULL;
  176. // if nread < 0, we used to log uv_last_error, which doesn't exist anymore.
  177. if (nread == 0) {
  178. // Happens constantly
  179. //Log_debug(context->logger, "0 length read");
  180. } else if (nread < UDPBcastIface_Header_SIZE) {
  181. Log_debug(context->logger, "Failed to receive udp bcast frame");
  182. } else {
  183. do {
  184. struct Message* m = Allocator_calloc(alloc, sizeof(struct Message), 1);
  185. m->length = nread;
  186. m->padding = UDPBcastIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;
  187. m->capacity = buf->len;
  188. m->bytes = (uint8_t*)buf->base;
  189. m->alloc = alloc;
  190. struct UDPBcastIface_Header hdr;
  191. Message_pop(m, &hdr, UDPBcastIface_Header_SIZE, NULL);
  192. if (hdr.version != UDPBcastIface_CURRENT_VERSION) {
  193. Log_debug(context->logger, "DROP unknown version");
  194. break;
  195. }
  196. uint16_t reportedLength = Endian_bigEndianToHost16(hdr.length_be);
  197. reportedLength -= UDPBcastIface_Header_SIZE;
  198. if (m->length != reportedLength) {
  199. if (m->length < reportedLength) {
  200. Log_debug(context->logger, "DROP size field is larger than frame");
  201. break;
  202. }
  203. m->length = reportedLength;
  204. }
  205. if (hdr.magic_be != Endian_hostToBigEndian16(0xfc00)) {
  206. Log_debug(context->logger, "DROP bad magic");
  207. break;
  208. }
  209. if (!context->bcast) {
  210. Log_debug(context->logger, "Drop packet with bcast disabled");
  211. break;
  212. }
  213. struct Sockaddr laddr;
  214. Bits_memcpy(&laddr, context->pub.generic.addr, Sockaddr_OVERHEAD);
  215. if (hdr.bcast) {
  216. laddr.flags |= Sockaddr_flags_BCAST;
  217. }
  218. Message_push(m, addr, context->pub.generic.addr->addrLen - Sockaddr_OVERHEAD, NULL);
  219. // make sure the sockaddr doesn't have crap in it which will
  220. // prevent it from being used as a lookup key
  221. Sockaddr_normalizeNative((struct sockaddr*) m->bytes);
  222. Message_push(m, &laddr, Sockaddr_OVERHEAD, NULL);
  223. Iface_send(&context->pub.generic.iface, m);
  224. } while (0);
  225. }
  226. if (alloc) {
  227. Allocator_free(alloc);
  228. }
  229. context->inCallback = 0;
  230. if (context->blockFreeInsideCallback) {
  231. Allocator_onFreeComplete((struct Allocator_OnFreeJob*) context->blockFreeInsideCallback);
  232. }
  233. }
  234. static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
  235. {
  236. struct UDPBcastIface_pvt* context = ifaceForHandle((uv_udp_t*)handle);
  237. size = UDPBcastIface_BUFFER_CAP;
  238. size_t fullSize = size + UDPBcastIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;
  239. struct Allocator* child = Allocator_child(context->allocator);
  240. char* buff = Allocator_malloc(child, fullSize);
  241. buff += UDPBcastIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;
  242. ALLOC(buff) = child;
  243. buf->base = buff;
  244. buf->len = size;
  245. }
  246. static void onClosed(uv_handle_t* wasClosed)
  247. {
  248. struct UDPBcastIface_pvt* context =
  249. Identity_check((struct UDPBcastIface_pvt*) wasClosed->data);
  250. Allocator_onFreeComplete((struct Allocator_OnFreeJob*) context->closeHandleOnFree);
  251. }
  252. static int closeHandleOnFree(struct Allocator_OnFreeJob* job)
  253. {
  254. struct UDPBcastIface_pvt* context =
  255. Identity_check((struct UDPBcastIface_pvt*) job->userData);
  256. context->closeHandleOnFree = job;
  257. uv_close((uv_handle_t*)&context->uvHandle, onClosed);
  258. return Allocator_ONFREE_ASYNC;
  259. }
  260. static int blockFreeInsideCallback(struct Allocator_OnFreeJob* job)
  261. {
  262. struct UDPBcastIface_pvt* context =
  263. Identity_check((struct UDPBcastIface_pvt*) job->userData);
  264. if (!context->inCallback) {
  265. return 0;
  266. }
  267. context->blockFreeInsideCallback = job;
  268. return Allocator_ONFREE_ASYNC;
  269. }
  270. List* UDPBcastIface_listDevices(struct Allocator* alloc, struct Except* eh)
  271. {
  272. List* out = List_new(alloc);
  273. uv_interface_address_t* interfaces;
  274. int i, count;
  275. int res = uv_interface_addresses(&interfaces, &count);
  276. if (res) {
  277. return out;
  278. }
  279. for (i = 0; i < count; i++) {
  280. if (interfaces[i].is_internal) { continue; }
  281. if (interfaces[i].address.address4.sin_family != AF_INET) { continue; }
  282. List_addString(out, String_new(interfaces[i].name, alloc), alloc);
  283. }
  284. uv_free_interface_addresses(interfaces, count);
  285. return out;
  286. }
  287. int UDPBcastIface_setBroadcast(struct UDPBcastIface* iface, bool enable)
  288. {
  289. struct UDPBcastIface_pvt* context = Identity_check((struct UDPBcastIface_pvt*) iface);
  290. int res = uv_udp_set_broadcast(&context->uvHandle, enable ? 1 : 0);
  291. if (!res) {
  292. context->bcast = enable;
  293. }
  294. return res;
  295. }
  296. struct UDPBcastIface* UDPBcastIface_new(struct EventBase* eventBase,
  297. struct Sockaddr* addr,
  298. const List* devices,
  299. struct Allocator* alloc,
  300. struct Except* exHandler,
  301. struct Log* logger)
  302. {
  303. struct EventBase_pvt* base = EventBase_privatize(eventBase);
  304. struct UDPBcastIface_pvt* context =
  305. Allocator_clone(alloc, (&(struct UDPBcastIface_pvt) {
  306. .logger = logger,
  307. .bcast = true,
  308. .allocator = alloc
  309. }));
  310. context->pub.generic.alloc = alloc;
  311. context->pub.generic.iface.send = incomingFromIface;
  312. Identity_set(context);
  313. if (!addr) {
  314. Except_throw(exHandler, "Must assign the bcast address.");
  315. }
  316. Log_debug(logger, "Binding to address [%s]", Sockaddr_print(addr, alloc));
  317. if (!Sockaddr_getPort(addr)) {
  318. Except_throw(exHandler, "Must assign the bcast port.");
  319. }
  320. if (Sockaddr_getFamily(addr) != Sockaddr_AF_INET) {
  321. Except_throw(exHandler, "UDP broadcast only supported by ipv4.");
  322. }
  323. uv_udp_init(base->loop, &context->uvHandle);
  324. context->uvHandle.data = context;
  325. void* native = Sockaddr_asNative(addr);
  326. int ret = uv_udp_bind(&context->uvHandle, (const struct sockaddr*)native, 0);
  327. if (ret) {
  328. Except_throw(exHandler, "call to uv_udp_bind() failed [%s]",
  329. uv_strerror(ret));
  330. }
  331. ret = uv_udp_set_broadcast(&context->uvHandle, 1);
  332. if (ret) {
  333. Except_throw(exHandler, "call to uv_udp_set_broadcast() failed [%s]",
  334. uv_strerror(ret));
  335. }
  336. ret = uv_udp_recv_start(&context->uvHandle, allocate, incoming);
  337. if (ret) {
  338. const char* err = uv_strerror(ret);
  339. uv_close((uv_handle_t*) &context->uvHandle, NULL);
  340. Except_throw(exHandler, "uv_udp_recv_start() failed [%s]", err);
  341. }
  342. int nameLen = sizeof(struct Sockaddr_storage);
  343. struct Sockaddr_storage ss;
  344. Bits_memset(&ss, 0, sizeof(struct Sockaddr_storage));
  345. ret = uv_udp_getsockname(&context->uvHandle, (void*)ss.nativeAddr, &nameLen);
  346. if (ret) {
  347. const char* err = uv_strerror(ret);
  348. uv_close((uv_handle_t*) &context->uvHandle, NULL);
  349. Except_throw(exHandler, "uv_udp_getsockname() failed [%s]", err);
  350. }
  351. ss.addr.addrLen = nameLen + 8;
  352. context->pub.generic.addr = Sockaddr_clone(&ss.addr, alloc);
  353. Log_debug(logger, "Bound to address [%s]", Sockaddr_print(context->pub.generic.addr, alloc));
  354. context->devices = List_new(alloc);
  355. if (devices) {
  356. int32_t count = List_size(devices);
  357. for (int32_t i = 0; i < count; i++) {
  358. String* device = List_getString(devices, i);
  359. List_addStringC(context->devices, device->bytes, alloc);
  360. }
  361. }
  362. Allocator_onFree(alloc, closeHandleOnFree, context);
  363. Allocator_onFree(alloc, blockFreeInsideCallback, context);
  364. return &context->pub;
  365. }