plugin_transport_udp.c 111 KB


  1. /*
  2. This file is part of GNUnet
  3. Copyright (C) 2010-2015 Christian Grothoff (and other contributing authors)
  4. GNUnet is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published
  6. by the Free Software Foundation; either version 3, or (at your
  7. option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with GNUnet; see the file COPYING. If not, write to the
  14. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  15. Boston, MA 02111-1307, USA.
  16. */
  17. /**
  18. * @file transport/plugin_transport_udp.c
  19. * @brief Implementation of the UDP transport protocol
  20. * @author Christian Grothoff
  21. * @author Nathan Evans
  22. * @author Matthias Wachs
  23. */
  24. #include "platform.h"
  25. #include "plugin_transport_udp.h"
  26. #include "gnunet_hello_lib.h"
  27. #include "gnunet_util_lib.h"
  28. #include "gnunet_fragmentation_lib.h"
  29. #include "gnunet_nat_lib.h"
  30. #include "gnunet_protocols.h"
  31. #include "gnunet_resolver_service.h"
  32. #include "gnunet_signatures.h"
  33. #include "gnunet_constants.h"
  34. #include "gnunet_statistics_service.h"
  35. #include "gnunet_transport_service.h"
  36. #include "gnunet_transport_plugin.h"
  37. #include "transport.h"
  38. #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
  39. #define UDP_SESSION_TIME_OUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
  40. /**
  41. * Number of messages we can defragment in parallel. We only really
  42. * defragment 1 message at a time, but if messages get re-ordered, we
  43. * may want to keep knowledge about the previous message to avoid
  44. * discarding the current message in favor of a single fragment of a
  45. * previous message. 3 should be good since we don't expect massive
  46. * message reorderings with UDP.
  47. */
  48. #define UDP_MAX_MESSAGES_IN_DEFRAG 3
  49. /**
  50. * We keep a defragmentation queue per sender address. How many
  51. * sender addresses do we support at the same time? Memory consumption
  52. * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
  53. * value. (So 128 corresponds to 12 MB and should suffice for
  54. * connecting to roughly 128 peers via UDP).
  55. */
  56. #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
  57. /**
  58. * Closure for #append_port().
  59. */
  60. struct PrettyPrinterContext
  61. {
  62. /**
  63. * DLL
  64. */
  65. struct PrettyPrinterContext *next;
  66. /**
  67. * DLL
  68. */
  69. struct PrettyPrinterContext *prev;
  70. /**
  71. * Our plugin.
  72. */
  73. struct Plugin *plugin;
  74. /**
  75. * Resolver handle
  76. */
  77. struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
  78. /**
  79. * Function to call with the result.
  80. */
  81. GNUNET_TRANSPORT_AddressStringCallback asc;
  82. /**
  83. * Clsoure for @e asc.
  84. */
  85. void *asc_cls;
  86. /**
  87. * Timeout task
  88. */
  89. struct GNUNET_SCHEDULER_Task *timeout_task;
  90. /**
  91. * Is this an IPv6 address?
  92. */
  93. int ipv6;
  94. /**
  95. * Options
  96. */
  97. uint32_t options;
  98. /**
  99. * Port to add after the IP address.
  100. */
  101. uint16_t port;
  102. };
  103. /**
  104. * Session with another peer.
  105. */
  106. struct Session
  107. {
  108. /**
  109. * Which peer is this session for?
  110. */
  111. struct GNUNET_PeerIdentity target;
  112. /**
  113. * Plugin this session belongs to.
  114. */
  115. struct Plugin *plugin;
  116. /**
  117. * Context for dealing with fragments.
  118. */
  119. struct UDP_FragmentationContext *frag_ctx;
  120. /**
  121. * Desired delay for next sending we send to other peer
  122. */
  123. struct GNUNET_TIME_Relative flow_delay_for_other_peer;
  124. /**
  125. * Desired delay for next sending we received from other peer
  126. */
  127. struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
  128. /**
  129. * Session timeout task
  130. */
  131. struct GNUNET_SCHEDULER_Task *timeout_task;
  132. /**
  133. * When does this session time out?
  134. */
  135. struct GNUNET_TIME_Absolute timeout;
  136. /**
  137. * expected delay for ACKs
  138. */
  139. struct GNUNET_TIME_Relative last_expected_ack_delay;
  140. /**
  141. * desired delay between UDP messages
  142. */
  143. struct GNUNET_TIME_Relative last_expected_msg_delay;
  144. /**
  145. * Our own address.
  146. */
  147. struct GNUNET_HELLO_Address *address;
  148. /**
  149. * Number of bytes waiting for transmission to this peer.
  150. */
  151. unsigned long long bytes_in_queue;
  152. /**
  153. * Number of messages waiting for transmission to this peer.
  154. */
  155. unsigned int msgs_in_queue;
  156. /**
  157. * Reference counter to indicate that this session is
  158. * currently being used and must not be destroyed;
  159. * setting @e in_destroy will destroy it as soon as
  160. * possible.
  161. */
  162. unsigned int rc;
  163. /**
  164. * Network type of the address.
  165. */
  166. enum GNUNET_ATS_Network_Type scope;
  167. /**
  168. * Is this session about to be destroyed (sometimes we cannot
  169. * destroy a session immediately as below us on the stack
  170. * there might be code that still uses it; in this case,
  171. * @e rc is non-zero).
  172. */
  173. int in_destroy;
  174. };
  175. /**
  176. * Closure for #process_inbound_tokenized_messages().
  177. */
  178. struct SourceInformation
  179. {
  180. /**
  181. * Sender identity.
  182. */
  183. struct GNUNET_PeerIdentity sender;
  184. /**
  185. * Associated session.
  186. */
  187. struct Session *session;
  188. };
  189. /**
  190. * Closure for #find_receive_context().
  191. */
  192. struct FindReceiveContext
  193. {
  194. /**
  195. * Where to store the result.
  196. */
  197. struct DefragContext *rc;
  198. /**
  199. * Session associated with this context.
  200. */
  201. struct Session *session;
  202. /**
  203. * Address to find.
  204. */
  205. const union UdpAddress *udp_addr;
  206. /**
  207. * Number of bytes in @e udp_addr.
  208. */
  209. size_t udp_addr_len;
  210. };
  211. /**
  212. * Data structure to track defragmentation contexts based
  213. * on the source of the UDP traffic.
  214. */
  215. struct DefragContext
  216. {
  217. /**
  218. * Defragmentation context.
  219. */
  220. struct GNUNET_DEFRAGMENT_Context *defrag;
  221. /**
  222. * Reference to master plugin struct.
  223. */
  224. struct Plugin *plugin;
  225. /**
  226. * Node in the defrag heap.
  227. */
  228. struct GNUNET_CONTAINER_HeapNode *hnode;
  229. /**
  230. * Who's message(s) are we defragmenting here?
  231. * Only initialized once we succeeded and
  232. * @e have_sender is set.
  233. */
  234. struct GNUNET_PeerIdentity sender;
  235. /**
  236. * Source address this receive context is for (allocated at the
  237. * end of the struct).
  238. */
  239. const union UdpAddress *udp_addr;
  240. /**
  241. * Length of @e udp_addr.
  242. */
  243. size_t udp_addr_len;
  244. /**
  245. * Network type the address belongs to.
  246. */
  247. enum GNUNET_ATS_Network_Type network_type;
  248. /**
  249. * Has the @e sender field been initialized yet?
  250. */
  251. int have_sender;
  252. };
  253. /**
  254. * Context to send fragmented messages
  255. */
  256. struct UDP_FragmentationContext
  257. {
  258. /**
  259. * Next in linked list
  260. */
  261. struct UDP_FragmentationContext *next;
  262. /**
  263. * Previous in linked list
  264. */
  265. struct UDP_FragmentationContext *prev;
  266. /**
  267. * The plugin
  268. */
  269. struct Plugin *plugin;
  270. /**
  271. * Handle for GNUNET_FRAGMENT context
  272. */
  273. struct GNUNET_FRAGMENT_Context *frag;
  274. /**
  275. * The session this fragmentation context belongs to
  276. */
  277. struct Session *session;
  278. /**
  279. * Function to call upon completion of the transmission.
  280. */
  281. GNUNET_TRANSPORT_TransmitContinuation cont;
  282. /**
  283. * Closure for @e cont.
  284. */
  285. void *cont_cls;
  286. /**
  287. * Message timeout
  288. */
  289. struct GNUNET_TIME_Absolute timeout;
  290. /**
  291. * Payload size of original unfragmented message
  292. */
  293. size_t payload_size;
  294. /**
  295. * Bytes used to send all fragments on wire including UDP overhead
  296. */
  297. size_t on_wire_size;
  298. /**
  299. * FIXME.
  300. */
  301. unsigned int fragments_used;
  302. };
  303. /**
  304. * Message types included in a `struct UDP_MessageWrapper`
  305. */
  306. enum UDP_MessageType
  307. {
  308. /**
  309. * Uninitialized (error)
  310. */
  311. UMT_UNDEFINED = 0,
  312. /**
  313. * Fragment of a message.
  314. */
  315. UMT_MSG_FRAGMENTED = 1,
  316. /**
  317. *
  318. */
  319. UMT_MSG_FRAGMENTED_COMPLETE = 2,
  320. /**
  321. * Unfragmented message.
  322. */
  323. UMT_MSG_UNFRAGMENTED = 3,
  324. /**
  325. * Receipt confirmation.
  326. */
  327. UMT_MSG_ACK = 4
  328. };
  329. /**
  330. * Information we track for each message in the queue.
  331. */
  332. struct UDP_MessageWrapper
  333. {
  334. /**
  335. * Session this message belongs to
  336. */
  337. struct Session *session;
  338. /**
  339. * DLL of messages
  340. * previous element
  341. */
  342. struct UDP_MessageWrapper *prev;
  343. /**
  344. * DLL of messages
  345. * previous element
  346. */
  347. struct UDP_MessageWrapper *next;
  348. /**
  349. * Message with size msg_size including UDP specific overhead
  350. */
  351. char *msg_buf;
  352. /**
  353. * Function to call upon completion of the transmission.
  354. */
  355. GNUNET_TRANSPORT_TransmitContinuation cont;
  356. /**
  357. * Closure for @e cont.
  358. */
  359. void *cont_cls;
  360. /**
  361. * Fragmentation context
  362. * frag_ctx == NULL if transport <= MTU
  363. * frag_ctx != NULL if transport > MTU
  364. */
  365. struct UDP_FragmentationContext *frag_ctx;
  366. /**
  367. * Message timeout
  368. */
  369. struct GNUNET_TIME_Absolute timeout;
  370. /**
  371. * Size of UDP message to send including UDP specific overhead
  372. */
  373. size_t msg_size;
  374. /**
  375. * Payload size of original message
  376. */
  377. size_t payload_size;
  378. /**
  379. * Message type
  380. */
  381. enum UDP_MessageType msg_type;
  382. };
  383. /**
  384. * UDP ACK Message-Packet header.
  385. */
  386. struct UDP_ACK_Message
  387. {
  388. /**
  389. * Message header.
  390. */
  391. struct GNUNET_MessageHeader header;
  392. /**
  393. * Desired delay for flow control
  394. */
  395. uint32_t delay;
  396. /**
  397. * What is the identity of the sender
  398. */
  399. struct GNUNET_PeerIdentity sender;
  400. };
  401. /**
  402. * If a session monitor is attached, notify it about the new
  403. * session state.
  404. *
  405. * @param plugin our plugin
  406. * @param session session that changed state
  407. * @param state new state of the session
  408. */
  409. static void
  410. notify_session_monitor (struct Plugin *plugin,
  411. struct Session *session,
  412. enum GNUNET_TRANSPORT_SessionState state)
  413. {
  414. struct GNUNET_TRANSPORT_SessionInfo info;
  415. if (NULL == plugin->sic)
  416. return;
  417. memset (&info, 0, sizeof (info));
  418. info.state = state;
  419. info.is_inbound = GNUNET_SYSERR; /* hard to say */
  420. info.num_msg_pending = session->msgs_in_queue;
  421. info.num_bytes_pending = session->bytes_in_queue;
  422. /* info.receive_delay remains zero as this is not supported by UDP
  423. (cannot selectively not receive from 'some' peer while continuing
  424. to receive from others) */
  425. info.session_timeout = session->timeout;
  426. info.address = session->address;
  427. plugin->sic (plugin->sic_cls,
  428. session,
  429. &info);
  430. }
  431. /**
  432. * We have been notified that our readset has something to read. We don't
  433. * know which socket needs to be read, so we have to check each one
  434. * Then reschedule this function to be called again once more is available.
  435. *
  436. * @param cls the plugin handle
  437. * @param tc the scheduling context (for rescheduling this function again)
  438. */
  439. static void
  440. udp_plugin_select (void *cls,
  441. const struct GNUNET_SCHEDULER_TaskContext *tc);
  442. /**
  443. * We have been notified that our readset has something to read. We don't
  444. * know which socket needs to be read, so we have to check each one
  445. * Then reschedule this function to be called again once more is available.
  446. *
  447. * @param cls the plugin handle
  448. * @param tc the scheduling context (for rescheduling this function again)
  449. */
  450. static void
  451. udp_plugin_select_v6 (void *cls,
  452. const struct GNUNET_SCHEDULER_TaskContext *tc);
  453. /**
  454. * (re)schedule select tasks for this plugin.
  455. *
  456. * @param plugin plugin to reschedule
  457. */
  458. static void
  459. schedule_select (struct Plugin *plugin)
  460. {
  461. struct GNUNET_TIME_Relative min_delay;
  462. struct UDP_MessageWrapper *udpw;
  463. if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4))
  464. {
  465. /* Find a message ready to send:
  466. * Flow delay from other peer is expired or not set (0) */
  467. min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
  468. for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
  469. min_delay = GNUNET_TIME_relative_min (min_delay,
  470. GNUNET_TIME_absolute_get_remaining (
  471. udpw->session->flow_delay_from_other_peer));
  472. if (plugin->select_task != NULL )
  473. GNUNET_SCHEDULER_cancel (plugin->select_task);
  474. /* Schedule with:
  475. * - write active set if message is ready
  476. * - timeout minimum delay */
  477. plugin->select_task = GNUNET_SCHEDULER_add_select (
  478. GNUNET_SCHEDULER_PRIORITY_DEFAULT,
  479. (0 == min_delay.rel_value_us) ?
  480. GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v4,
  481. (0 == min_delay.rel_value_us) ? plugin->ws_v4 : NULL,
  482. &udp_plugin_select, plugin);
  483. }
  484. if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6))
  485. {
  486. min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
  487. for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
  488. min_delay = GNUNET_TIME_relative_min (min_delay,
  489. GNUNET_TIME_absolute_get_remaining (
  490. udpw->session->flow_delay_from_other_peer));
  491. if (NULL != plugin->select_task_v6)
  492. GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
  493. plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (
  494. GNUNET_SCHEDULER_PRIORITY_DEFAULT,
  495. (0 == min_delay.rel_value_us) ?
  496. GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v6,
  497. (0 == min_delay.rel_value_us) ? plugin->ws_v6 : NULL,
  498. &udp_plugin_select_v6, plugin);
  499. }
  500. }
  501. /**
  502. * Function called for a quick conversion of the binary address to
  503. * a numeric address. Note that the caller must not free the
  504. * address and that the next call to this function is allowed
  505. * to override the address again.
  506. *
  507. * @param cls closure
  508. * @param addr binary address (a `union UdpAddress`)
  509. * @param addrlen length of the @a addr
  510. * @return string representing the same address
  511. */
  512. const char *
  513. udp_address_to_string (void *cls,
  514. const void *addr,
  515. size_t addrlen)
  516. {
  517. static char rbuf[INET6_ADDRSTRLEN + 10];
  518. char buf[INET6_ADDRSTRLEN];
  519. const void *sb;
  520. struct in_addr a4;
  521. struct in6_addr a6;
  522. const struct IPv4UdpAddress *t4;
  523. const struct IPv6UdpAddress *t6;
  524. int af;
  525. uint16_t port;
  526. uint32_t options;
  527. if ((NULL != addr) && (addrlen == sizeof(struct IPv6UdpAddress)))
  528. {
  529. t6 = addr;
  530. af = AF_INET6;
  531. options = ntohl (t6->options);
  532. port = ntohs (t6->u6_port);
  533. memcpy (&a6, &t6->ipv6_addr, sizeof(a6));
  534. sb = &a6;
  535. }
  536. else if ((NULL != addr) && (addrlen == sizeof(struct IPv4UdpAddress)))
  537. {
  538. t4 = addr;
  539. af = AF_INET;
  540. options = ntohl (t4->options);
  541. port = ntohs (t4->u4_port);
  542. memcpy (&a4, &t4->ipv4_addr, sizeof(a4));
  543. sb = &a4;
  544. }
  545. else
  546. {
  547. return NULL;
  548. }
  549. inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
  550. GNUNET_snprintf (rbuf, sizeof(rbuf),
  551. (af == AF_INET6)
  552. ? "%s.%u.[%s]:%u"
  553. : "%s.%u.%s:%u",
  554. PLUGIN_NAME,
  555. options,
  556. buf,
  557. port);
  558. return rbuf;
  559. }
  560. /**
  561. * Function called to convert a string address to
  562. * a binary address.
  563. *
  564. * @param cls closure (`struct Plugin *`)
  565. * @param addr string address
  566. * @param addrlen length of the address
  567. * @param buf location to store the buffer
  568. * @param added location to store the number of bytes in the buffer.
  569. * If the function returns #GNUNET_SYSERR, its contents are undefined.
  570. * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  571. */
  572. static int
  573. udp_string_to_address (void *cls,
  574. const char *addr,
  575. uint16_t addrlen,
  576. void **buf,
  577. size_t *added)
  578. {
  579. struct sockaddr_storage socket_address;
  580. char *address;
  581. char *plugin;
  582. char *optionstr;
  583. uint32_t options;
  584. /* Format tcp.options.address:port */
  585. address = NULL;
  586. plugin = NULL;
  587. optionstr = NULL;
  588. if ((NULL == addr) || (addrlen == 0))
  589. {
  590. GNUNET_break(0);
  591. return GNUNET_SYSERR;
  592. }
  593. if ('\0' != addr[addrlen - 1])
  594. {
  595. GNUNET_break(0);
  596. return GNUNET_SYSERR;
  597. }
  598. if (strlen (addr) != addrlen - 1)
  599. {
  600. GNUNET_break(0);
  601. return GNUNET_SYSERR;
  602. }
  603. plugin = GNUNET_strdup (addr);
  604. optionstr = strchr (plugin, '.');
  605. if (NULL == optionstr)
  606. {
  607. GNUNET_break(0);
  608. GNUNET_free(plugin);
  609. return GNUNET_SYSERR;
  610. }
  611. optionstr[0] = '\0';
  612. optionstr++;
  613. options = atol (optionstr);
  614. address = strchr (optionstr, '.');
  615. if (NULL == address)
  616. {
  617. GNUNET_break(0);
  618. GNUNET_free(plugin);
  619. return GNUNET_SYSERR;
  620. }
  621. address[0] = '\0';
  622. address++;
  623. if (GNUNET_OK !=
  624. GNUNET_STRINGS_to_address_ip (address, strlen (address),
  625. &socket_address))
  626. {
  627. GNUNET_break(0);
  628. GNUNET_free(plugin);
  629. return GNUNET_SYSERR;
  630. }
  631. GNUNET_free(plugin);
  632. switch (socket_address.ss_family)
  633. {
  634. case AF_INET:
  635. {
  636. struct IPv4UdpAddress *u4;
  637. struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
  638. u4 = GNUNET_new (struct IPv4UdpAddress);
  639. u4->options = htonl (options);
  640. u4->ipv4_addr = in4->sin_addr.s_addr;
  641. u4->u4_port = in4->sin_port;
  642. *buf = u4;
  643. *added = sizeof(struct IPv4UdpAddress);
  644. return GNUNET_OK;
  645. }
  646. case AF_INET6:
  647. {
  648. struct IPv6UdpAddress *u6;
  649. struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
  650. u6 = GNUNET_new (struct IPv6UdpAddress);
  651. u6->options = htonl (options);
  652. u6->ipv6_addr = in6->sin6_addr;
  653. u6->u6_port = in6->sin6_port;
  654. *buf = u6;
  655. *added = sizeof(struct IPv6UdpAddress);
  656. return GNUNET_OK;
  657. }
  658. default:
  659. GNUNET_break(0);
  660. return GNUNET_SYSERR;
  661. }
  662. }
  663. /**
  664. * Append our port and forward the result.
  665. *
  666. * @param cls a `struct PrettyPrinterContext *`
  667. * @param hostname result from DNS resolver
  668. */
  669. static void
  670. append_port (void *cls,
  671. const char *hostname)
  672. {
  673. struct PrettyPrinterContext *ppc = cls;
  674. struct Plugin *plugin = ppc->plugin;
  675. char *ret;
  676. if (NULL == hostname)
  677. {
  678. /* Final call, done */
  679. ppc->asc (ppc->asc_cls,
  680. NULL,
  681. GNUNET_OK);
  682. GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
  683. plugin->ppc_dll_tail,
  684. ppc);
  685. ppc->resolver_handle = NULL;
  686. GNUNET_free (ppc);
  687. return;
  688. }
  689. if (GNUNET_YES == ppc->ipv6)
  690. GNUNET_asprintf (&ret,
  691. "%s.%u.[%s]:%d",
  692. PLUGIN_NAME,
  693. ppc->options,
  694. hostname,
  695. ppc->port);
  696. else
  697. GNUNET_asprintf (&ret,
  698. "%s.%u.%s:%d",
  699. PLUGIN_NAME,
  700. ppc->options,
  701. hostname,
  702. ppc->port);
  703. ppc->asc (ppc->asc_cls,
  704. ret,
  705. GNUNET_OK);
  706. GNUNET_free (ret);
  707. }
  708. /**
  709. * Convert the transports address to a nice, human-readable
  710. * format.
  711. *
  712. * @param cls closure with the `struct Plugin *`
  713. * @param type name of the transport that generated the address
  714. * @param addr one of the addresses of the host, NULL for the last address
  715. * the specific address format depends on the transport;
  716. * a `union UdpAddress`
  717. * @param addrlen length of the address
  718. * @param numeric should (IP) addresses be displayed in numeric form?
  719. * @param timeout after how long should we give up?
  720. * @param asc function to call on each string
  721. * @param asc_cls closure for @a asc
  722. */
  723. static void
  724. udp_plugin_address_pretty_printer (void *cls,
  725. const char *type,
  726. const void *addr,
  727. size_t addrlen,
  728. int numeric,
  729. struct GNUNET_TIME_Relative timeout,
  730. GNUNET_TRANSPORT_AddressStringCallback asc,
  731. void *asc_cls)
  732. {
  733. struct Plugin *plugin = cls;
  734. struct PrettyPrinterContext *ppc;
  735. const void *sb;
  736. size_t sbs;
  737. struct sockaddr_in a4;
  738. struct sockaddr_in6 a6;
  739. const struct IPv4UdpAddress *u4;
  740. const struct IPv6UdpAddress *u6;
  741. uint16_t port;
  742. uint32_t options;
  743. if (addrlen == sizeof(struct IPv6UdpAddress))
  744. {
  745. u6 = addr;
  746. memset (&a6, 0, sizeof(a6));
  747. a6.sin6_family = AF_INET6;
  748. #if HAVE_SOCKADDR_IN_SIN_LEN
  749. a6.sin6_len = sizeof (a6);
  750. #endif
  751. a6.sin6_port = u6->u6_port;
  752. memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
  753. port = ntohs (u6->u6_port);
  754. options = ntohl (u6->options);
  755. sb = &a6;
  756. sbs = sizeof(a6);
  757. }
  758. else if (addrlen == sizeof(struct IPv4UdpAddress))
  759. {
  760. u4 = addr;
  761. memset (&a4, 0, sizeof(a4));
  762. a4.sin_family = AF_INET;
  763. #if HAVE_SOCKADDR_IN_SIN_LEN
  764. a4.sin_len = sizeof (a4);
  765. #endif
  766. a4.sin_port = u4->u4_port;
  767. a4.sin_addr.s_addr = u4->ipv4_addr;
  768. port = ntohs (u4->u4_port);
  769. options = ntohl (u4->options);
  770. sb = &a4;
  771. sbs = sizeof(a4);
  772. }
  773. else
  774. {
  775. /* invalid address */
  776. GNUNET_break_op (0);
  777. asc (asc_cls, NULL , GNUNET_SYSERR);
  778. asc (asc_cls, NULL, GNUNET_OK);
  779. return;
  780. }
  781. ppc = GNUNET_new (struct PrettyPrinterContext);
  782. ppc->plugin = plugin;
  783. ppc->asc = asc;
  784. ppc->asc_cls = asc_cls;
  785. ppc->port = port;
  786. ppc->options = options;
  787. if (addrlen == sizeof(struct IPv6UdpAddress))
  788. ppc->ipv6 = GNUNET_YES;
  789. else
  790. ppc->ipv6 = GNUNET_NO;
  791. GNUNET_CONTAINER_DLL_insert (plugin->ppc_dll_head,
  792. plugin->ppc_dll_tail,
  793. ppc);
  794. ppc->resolver_handle
  795. = GNUNET_RESOLVER_hostname_get (sb,
  796. sbs,
  797. ! numeric,
  798. timeout,
  799. &append_port, ppc);
  800. }
  801. /**
  802. * FIXME.
  803. */
  804. static void
  805. call_continuation (struct UDP_MessageWrapper *udpw,
  806. int result)
  807. {
  808. struct Session *session = udpw->session;
  809. struct Plugin *plugin = session->plugin;
  810. size_t overhead;
  811. LOG (GNUNET_ERROR_TYPE_DEBUG,
  812. "Calling continuation for %u byte message to `%s' with result %s\n",
  813. udpw->payload_size, GNUNET_i2s (&udpw->session->target),
  814. (GNUNET_OK == result) ? "OK" : "SYSERR");
  815. if (udpw->msg_size >= udpw->payload_size)
  816. overhead = udpw->msg_size - udpw->payload_size;
  817. else
  818. overhead = udpw->msg_size;
  819. switch (result)
  820. {
  821. case GNUNET_OK:
  822. switch (udpw->msg_type)
  823. {
  824. case UMT_MSG_UNFRAGMENTED:
  825. if (NULL != udpw->cont)
  826. {
  827. /* Transport continuation */
  828. udpw->cont (udpw->cont_cls, &udpw->session->target, result,
  829. udpw->payload_size, udpw->msg_size);
  830. }
  831. GNUNET_STATISTICS_update (plugin->env->stats,
  832. "# UDP, unfragmented msgs, messages, sent, success", 1, GNUNET_NO);
  833. GNUNET_STATISTICS_update (plugin->env->stats,
  834. "# UDP, unfragmented msgs, bytes payload, sent, success",
  835. udpw->payload_size, GNUNET_NO);
  836. GNUNET_STATISTICS_update (plugin->env->stats,
  837. "# UDP, unfragmented msgs, bytes overhead, sent, success", overhead,
  838. GNUNET_NO);
  839. GNUNET_STATISTICS_update (plugin->env->stats,
  840. "# UDP, total, bytes overhead, sent", overhead, GNUNET_NO);
  841. GNUNET_STATISTICS_update (plugin->env->stats,
  842. "# UDP, total, bytes payload, sent", udpw->payload_size, GNUNET_NO);
  843. break;
  844. case UMT_MSG_FRAGMENTED_COMPLETE:
  845. GNUNET_assert(NULL != udpw->frag_ctx);
  846. if (udpw->frag_ctx->cont != NULL )
  847. udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target,
  848. GNUNET_OK, udpw->frag_ctx->payload_size,
  849. udpw->frag_ctx->on_wire_size);
  850. GNUNET_STATISTICS_update (plugin->env->stats,
  851. "# UDP, fragmented msgs, messages, sent, success", 1, GNUNET_NO);
  852. GNUNET_STATISTICS_update (plugin->env->stats,
  853. "# UDP, fragmented msgs, bytes payload, sent, success",
  854. udpw->payload_size, GNUNET_NO);
  855. GNUNET_STATISTICS_update (plugin->env->stats,
  856. "# UDP, fragmented msgs, bytes overhead, sent, success", overhead,
  857. GNUNET_NO);
  858. GNUNET_STATISTICS_update (plugin->env->stats,
  859. "# UDP, total, bytes overhead, sent", overhead, GNUNET_NO);
  860. GNUNET_STATISTICS_update (plugin->env->stats,
  861. "# UDP, total, bytes payload, sent", udpw->payload_size, GNUNET_NO);
  862. GNUNET_STATISTICS_update (plugin->env->stats,
  863. "# UDP, fragmented msgs, messages, pending", -1, GNUNET_NO);
  864. break;
  865. case UMT_MSG_FRAGMENTED:
  866. /* Fragmented message: enqueue next fragment */
  867. if (NULL != udpw->cont)
  868. udpw->cont (udpw->cont_cls, &udpw->session->target, result,
  869. udpw->payload_size, udpw->msg_size);
  870. GNUNET_STATISTICS_update (plugin->env->stats,
  871. "# UDP, fragmented msgs, fragments, sent, success", 1, GNUNET_NO);
  872. GNUNET_STATISTICS_update (plugin->env->stats,
  873. "# UDP, fragmented msgs, fragments bytes, sent, success",
  874. udpw->msg_size, GNUNET_NO);
  875. break;
  876. case UMT_MSG_ACK:
  877. /* No continuation */
  878. GNUNET_STATISTICS_update (plugin->env->stats,
  879. "# UDP, ACK msgs, messages, sent, success", 1, GNUNET_NO);
  880. GNUNET_STATISTICS_update (plugin->env->stats,
  881. "# UDP, ACK msgs, bytes overhead, sent, success", overhead,
  882. GNUNET_NO);
  883. GNUNET_STATISTICS_update (plugin->env->stats,
  884. "# UDP, total, bytes overhead, sent", overhead, GNUNET_NO);
  885. break;
  886. default:
  887. GNUNET_break(0);
  888. break;
  889. }
  890. break;
  891. case GNUNET_SYSERR:
  892. switch (udpw->msg_type)
  893. {
  894. case UMT_MSG_UNFRAGMENTED:
  895. /* Unfragmented message: failed to send */
  896. if (NULL != udpw->cont)
  897. udpw->cont (udpw->cont_cls, &udpw->session->target, result,
  898. udpw->payload_size, overhead);
  899. GNUNET_STATISTICS_update (plugin->env->stats,
  900. "# UDP, unfragmented msgs, messages, sent, failure", 1, GNUNET_NO);
  901. GNUNET_STATISTICS_update (plugin->env->stats,
  902. "# UDP, unfragmented msgs, bytes payload, sent, failure",
  903. udpw->payload_size, GNUNET_NO);
  904. GNUNET_STATISTICS_update (plugin->env->stats,
  905. "# UDP, unfragmented msgs, bytes overhead, sent, failure", overhead,
  906. GNUNET_NO);
  907. break;
  908. case UMT_MSG_FRAGMENTED_COMPLETE:
  909. GNUNET_assert(NULL != udpw->frag_ctx);
  910. if (udpw->frag_ctx->cont != NULL )
  911. udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target,
  912. GNUNET_SYSERR, udpw->frag_ctx->payload_size,
  913. udpw->frag_ctx->on_wire_size);
  914. GNUNET_STATISTICS_update (plugin->env->stats,
  915. "# UDP, fragmented msgs, messages, sent, failure", 1, GNUNET_NO);
  916. GNUNET_STATISTICS_update (plugin->env->stats,
  917. "# UDP, fragmented msgs, bytes payload, sent, failure",
  918. udpw->payload_size, GNUNET_NO);
  919. GNUNET_STATISTICS_update (plugin->env->stats,
  920. "# UDP, fragmented msgs, bytes payload, sent, failure", overhead,
  921. GNUNET_NO);
  922. GNUNET_STATISTICS_update (plugin->env->stats,
  923. "# UDP, fragmented msgs, bytes payload, sent, failure", overhead,
  924. GNUNET_NO);
  925. GNUNET_STATISTICS_update (plugin->env->stats,
  926. "# UDP, fragmented msgs, messages, pending", -1, GNUNET_NO);
  927. break;
  928. case UMT_MSG_FRAGMENTED:
  929. GNUNET_assert(NULL != udpw->frag_ctx);
  930. /* Fragmented message: failed to send */
  931. GNUNET_STATISTICS_update (plugin->env->stats,
  932. "# UDP, fragmented msgs, fragments, sent, failure", 1, GNUNET_NO);
  933. GNUNET_STATISTICS_update (plugin->env->stats,
  934. "# UDP, fragmented msgs, fragments bytes, sent, failure",
  935. udpw->msg_size, GNUNET_NO);
  936. break;
  937. case UMT_MSG_ACK:
  938. /* ACK message: failed to send */
  939. GNUNET_STATISTICS_update (plugin->env->stats,
  940. "# UDP, ACK msgs, messages, sent, failure", 1, GNUNET_NO);
  941. break;
  942. default:
  943. GNUNET_break(0);
  944. break;
  945. }
  946. break;
  947. default:
  948. GNUNET_break(0);
  949. break;
  950. }
  951. }
  952. /**
  953. * Check if the given port is plausible (must be either our listen
  954. * port or our advertised port). If it is neither, we return
  955. * #GNUNET_SYSERR.
  956. *
  957. * @param plugin global variables
  958. * @param in_port port number to check
  959. * @return #GNUNET_OK if port is either open_port or adv_port
  960. */
  961. static int
  962. check_port (struct Plugin *plugin,
  963. uint16_t in_port)
  964. {
  965. if ((in_port == plugin->port) || (in_port == plugin->aport))
  966. return GNUNET_OK;
  967. return GNUNET_SYSERR;
  968. }
  969. /**
  970. * Function that will be called to check if a binary address for this
  971. * plugin is well-formed and corresponds to an address for THIS peer
  972. * (as per our configuration). Naturally, if absolutely necessary,
  973. * plugins can be a bit conservative in their answer, but in general
  974. * plugins should make sure that the address does not redirect
  975. * traffic to a 3rd party that might try to man-in-the-middle our
  976. * traffic.
  977. *
  978. * @param cls closure, should be our handle to the Plugin
  979. * @param addr pointer to a `union UdpAddress`
  980. * @param addrlen length of @a addr
  981. * @return #GNUNET_OK if this is a plausible address for this peer
  982. * and transport, #GNUNET_SYSERR if not
  983. */
  984. static int
  985. udp_plugin_check_address (void *cls,
  986. const void *addr,
  987. size_t addrlen)
  988. {
  989. struct Plugin *plugin = cls;
  990. struct IPv4UdpAddress *v4;
  991. struct IPv6UdpAddress *v6;
  992. if ( (addrlen != sizeof(struct IPv4UdpAddress)) &&
  993. (addrlen != sizeof(struct IPv6UdpAddress)) )
  994. {
  995. GNUNET_break_op(0);
  996. return GNUNET_SYSERR;
  997. }
  998. if (addrlen == sizeof(struct IPv4UdpAddress))
  999. {
  1000. v4 = (struct IPv4UdpAddress *) addr;
  1001. if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
  1002. return GNUNET_SYSERR;
  1003. if (GNUNET_OK !=
  1004. GNUNET_NAT_test_address (plugin->nat,
  1005. &v4->ipv4_addr,
  1006. sizeof (struct in_addr)))
  1007. return GNUNET_SYSERR;
  1008. }
  1009. else
  1010. {
  1011. v6 = (struct IPv6UdpAddress *) addr;
  1012. if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
  1013. {
  1014. GNUNET_break_op(0);
  1015. return GNUNET_SYSERR;
  1016. }
  1017. if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
  1018. return GNUNET_SYSERR;
  1019. if (GNUNET_OK !=
  1020. GNUNET_NAT_test_address (plugin->nat,
  1021. &v6->ipv6_addr,
  1022. sizeof(struct in6_addr)))
  1023. return GNUNET_SYSERR;
  1024. }
  1025. return GNUNET_OK;
  1026. }
  1027. /**
  1028. * Function to free last resources associated with a session.
  1029. *
  1030. * @param s session to free
  1031. */
  1032. static void
  1033. free_session (struct Session *s)
  1034. {
  1035. if (NULL != s->frag_ctx)
  1036. {
  1037. GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag, NULL, NULL );
  1038. GNUNET_free(s->frag_ctx);
  1039. s->frag_ctx = NULL;
  1040. }
  1041. GNUNET_free(s);
  1042. }
  1043. /**
  1044. * Remove a message from the transmission queue.
  1045. *
  1046. * @param plugin the UDP plugin
  1047. * @param udpw message wrapper to queue
  1048. */
  1049. static void
  1050. dequeue (struct Plugin *plugin,
  1051. struct UDP_MessageWrapper *udpw)
  1052. {
  1053. struct Session *session = udpw->session;
  1054. if (plugin->bytes_in_buffer < udpw->msg_size)
  1055. {
  1056. GNUNET_break (0);
  1057. }
  1058. else
  1059. {
  1060. GNUNET_STATISTICS_update (plugin->env->stats,
  1061. "# UDP, total, bytes in buffers",
  1062. -(long long) udpw->msg_size,
  1063. GNUNET_NO);
  1064. plugin->bytes_in_buffer -= udpw->msg_size;
  1065. }
  1066. GNUNET_STATISTICS_update (plugin->env->stats,
  1067. "# UDP, total, msgs in buffers",
  1068. -1, GNUNET_NO);
  1069. if (udpw->session->address->address_length == sizeof(struct IPv4UdpAddress))
  1070. GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
  1071. plugin->ipv4_queue_tail,
  1072. udpw);
  1073. else if (udpw->session->address->address_length == sizeof(struct IPv6UdpAddress))
  1074. GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
  1075. plugin->ipv6_queue_tail,
  1076. udpw);
  1077. else
  1078. {
  1079. GNUNET_break (0);
  1080. return;
  1081. }
  1082. GNUNET_assert (session->msgs_in_queue > 0);
  1083. session->msgs_in_queue--;
  1084. GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
  1085. session->bytes_in_queue -= udpw->msg_size;
  1086. }
  1087. /**
  1088. * FIXME.
  1089. */
  1090. static void
  1091. fragmented_message_done (struct UDP_FragmentationContext *fc,
  1092. int result)
  1093. {
  1094. struct Plugin *plugin = fc->plugin;
  1095. struct Session *s = fc->session;
  1096. struct UDP_MessageWrapper *udpw;
  1097. struct UDP_MessageWrapper *tmp;
  1098. struct UDP_MessageWrapper dummy;
  1099. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1100. "%p : Fragmented message removed with result %s\n",
  1101. fc,
  1102. (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
  1103. /* Call continuation for fragmented message */
  1104. memset (&dummy, 0, sizeof(dummy));
  1105. dummy.msg_type = UMT_MSG_FRAGMENTED_COMPLETE;
  1106. dummy.msg_size = s->frag_ctx->on_wire_size;
  1107. dummy.payload_size = s->frag_ctx->payload_size;
  1108. dummy.frag_ctx = s->frag_ctx;
  1109. dummy.cont = NULL;
  1110. dummy.cont_cls = NULL;
  1111. dummy.session = s;
  1112. call_continuation (&dummy, result);
  1113. /* Remove leftover fragments from queue */
  1114. if (s->address->address_length == sizeof(struct IPv6UdpAddress))
  1115. {
  1116. udpw = plugin->ipv6_queue_head;
  1117. while (NULL != udpw)
  1118. {
  1119. tmp = udpw->next;
  1120. if ((udpw->frag_ctx != NULL )&& (udpw->frag_ctx == s->frag_ctx)){
  1121. dequeue (plugin, udpw);
  1122. call_continuation (udpw, GNUNET_SYSERR);
  1123. GNUNET_free (udpw);
  1124. }
  1125. udpw = tmp;
  1126. }
  1127. }
  1128. if (s->address->address_length == sizeof(struct IPv4UdpAddress))
  1129. {
  1130. udpw = plugin->ipv4_queue_head;
  1131. while (udpw != NULL )
  1132. {
  1133. tmp = udpw->next;
  1134. if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx))
  1135. {
  1136. dequeue (plugin, udpw);
  1137. call_continuation (udpw, GNUNET_SYSERR);
  1138. GNUNET_free(udpw);
  1139. }
  1140. udpw = tmp;
  1141. }
  1142. }
  1143. notify_session_monitor (s->plugin,
  1144. s,
  1145. GNUNET_TRANSPORT_SS_UPDATE);
  1146. /* Destroy fragmentation context */
  1147. GNUNET_FRAGMENT_context_destroy (fc->frag,
  1148. &s->last_expected_msg_delay,
  1149. &s->last_expected_ack_delay);
  1150. s->frag_ctx = NULL;
  1151. GNUNET_free (fc);
  1152. }
  1153. /**
  1154. * Scan the heap for a receive context with the given address.
  1155. *
  1156. * @param cls the `struct FindReceiveContext`
  1157. * @param node internal node of the heap
  1158. * @param element value stored at the node (a `struct ReceiveContext`)
  1159. * @param cost cost associated with the node
  1160. * @return #GNUNET_YES if we should continue to iterate,
  1161. * #GNUNET_NO if not.
  1162. */
  1163. static int
  1164. find_receive_context (void *cls,
  1165. struct GNUNET_CONTAINER_HeapNode *node,
  1166. void *element,
  1167. GNUNET_CONTAINER_HeapCostType cost)
  1168. {
  1169. struct FindReceiveContext *frc = cls;
  1170. struct DefragContext *e = element;
  1171. if ( (frc->udp_addr_len == e->udp_addr_len) &&
  1172. (0 == memcmp (frc->udp_addr,
  1173. e->udp_addr,
  1174. frc->udp_addr_len)) )
  1175. {
  1176. frc->rc = e;
  1177. return GNUNET_NO;
  1178. }
  1179. return GNUNET_YES;
  1180. }
  1181. /**
  1182. * Functions with this signature are called whenever we need
  1183. * to close a session due to a disconnect or failure to
  1184. * establish a connection.
  1185. *
  1186. * @param cls closure with the `struct Plugin`
  1187. * @param s session to close down
  1188. * @return #GNUNET_OK on success
  1189. */
  1190. static int
  1191. udp_disconnect_session (void *cls,
  1192. struct Session *s)
  1193. {
  1194. struct Plugin *plugin = cls;
  1195. struct UDP_MessageWrapper *udpw;
  1196. struct UDP_MessageWrapper *next;
  1197. struct FindReceiveContext frc;
  1198. GNUNET_assert (GNUNET_YES != s->in_destroy);
  1199. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1200. "Session %p to peer `%s' address ended\n", s,
  1201. GNUNET_i2s (&s->target),
  1202. udp_address_to_string (plugin,
  1203. s->address->address,
  1204. s->address->address_length));
  1205. /* stop timeout task */
  1206. if (NULL != s->timeout_task)
  1207. {
  1208. GNUNET_SCHEDULER_cancel (s->timeout_task);
  1209. s->timeout_task = NULL;
  1210. }
  1211. if (NULL != s->frag_ctx)
  1212. {
  1213. /* Remove fragmented message due to disconnect */
  1214. fragmented_message_done (s->frag_ctx,
  1215. GNUNET_SYSERR);
  1216. }
  1217. frc.rc = NULL;
  1218. frc.udp_addr = s->address->address;
  1219. frc.udp_addr_len = s->address->address_length;
  1220. /* Lookup existing receive context for this address */
  1221. if (NULL != plugin->defrag_ctxs)
  1222. {
  1223. GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
  1224. &find_receive_context,
  1225. &frc);
  1226. if (NULL != frc.rc)
  1227. {
  1228. struct DefragContext *d_ctx = frc.rc;
  1229. GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode);
  1230. GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
  1231. GNUNET_free (d_ctx);
  1232. }
  1233. }
  1234. next = plugin->ipv4_queue_head;
  1235. while (NULL != (udpw = next))
  1236. {
  1237. next = udpw->next;
  1238. if (udpw->session == s)
  1239. {
  1240. dequeue (plugin, udpw);
  1241. call_continuation (udpw, GNUNET_SYSERR);
  1242. GNUNET_free(udpw);
  1243. }
  1244. }
  1245. next = plugin->ipv6_queue_head;
  1246. while (NULL != (udpw = next))
  1247. {
  1248. next = udpw->next;
  1249. if (udpw->session == s)
  1250. {
  1251. dequeue (plugin, udpw);
  1252. call_continuation (udpw, GNUNET_SYSERR);
  1253. GNUNET_free(udpw);
  1254. }
  1255. }
  1256. notify_session_monitor (s->plugin,
  1257. s,
  1258. GNUNET_TRANSPORT_SS_DONE);
  1259. plugin->env->session_end (plugin->env->cls,
  1260. s->address,
  1261. s);
  1262. if (NULL != s->frag_ctx)
  1263. {
  1264. if (NULL != s->frag_ctx->cont)
  1265. {
  1266. s->frag_ctx->cont (s->frag_ctx->cont_cls,
  1267. &s->target,
  1268. GNUNET_SYSERR,
  1269. s->frag_ctx->payload_size,
  1270. s->frag_ctx->on_wire_size);
  1271. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1272. "Calling continuation for fragemented message to `%s' with result SYSERR\n",
  1273. GNUNET_i2s (&s->target));
  1274. }
  1275. }
  1276. GNUNET_assert (GNUNET_YES ==
  1277. GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
  1278. &s->target,
  1279. s));
  1280. GNUNET_STATISTICS_set (plugin->env->stats,
  1281. "# UDP sessions active",
  1282. GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
  1283. GNUNET_NO);
  1284. if (s->rc > 0)
  1285. {
  1286. s->in_destroy = GNUNET_YES;
  1287. }
  1288. else
  1289. {
  1290. GNUNET_HELLO_address_free (s->address);
  1291. free_session (s);
  1292. }
  1293. return GNUNET_OK;
  1294. }
  /**
   * Report the keepalive factor of this plugin.
   * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number
   * to calculate the interval between keepalive packets.
   *
   * @param cls closure with the `struct Plugin` (unused)
   * @return keepalive factor, always 15
   */
  static unsigned int
  udp_query_keepalive_factor (void *cls)
  {
    const unsigned int keepalive_factor = 15;

    (void) cls;
    return keepalive_factor;
  }
  1308. /**
  1309. * Destroy a session, plugin is being unloaded.
  1310. *
  1311. * @param cls the `struct Plugin`
  1312. * @param key hash of public key of target peer
  1313. * @param value a `struct PeerSession *` to clean up
  1314. * @return #GNUNET_OK (continue to iterate)
  1315. */
  1316. static int
  1317. disconnect_and_free_it (void *cls,
  1318. const struct GNUNET_PeerIdentity *key,
  1319. void *value)
  1320. {
  1321. struct Plugin *plugin = cls;
  1322. udp_disconnect_session (plugin, value);
  1323. return GNUNET_OK;
  1324. }
  1325. /**
  1326. * Disconnect from a remote node. Clean up session if we have one for
  1327. * this peer.
  1328. *
  1329. * @param cls closure for this call (should be handle to Plugin)
  1330. * @param target the peeridentity of the peer to disconnect
  1331. * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
  1332. */
  1333. static void
  1334. udp_disconnect (void *cls,
  1335. const struct GNUNET_PeerIdentity *target)
  1336. {
  1337. struct Plugin *plugin = cls;
  1338. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1339. "Disconnecting from peer `%s'\n",
  1340. GNUNET_i2s (target));
  1341. /* Clean up sessions */
  1342. GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
  1343. target,
  1344. &disconnect_and_free_it,
  1345. plugin);
  1346. }
  1347. /**
  1348. * Session was idle, so disconnect it
  1349. *
  1350. * @param cls the `struct Session` to time out
  1351. * @param tc scheduler context
  1352. */
  1353. static void
  1354. session_timeout (void *cls,
  1355. const struct GNUNET_SCHEDULER_TaskContext *tc)
  1356. {
  1357. struct Session *s = cls;
  1358. struct Plugin *plugin = s->plugin;
  1359. struct GNUNET_TIME_Relative left;
  1360. s->timeout_task = NULL;
  1361. left = GNUNET_TIME_absolute_get_remaining (s->timeout);
  1362. if (left.rel_value_us > 0)
  1363. {
  1364. /* not actually our turn yet, but let's at least update
  1365. the monitor, it may think we're about to die ... */
  1366. notify_session_monitor (s->plugin,
  1367. s,
  1368. GNUNET_TRANSPORT_SS_UPDATE);
  1369. s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
  1370. &session_timeout,
  1371. s);
  1372. return;
  1373. }
  1374. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1375. "Session %p was idle for %s, disconnecting\n",
  1376. s,
  1377. GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
  1378. GNUNET_YES));
  1379. /* call session destroy function */
  1380. udp_disconnect_session (plugin, s);
  1381. }
  1382. /**
  1383. * Increment session timeout due to activity
  1384. *
  1385. * @param s session to reschedule timeout activity for
  1386. */
  1387. static void
  1388. reschedule_session_timeout (struct Session *s)
  1389. {
  1390. if (GNUNET_YES == s->in_destroy)
  1391. return;
  1392. GNUNET_assert(NULL != s->timeout_task);
  1393. s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
  1394. }
  1395. /**
  1396. * FIXME.
  1397. */
  1398. static struct Session *
  1399. create_session (struct Plugin *plugin,
  1400. const struct GNUNET_HELLO_Address *address)
  1401. {
  1402. struct Session *s;
  1403. s = GNUNET_new (struct Session);
  1404. s->plugin = plugin;
  1405. s->address = GNUNET_HELLO_address_copy (address);
  1406. s->target = address->peer;
  1407. s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
  1408. 250);
  1409. s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
  1410. s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
  1411. s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
  1412. s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
  1413. s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
  1414. &session_timeout, s);
  1415. return s;
  1416. }
  1417. /**
  1418. * Function obtain the network type for a session
  1419. *
  1420. * @param cls closure ('struct Plugin*')
  1421. * @param session the session
  1422. * @return the network type
  1423. */
  1424. static enum GNUNET_ATS_Network_Type
  1425. udp_get_network (void *cls,
  1426. struct Session *session)
  1427. {
  1428. return session->scope;
  1429. }
  /**
   * Closure for #session_cmp_it(): carries the address to search for
   * and receives the session found for it.
   */
  struct SessionCompareContext
  {
    /**
     * Result: set by #session_cmp_it() to the session whose address
     * matches @e address.  The iterator never resets it, so callers
     * initialise it to NULL before iterating.
     */
    struct Session *res;

    /**
     * Address we are looking for.
     */
    const struct GNUNET_HELLO_Address *address;
  };
  1444. /**
  1445. * Find a session with a matching address.
  1446. *
  1447. * @param cls the `struct SessionCompareContext *`
  1448. * @param key peer identity (unused)
  1449. * @param value the `struct Session *`
  1450. * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
  1451. */
  1452. static int
  1453. session_cmp_it (void *cls,
  1454. const struct GNUNET_PeerIdentity *key,
  1455. void *value)
  1456. {
  1457. struct SessionCompareContext *cctx = cls;
  1458. const struct GNUNET_HELLO_Address *address = cctx->address;
  1459. struct Session *s = value;
  1460. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1461. "Comparing address %s <-> %s\n",
  1462. udp_address_to_string (s->plugin,
  1463. address->address,
  1464. address->address_length),
  1465. udp_address_to_string (s->plugin,
  1466. s->address->address,
  1467. s->address->address_length));
  1468. if (0 == GNUNET_HELLO_address_cmp(s->address, cctx->address))
  1469. {
  1470. cctx->res = s;
  1471. return GNUNET_NO;
  1472. }
  1473. return GNUNET_YES;
  1474. }
  1475. /**
  1476. * Locate an existing session the transport service is using to
  1477. * send data to another peer. Performs some basic sanity checks
  1478. * on the address and then tries to locate a matching session.
  1479. *
  1480. * @param cls the plugin
  1481. * @param address the address we should locate the session by
  1482. * @return the session if it exists, or NULL if it is not found
  1483. */
  1484. static struct Session *
  1485. udp_plugin_lookup_session (void *cls,
  1486. const struct GNUNET_HELLO_Address *address)
  1487. {
  1488. struct Plugin *plugin = cls;
  1489. const struct IPv6UdpAddress *udp_a6;
  1490. const struct IPv4UdpAddress *udp_a4;
  1491. struct SessionCompareContext cctx;
  1492. if ( (NULL == address->address) ||
  1493. ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
  1494. (address->address_length != sizeof (struct IPv6UdpAddress))))
  1495. {
  1496. LOG (GNUNET_ERROR_TYPE_WARNING,
  1497. _("Trying to locate session for address of unexpected length %u (should be %u or %u)\n"),
  1498. address->address_length,
  1499. sizeof (struct IPv4UdpAddress),
  1500. sizeof (struct IPv6UdpAddress));
  1501. return NULL;
  1502. }
  1503. if (address->address_length == sizeof(struct IPv4UdpAddress))
  1504. {
  1505. if (NULL == plugin->sockv4)
  1506. return NULL;
  1507. udp_a4 = (const struct IPv4UdpAddress *) address->address;
  1508. if (0 == udp_a4->u4_port)
  1509. return NULL;
  1510. }
  1511. if (address->address_length == sizeof(struct IPv6UdpAddress))
  1512. {
  1513. if (NULL == plugin->sockv6)
  1514. return NULL;
  1515. udp_a6 = (const struct IPv6UdpAddress *) address->address;
  1516. if (0 == udp_a6->u6_port)
  1517. return NULL;
  1518. }
  1519. /* check if session already exists */
  1520. cctx.address = address;
  1521. cctx.res = NULL;
  1522. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1523. "Looking for existing session for peer `%s' `%s' \n",
  1524. GNUNET_i2s (&address->peer),
  1525. udp_address_to_string (plugin,
  1526. address->address,
  1527. address->address_length));
  1528. GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
  1529. &address->peer,
  1530. &session_cmp_it,
  1531. &cctx);
  1532. if (NULL != cctx.res)
  1533. {
  1534. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1535. "Found existing session %p\n",
  1536. cctx.res);
  1537. return cctx.res;
  1538. }
  1539. return NULL;
  1540. }
  1541. /**
  1542. * Allocate a new session for the given endpoint address.
  1543. * Note that this function does not inform the service
  1544. * of the new session, this is the responsibility of the
  1545. * caller (if needed).
  1546. *
  1547. * @param cls the `struct Plugin`
  1548. * @param address address of the other peer to use
  1549. * @param network_type network type the address belongs to
  1550. * @return NULL on error, otherwise session handle
  1551. */
  1552. static struct Session *
  1553. udp_plugin_create_session (void *cls,
  1554. const struct GNUNET_HELLO_Address *address,
  1555. enum GNUNET_ATS_Network_Type network_type)
  1556. {
  1557. struct Plugin *plugin = cls;
  1558. struct Session *s;
  1559. s = create_session (plugin, address);
  1560. s->scope = network_type;
  1561. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1562. "Creating new session %p for peer `%s' address `%s'\n",
  1563. s,
  1564. GNUNET_i2s (&address->peer),
  1565. udp_address_to_string (plugin,
  1566. address->address,
  1567. address->address_length));
  1568. GNUNET_assert(GNUNET_OK ==
  1569. GNUNET_CONTAINER_multipeermap_put (plugin->sessions,
  1570. &s->target,
  1571. s,
  1572. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
  1573. GNUNET_STATISTICS_set (plugin->env->stats,
  1574. "# UDP sessions active",
  1575. GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
  1576. GNUNET_NO);
  1577. return s;
  1578. }
  1579. /**
  1580. * Function that will be called whenever the transport service wants to
  1581. * notify the plugin that a session is still active and in use and
  1582. * therefore the session timeout for this session has to be updated
  1583. *
  1584. * @param cls closure
  1585. * @param peer which peer was the session for
  1586. * @param session which session is being updated
  1587. */
  1588. static void
  1589. udp_plugin_update_session_timeout (void *cls,
  1590. const struct GNUNET_PeerIdentity *peer,
  1591. struct Session *session)
  1592. {
  1593. struct Plugin *plugin = cls;
  1594. if (GNUNET_YES !=
  1595. GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
  1596. peer,
  1597. session))
  1598. {
  1599. GNUNET_break(0);
  1600. return;
  1601. }
  1602. /* Reschedule session timeout */
  1603. reschedule_session_timeout (session);
  1604. }
  1605. /**
  1606. * Creates a new outbound session the transport service will use to
  1607. * send data to the peer.
  1608. *
  1609. * @param cls the plugin
  1610. * @param address the address
  1611. * @return the session or NULL of max connections exceeded
  1612. */
  1613. static struct Session *
  1614. udp_plugin_get_session (void *cls,
  1615. const struct GNUNET_HELLO_Address *address)
  1616. {
  1617. struct Plugin *plugin = cls;
  1618. struct Session *s;
  1619. enum GNUNET_ATS_Network_Type network_type;
  1620. struct IPv4UdpAddress *udp_v4;
  1621. struct IPv6UdpAddress *udp_v6;
  1622. if (NULL == address)
  1623. {
  1624. GNUNET_break(0);
  1625. return NULL;
  1626. }
  1627. if ( (address->address_length != sizeof(struct IPv4UdpAddress)) &&
  1628. (address->address_length != sizeof(struct IPv6UdpAddress)) )
  1629. return NULL;
  1630. if (NULL != (s = udp_plugin_lookup_session (cls,
  1631. address)))
  1632. return s;
  1633. if (sizeof (struct IPv4UdpAddress) == address->address_length)
  1634. {
  1635. struct sockaddr_in v4;
  1636. udp_v4 = (struct IPv4UdpAddress *) address->address;
  1637. memset (&v4, '\0', sizeof (v4));
  1638. v4.sin_family = AF_INET;
  1639. #if HAVE_SOCKADDR_IN_SIN_LEN
  1640. v4.sin_len = sizeof (struct sockaddr_in);
  1641. #endif
  1642. v4.sin_port = udp_v4->u4_port;
  1643. v4.sin_addr.s_addr = udp_v4->ipv4_addr;
  1644. network_type = plugin->env->get_address_type (plugin->env->cls,
  1645. (const struct sockaddr *) &v4,
  1646. sizeof (v4));
  1647. }
  1648. else if (sizeof (struct IPv6UdpAddress) == address->address_length)
  1649. {
  1650. struct sockaddr_in6 v6;
  1651. udp_v6 = (struct IPv6UdpAddress *) address->address;
  1652. memset (&v6, '\0', sizeof (v6));
  1653. v6.sin6_family = AF_INET6;
  1654. #if HAVE_SOCKADDR_IN_SIN_LEN
  1655. v6.sin6_len = sizeof (struct sockaddr_in6);
  1656. #endif
  1657. v6.sin6_port = udp_v6->u6_port;
  1658. v6.sin6_addr = udp_v6->ipv6_addr;
  1659. network_type = plugin->env->get_address_type (plugin->env->cls,
  1660. (const struct sockaddr *) &v6,
  1661. sizeof (v6));
  1662. }
  1663. /* otherwise create new */
  1664. return udp_plugin_create_session (cls, address, network_type);
  1665. }
  1666. /**
  1667. * Enqueue a message for transmission.
  1668. *
  1669. * @param plugin the UDP plugin
  1670. * @param udpw message wrapper to queue
  1671. */
  1672. static void
  1673. enqueue (struct Plugin *plugin,
  1674. struct UDP_MessageWrapper *udpw)
  1675. {
  1676. struct Session *session = udpw->session;
  1677. if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
  1678. {
  1679. GNUNET_break (0);
  1680. }
  1681. else
  1682. {
  1683. GNUNET_STATISTICS_update (plugin->env->stats,
  1684. "# UDP, total, bytes in buffers", udpw->msg_size, GNUNET_NO);
  1685. plugin->bytes_in_buffer += udpw->msg_size;
  1686. }
  1687. GNUNET_STATISTICS_update (plugin->env->stats,
  1688. "# UDP, total, msgs in buffers",
  1689. 1, GNUNET_NO);
  1690. if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress))
  1691. GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
  1692. plugin->ipv4_queue_tail,
  1693. udpw);
  1694. else if (udpw->session->address->address_length == sizeof (struct IPv6UdpAddress))
  1695. GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
  1696. plugin->ipv6_queue_tail,
  1697. udpw);
  1698. else
  1699. {
  1700. GNUNET_break (0);
  1701. return;
  1702. }
  1703. session->msgs_in_queue++;
  1704. session->bytes_in_queue += udpw->msg_size;
  1705. }
  1706. /**
  1707. * Fragment message was transmitted via UDP, let fragmentation know
  1708. * to send the next fragment now.
  1709. *
  1710. * @param cls the `struct UDPMessageWrapper *` of the fragment
  1711. * @param target destination peer (ignored)
  1712. * @param result #GNUNET_OK on success (ignored)
  1713. * @param payload bytes payload sent
  1714. * @param physical bytes physical sent
  1715. */
  1716. static void
  1717. send_next_fragment (void *cls,
  1718. const struct GNUNET_PeerIdentity *target,
  1719. int result,
  1720. size_t payload,
  1721. size_t physical)
  1722. {
  1723. struct UDP_MessageWrapper *udpw = cls;
  1724. GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
  1725. }
  1726. /**
  1727. * Function that is called with messages created by the fragmentation
  1728. * module. In the case of the 'proc' callback of the
  1729. * #GNUNET_FRAGMENT_context_create() function, this function must
  1730. * eventually call #GNUNET_FRAGMENT_context_transmission_done().
  1731. *
  1732. * @param cls closure, the 'struct FragmentationContext'
  1733. * @param msg the message that was created
  1734. */
  1735. static void
  1736. enqueue_fragment (void *cls,
  1737. const struct GNUNET_MessageHeader *msg)
  1738. {
  1739. struct UDP_FragmentationContext *frag_ctx = cls;
  1740. struct Plugin *plugin = frag_ctx->plugin;
  1741. struct UDP_MessageWrapper * udpw;
  1742. size_t msg_len = ntohs (msg->size);
  1743. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1744. "Enqueuing fragment with %u bytes\n",
  1745. msg_len);
  1746. frag_ctx->fragments_used++;
  1747. udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
  1748. udpw->session = frag_ctx->session;
  1749. udpw->msg_buf = (char *) &udpw[1];
  1750. udpw->msg_size = msg_len;
  1751. udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */
  1752. udpw->cont = &send_next_fragment;
  1753. udpw->cont_cls = udpw;
  1754. udpw->timeout = frag_ctx->timeout;
  1755. udpw->frag_ctx = frag_ctx;
  1756. udpw->msg_type = UMT_MSG_FRAGMENTED;
  1757. memcpy (udpw->msg_buf, msg, msg_len);
  1758. enqueue (plugin, udpw);
  1759. schedule_select (plugin);
  1760. }
  1761. /**
  1762. * Function that can be used by the transport service to transmit
  1763. * a message using the plugin. Note that in the case of a
  1764. * peer disconnecting, the continuation MUST be called
  1765. * prior to the disconnect notification itself. This function
  1766. * will be called with this peer's HELLO message to initiate
  1767. * a fresh connection to another peer.
  1768. *
  1769. * @param cls closure
  1770. * @param s which session must be used
  1771. * @param msgbuf the message to transmit
  1772. * @param msgbuf_size number of bytes in 'msgbuf'
  1773. * @param priority how important is the message (most plugins will
  1774. * ignore message priority and just FIFO)
  1775. * @param to how long to wait at most for the transmission (does not
  1776. * require plugins to discard the message after the timeout,
  1777. * just advisory for the desired delay; most plugins will ignore
  1778. * this as well)
  1779. * @param cont continuation to call once the message has
  1780. * been transmitted (or if the transport is ready
  1781. * for the next transmission call; or if the
  1782. * peer disconnected...); can be NULL
  1783. * @param cont_cls closure for cont
  1784. * @return number of bytes used (on the physical network, with overheads);
  1785. * -1 on hard errors (i.e. address invalid); 0 is a legal value
  1786. * and does NOT mean that the message was not transmitted (DV)
  1787. */
  1788. static ssize_t
  1789. udp_plugin_send (void *cls,
  1790. struct Session *s,
  1791. const char *msgbuf,
  1792. size_t msgbuf_size,
  1793. unsigned int priority,
  1794. struct GNUNET_TIME_Relative to,
  1795. GNUNET_TRANSPORT_TransmitContinuation cont,
  1796. void *cont_cls)
  1797. {
  1798. struct Plugin *plugin = cls;
  1799. size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
  1800. struct UDP_FragmentationContext * frag_ctx;
  1801. struct UDP_MessageWrapper * udpw;
  1802. struct UDPMessage *udp;
  1803. char mbuf[udpmlen];
  1804. GNUNET_assert(plugin != NULL);
  1805. GNUNET_assert(s != NULL);
  1806. if ( (s->address->address_length == sizeof(struct IPv6UdpAddress)) &&
  1807. (plugin->sockv6 == NULL) )
  1808. return GNUNET_SYSERR;
  1809. if ( (s->address->address_length == sizeof(struct IPv4UdpAddress)) &&
  1810. (plugin->sockv4 == NULL) )
  1811. return GNUNET_SYSERR;
  1812. if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
  1813. {
  1814. GNUNET_break(0);
  1815. return GNUNET_SYSERR;
  1816. }
  1817. if (GNUNET_YES !=
  1818. GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
  1819. &s->target,
  1820. s))
  1821. {
  1822. GNUNET_break(0);
  1823. return GNUNET_SYSERR;
  1824. }
  1825. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1826. "UDP transmits %u-byte message to `%s' using address `%s'\n",
  1827. udpmlen,
  1828. GNUNET_i2s (&s->target),
  1829. udp_address_to_string (plugin,
  1830. s->address->address,
  1831. s->address->address_length));
  1832. /* Message */
  1833. udp = (struct UDPMessage *) mbuf;
  1834. udp->header.size = htons (udpmlen);
  1835. udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
  1836. udp->reserved = htonl (0);
  1837. udp->sender = *plugin->env->my_identity;
  1838. /* We do not update the session time out here!
  1839. * Otherwise this session will not timeout since we send keep alive before
  1840. * session can timeout
  1841. *
  1842. * For UDP we update session timeout only on receive, this will cover keep
  1843. * alives, since remote peer will reply with keep alive response!
  1844. */
  1845. if (udpmlen <= UDP_MTU)
  1846. {
  1847. /* unfragmented message */
  1848. udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
  1849. udpw->session = s;
  1850. udpw->msg_buf = (char *) &udpw[1];
  1851. udpw->msg_size = udpmlen; /* message size with UDP overhead */
  1852. udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
  1853. udpw->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), to);
  1854. udpw->cont = cont;
  1855. udpw->cont_cls = cont_cls;
  1856. udpw->frag_ctx = NULL;
  1857. udpw->msg_type = UMT_MSG_UNFRAGMENTED;
  1858. memcpy (udpw->msg_buf, udp, sizeof(struct UDPMessage));
  1859. memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)], msgbuf, msgbuf_size);
  1860. enqueue (plugin, udpw);
  1861. GNUNET_STATISTICS_update (plugin->env->stats,
  1862. "# UDP, unfragmented msgs, messages, attempt", 1, GNUNET_NO);
  1863. GNUNET_STATISTICS_update (plugin->env->stats,
  1864. "# UDP, unfragmented msgs, bytes payload, attempt",
  1865. udpw->payload_size,
  1866. GNUNET_NO);
  1867. }
  1868. else
  1869. {
  1870. /* fragmented message */
  1871. if (s->frag_ctx != NULL)
  1872. return GNUNET_SYSERR;
  1873. memcpy (&udp[1], msgbuf, msgbuf_size);
  1874. frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
  1875. frag_ctx->plugin = plugin;
  1876. frag_ctx->session = s;
  1877. frag_ctx->cont = cont;
  1878. frag_ctx->cont_cls = cont_cls;
  1879. frag_ctx->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (),
  1880. to);
  1881. frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
  1882. frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
  1883. frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
  1884. UDP_MTU,
  1885. &plugin->tracker,
  1886. s->last_expected_msg_delay,
  1887. s->last_expected_ack_delay,
  1888. &udp->header,
  1889. &enqueue_fragment,
  1890. frag_ctx);
  1891. s->frag_ctx = frag_ctx;
  1892. GNUNET_STATISTICS_update (plugin->env->stats,
  1893. "# UDP, fragmented msgs, messages, pending",
  1894. 1,
  1895. GNUNET_NO);
  1896. GNUNET_STATISTICS_update (plugin->env->stats,
  1897. "# UDP, fragmented msgs, messages, attempt",
  1898. 1,
  1899. GNUNET_NO);
  1900. GNUNET_STATISTICS_update (plugin->env->stats,
  1901. "# UDP, fragmented msgs, bytes payload, attempt",
  1902. frag_ctx->payload_size,
  1903. GNUNET_NO);
  1904. }
  1905. notify_session_monitor (s->plugin,
  1906. s,
  1907. GNUNET_TRANSPORT_SS_UPDATE);
  1908. schedule_select (plugin);
  1909. return udpmlen;
  1910. }
  1911. /**
  1912. * Our external IP address/port mapping has changed.
  1913. *
  1914. * @param cls closure, the `struct LocalAddrList`
  1915. * @param add_remove #GNUNET_YES to mean the new public IP address, #GNUNET_NO to mean
  1916. * the previous (now invalid) one
  1917. * @param addr either the previous or the new public IP address
  1918. * @param addrlen actual lenght of the address
  1919. */
  1920. static void
  1921. udp_nat_port_map_callback (void *cls,
  1922. int add_remove,
  1923. const struct sockaddr *addr,
  1924. socklen_t addrlen)
  1925. {
  1926. struct Plugin *plugin = cls;
  1927. struct GNUNET_HELLO_Address *address;
  1928. struct IPv4UdpAddress u4;
  1929. struct IPv6UdpAddress u6;
  1930. void *arg;
  1931. size_t args;
  1932. LOG (GNUNET_ERROR_TYPE_INFO,
  1933. "NAT notification to %s address `%s'\n",
  1934. (GNUNET_YES == add_remove) ? "add" : "remove",
  1935. GNUNET_a2s (addr, addrlen));
  1936. /* convert 'address' to our internal format */
  1937. switch (addr->sa_family)
  1938. {
  1939. case AF_INET:
  1940. GNUNET_assert(addrlen == sizeof(struct sockaddr_in));
  1941. memset (&u4, 0, sizeof(u4));
  1942. u4.options = htonl (plugin->myoptions);
  1943. u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
  1944. u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
  1945. if (0 == ((struct sockaddr_in *) addr)->sin_port)
  1946. return;
  1947. arg = &u4;
  1948. args = sizeof(struct IPv4UdpAddress);
  1949. break;
  1950. case AF_INET6:
  1951. GNUNET_assert(addrlen == sizeof(struct sockaddr_in6));
  1952. memset (&u6, 0, sizeof(u6));
  1953. u6.options = htonl (plugin->myoptions);
  1954. if (0 == ((struct sockaddr_in6 *) addr)->sin6_port)
  1955. return;
  1956. memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
  1957. sizeof(struct in6_addr));
  1958. u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
  1959. arg = &u6;
  1960. args = sizeof(struct IPv6UdpAddress);
  1961. break;
  1962. default:
  1963. GNUNET_break(0);
  1964. return;
  1965. }
  1966. /* modify our published address list */
  1967. address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
  1968. PLUGIN_NAME,
  1969. arg, args,
  1970. GNUNET_HELLO_ADDRESS_INFO_NONE);
  1971. plugin->env->notify_address (plugin->env->cls, add_remove, address);
  1972. GNUNET_HELLO_address_free (address);
  1973. }
  1974. /**
  1975. * Message tokenizer has broken up an incomming message. Pass it on
  1976. * to the service.
  1977. *
  1978. * @param cls the `struct Plugin *`
  1979. * @param client the `struct SourceInformation *`
  1980. * @param hdr the actual message
  1981. * @return #GNUNET_OK (always)
  1982. */
  1983. static int
  1984. process_inbound_tokenized_messages (void *cls,
  1985. void *client,
  1986. const struct GNUNET_MessageHeader *hdr)
  1987. {
  1988. struct Plugin *plugin = cls;
  1989. struct SourceInformation *si = client;
  1990. struct GNUNET_TIME_Relative delay;
  1991. GNUNET_assert (NULL != si->session);
  1992. if (GNUNET_YES == si->session->in_destroy)
  1993. return GNUNET_OK;
  1994. /* setup ATS */
  1995. reschedule_session_timeout (si->session);
  1996. delay = plugin->env->receive (plugin->env->cls,
  1997. si->session->address,
  1998. si->session,
  1999. hdr);
  2000. si->session->flow_delay_for_other_peer = delay;
  2001. return GNUNET_OK;
  2002. }
  2003. /**
  2004. * We've received a UDP Message. Process it (pass contents to main service).
  2005. *
  2006. * @param plugin plugin context
  2007. * @param msg the message
  2008. * @param udp_addr sender address
  2009. * @param udp_addr_len number of bytes in @a udp_addr
  2010. * @param network_type network type the address belongs to
  2011. */
  2012. static void
  2013. process_udp_message (struct Plugin *plugin,
  2014. const struct UDPMessage *msg,
  2015. const union UdpAddress *udp_addr,
  2016. size_t udp_addr_len,
  2017. enum GNUNET_ATS_Network_Type network_type)
  2018. {
  2019. struct SourceInformation si;
  2020. struct Session *s;
  2021. struct GNUNET_HELLO_Address *address;
  2022. if (0 != ntohl (msg->reserved))
  2023. {
  2024. GNUNET_break_op(0);
  2025. return;
  2026. }
  2027. if (ntohs (msg->header.size)
  2028. < sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
  2029. {
  2030. GNUNET_break_op(0);
  2031. return;
  2032. }
  2033. address = GNUNET_HELLO_address_allocate (&msg->sender,
  2034. PLUGIN_NAME,
  2035. udp_addr,
  2036. udp_addr_len,
  2037. GNUNET_HELLO_ADDRESS_INFO_NONE);
  2038. if (NULL ==
  2039. (s = udp_plugin_lookup_session (plugin, address)))
  2040. {
  2041. s = udp_plugin_create_session (plugin,
  2042. address,
  2043. network_type);
  2044. plugin->env->session_start (plugin->env->cls,
  2045. address,
  2046. s,
  2047. s->scope);
  2048. notify_session_monitor (s->plugin,
  2049. s,
  2050. GNUNET_TRANSPORT_SS_INIT);
  2051. notify_session_monitor (s->plugin,
  2052. s,
  2053. GNUNET_TRANSPORT_SS_UP);
  2054. }
  2055. GNUNET_free (address);
  2056. /* iterate over all embedded messages */
  2057. si.session = s;
  2058. si.sender = msg->sender;
  2059. s->rc++;
  2060. GNUNET_SERVER_mst_receive (plugin->mst,
  2061. &si,
  2062. (const char *) &msg[1],
  2063. ntohs (msg->header.size) - sizeof(struct UDPMessage),
  2064. GNUNET_YES,
  2065. GNUNET_NO);
  2066. s->rc--;
  2067. if ((0 == s->rc) && (GNUNET_YES == s->in_destroy))
  2068. free_session (s);
  2069. }
  2070. /**
  2071. * Process a defragmented message.
  2072. *
  2073. * @param cls the `struct DefragContext *`
  2074. * @param msg the message
  2075. */
  2076. static void
  2077. fragment_msg_proc (void *cls,
  2078. const struct GNUNET_MessageHeader *msg)
  2079. {
  2080. struct DefragContext *rc = cls;
  2081. const struct UDPMessage *um;
  2082. if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
  2083. {
  2084. GNUNET_break(0);
  2085. return;
  2086. }
  2087. if (ntohs (msg->size) < sizeof(struct UDPMessage))
  2088. {
  2089. GNUNET_break(0);
  2090. return;
  2091. }
  2092. um = (const struct UDPMessage *) msg;
  2093. rc->sender = um->sender;
  2094. rc->have_sender = GNUNET_YES;
  2095. process_udp_message (rc->plugin,
  2096. um,
  2097. rc->udp_addr,
  2098. rc->udp_addr_len,
  2099. rc->network_type);
  2100. }
  2101. /**
  2102. * Transmit an acknowledgement.
  2103. *
  2104. * @param cls the `struct DefragContext *`
  2105. * @param id message ID (unused)
  2106. * @param msg ack to transmit
  2107. */
  2108. static void
  2109. ack_proc (void *cls,
  2110. uint32_t id,
  2111. const struct GNUNET_MessageHeader *msg)
  2112. {
  2113. struct DefragContext *rc = cls;
  2114. size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
  2115. struct UDP_ACK_Message *udp_ack;
  2116. uint32_t delay = 0;
  2117. struct UDP_MessageWrapper *udpw;
  2118. struct Session *s;
  2119. struct GNUNET_HELLO_Address *address;
  2120. if (GNUNET_NO == rc->have_sender)
  2121. {
  2122. /* tried to defragment but never succeeded, hence will not ACK */
  2123. GNUNET_break_op (0);
  2124. return;
  2125. }
  2126. address = GNUNET_HELLO_address_allocate (&rc->sender,
  2127. PLUGIN_NAME,
  2128. rc->udp_addr,
  2129. rc->udp_addr_len,
  2130. GNUNET_HELLO_ADDRESS_INFO_NONE);
  2131. s = udp_plugin_lookup_session (rc->plugin,
  2132. address);
  2133. GNUNET_HELLO_address_free (address);
  2134. if (NULL == s)
  2135. {
  2136. LOG (GNUNET_ERROR_TYPE_ERROR,
  2137. "Trying to transmit ACK to peer `%s' but no session found!\n",
  2138. udp_address_to_string (rc->plugin,
  2139. rc->udp_addr,
  2140. rc->udp_addr_len));
  2141. GNUNET_CONTAINER_heap_remove_node (rc->hnode);
  2142. GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
  2143. GNUNET_free (rc);
  2144. return;
  2145. }
  2146. if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
  2147. delay = s->flow_delay_for_other_peer.rel_value_us;
  2148. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2149. "Sending ACK to `%s' including delay of %s\n",
  2150. udp_address_to_string (rc->plugin,
  2151. rc->udp_addr,
  2152. rc->udp_addr_len),
  2153. GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
  2154. GNUNET_YES));
  2155. udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
  2156. udpw->msg_size = msize;
  2157. udpw->payload_size = 0;
  2158. udpw->session = s;
  2159. udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
  2160. udpw->msg_buf = (char *) &udpw[1];
  2161. udpw->msg_type = UMT_MSG_ACK;
  2162. udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
  2163. udp_ack->header.size = htons ((uint16_t) msize);
  2164. udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
  2165. udp_ack->delay = htonl (delay);
  2166. udp_ack->sender = *rc->plugin->env->my_identity;
  2167. memcpy (&udp_ack[1], msg, ntohs (msg->size));
  2168. enqueue (rc->plugin, udpw);
  2169. notify_session_monitor (s->plugin,
  2170. s,
  2171. GNUNET_TRANSPORT_SS_UPDATE);
  2172. schedule_select (rc->plugin);
  2173. }
  2174. /**
  2175. * Handle an ACK message.
  2176. *
  2177. * @param plugin the UDP plugin
  2178. * @param msg the (presumed) UDP ACK message
  2179. * @param udp_addr sender address
  2180. * @param udp_addr_len number of bytes in @a udp_addr
  2181. */
  2182. static void
  2183. read_process_ack (struct Plugin *plugin,
  2184. const struct GNUNET_MessageHeader *msg,
  2185. const union UdpAddress *udp_addr,
  2186. socklen_t udp_addr_len)
  2187. {
  2188. const struct GNUNET_MessageHeader *ack;
  2189. const struct UDP_ACK_Message *udp_ack;
  2190. struct GNUNET_HELLO_Address *address;
  2191. struct Session *s;
  2192. struct GNUNET_TIME_Relative flow_delay;
  2193. if (ntohs (msg->size)
  2194. < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
  2195. {
  2196. GNUNET_break_op(0);
  2197. return;
  2198. }
  2199. udp_ack = (const struct UDP_ACK_Message *) msg;
  2200. address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
  2201. PLUGIN_NAME,
  2202. udp_addr,
  2203. udp_addr_len,
  2204. GNUNET_HELLO_ADDRESS_INFO_NONE);
  2205. s = udp_plugin_lookup_session (plugin,
  2206. address);
  2207. if ( (NULL == s) ||
  2208. (NULL == s->frag_ctx) )
  2209. {
  2210. LOG (GNUNET_ERROR_TYPE_WARNING,
  2211. "UDP session of address %s for ACK not found\n",
  2212. udp_address_to_string (plugin,
  2213. address->address,
  2214. address->address_length));
  2215. GNUNET_HELLO_address_free (address);
  2216. return;
  2217. }
  2218. GNUNET_HELLO_address_free (address);
  2219. flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
  2220. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2221. "We received a sending delay of %s\n",
  2222. GNUNET_STRINGS_relative_time_to_string (flow_delay,
  2223. GNUNET_YES));
  2224. s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
  2225. ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
  2226. if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
  2227. {
  2228. GNUNET_break_op(0);
  2229. return;
  2230. }
  2231. if (GNUNET_OK !=
  2232. GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
  2233. ack))
  2234. {
  2235. LOG(GNUNET_ERROR_TYPE_DEBUG,
  2236. "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
  2237. (unsigned int ) ntohs (msg->size),
  2238. GNUNET_i2s (&udp_ack->sender),
  2239. udp_address_to_string (plugin,
  2240. udp_addr,
  2241. udp_addr_len));
  2242. /* Expect more ACKs to arrive */
  2243. return;
  2244. }
  2245. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2246. "Message full ACK'ed\n",
  2247. (unsigned int ) ntohs (msg->size),
  2248. GNUNET_i2s (&udp_ack->sender),
  2249. udp_address_to_string (plugin,
  2250. udp_addr,
  2251. udp_addr_len));
  2252. /* Remove fragmented message after successful sending */
  2253. fragmented_message_done (s->frag_ctx,
  2254. GNUNET_OK);
  2255. }
  2256. /**
  2257. * We received a fragment, process it.
  2258. *
  2259. * @param plugin our plugin
  2260. * @param msg a message of type #GNUNET_MESSAGE_TYPE_FRAGMENT
  2261. * @param udp_addr sender address
  2262. * @param udp_addr_len number of bytes in @a udp_addr
  2263. * @param network_type network type the address belongs to
  2264. */
  2265. static void
  2266. read_process_fragment (struct Plugin *plugin,
  2267. const struct GNUNET_MessageHeader *msg,
  2268. const union UdpAddress *udp_addr,
  2269. size_t udp_addr_len,
  2270. enum GNUNET_ATS_Network_Type network_type)
  2271. {
  2272. struct DefragContext *d_ctx;
  2273. struct GNUNET_TIME_Absolute now;
  2274. struct FindReceiveContext frc;
  2275. frc.rc = NULL;
  2276. frc.udp_addr = udp_addr;
  2277. frc.udp_addr_len = udp_addr_len;
  2278. /* Lookup existing receive context for this address */
  2279. GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
  2280. &find_receive_context,
  2281. &frc);
  2282. now = GNUNET_TIME_absolute_get ();
  2283. d_ctx = frc.rc;
  2284. if (NULL == d_ctx)
  2285. {
  2286. /* Create a new defragmentation context */
  2287. d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + udp_addr_len);
  2288. memcpy (&d_ctx[1],
  2289. udp_addr,
  2290. udp_addr_len);
  2291. d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
  2292. d_ctx->udp_addr_len = udp_addr_len;
  2293. d_ctx->network_type = network_type;
  2294. d_ctx->plugin = plugin;
  2295. d_ctx->defrag = GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
  2296. UDP_MTU,
  2297. UDP_MAX_MESSAGES_IN_DEFRAG,
  2298. d_ctx,
  2299. &fragment_msg_proc,
  2300. &ack_proc);
  2301. d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs,
  2302. d_ctx,
  2303. (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
  2304. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2305. "Created new defragmentation context for %u-byte fragment from `%s'\n",
  2306. (unsigned int ) ntohs (msg->size),
  2307. udp_address_to_string (plugin,
  2308. udp_addr,
  2309. udp_addr_len));
  2310. }
  2311. else
  2312. {
  2313. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2314. "Found existing defragmentation context for %u-byte fragment from `%s'\n",
  2315. (unsigned int ) ntohs (msg->size),
  2316. udp_address_to_string (plugin,
  2317. udp_addr,
  2318. udp_addr_len));
  2319. }
  2320. if (GNUNET_OK ==
  2321. GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
  2322. {
  2323. /* keep this 'rc' from expiring */
  2324. GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs,
  2325. d_ctx->hnode,
  2326. (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
  2327. }
  2328. if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
  2329. UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
  2330. {
  2331. /* remove 'rc' that was inactive the longest */
  2332. d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
  2333. GNUNET_assert (NULL != d_ctx);
  2334. GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
  2335. GNUNET_free (d_ctx);
  2336. }
  2337. }
  2338. /**
  2339. * Read and process a message from the given socket.
  2340. *
  2341. * @param plugin the overall plugin
  2342. * @param rsock socket to read from
  2343. */
  2344. static void
  2345. udp_select_read (struct Plugin *plugin,
  2346. struct GNUNET_NETWORK_Handle *rsock)
  2347. {
  2348. socklen_t fromlen;
  2349. struct sockaddr_storage addr;
  2350. char buf[65536] GNUNET_ALIGN;
  2351. ssize_t size;
  2352. const struct GNUNET_MessageHeader *msg;
  2353. struct IPv4UdpAddress v4;
  2354. struct IPv6UdpAddress v6;
  2355. const struct sockaddr *sa;
  2356. const struct sockaddr_in *sa4;
  2357. const struct sockaddr_in6 *sa6;
  2358. const union UdpAddress *int_addr;
  2359. size_t int_addr_len;
  2360. enum GNUNET_ATS_Network_Type network_type;
  2361. fromlen = sizeof(addr);
  2362. memset (&addr, 0, sizeof(addr));
  2363. size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof(buf),
  2364. (struct sockaddr *) &addr, &fromlen);
  2365. sa = (const struct sockaddr *) &addr;
  2366. #if MINGW
  2367. /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
  2368. * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
  2369. * on this socket has failed.
  2370. * Quote from MSDN:
  2371. * WSAECONNRESET - The virtual circuit was reset by the remote side
  2372. * executing a hard or abortive close. The application should close
  2373. * the socket; it is no longer usable. On a UDP-datagram socket this
  2374. * error indicates a previous send operation resulted in an ICMP Port
  2375. * Unreachable message.
  2376. */
  2377. if ( (-1 == size) && (ECONNRESET == errno) )
  2378. return;
  2379. #endif
  2380. if (-1 == size)
  2381. {
  2382. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2383. "UDP failed to receive data: %s\n",
  2384. STRERROR (errno));
  2385. /* Connection failure or something. Not a protocol violation. */
  2386. return;
  2387. }
  2388. if (size < sizeof(struct GNUNET_MessageHeader))
  2389. {
  2390. LOG (GNUNET_ERROR_TYPE_WARNING,
  2391. "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n",
  2392. (unsigned int ) size,
  2393. GNUNET_a2s (sa, fromlen));
  2394. /* _MAY_ be a connection failure (got partial message) */
  2395. /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
  2396. GNUNET_break_op(0);
  2397. return;
  2398. }
  2399. msg = (const struct GNUNET_MessageHeader *) buf;
  2400. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2401. "UDP received %u-byte message from `%s' type %u\n",
  2402. (unsigned int) size,
  2403. GNUNET_a2s (sa,
  2404. fromlen),
  2405. ntohs (msg->type));
  2406. if (size != ntohs (msg->size))
  2407. {
  2408. LOG (GNUNET_ERROR_TYPE_WARNING,
  2409. "UDP malformed message header from %s\n",
  2410. (unsigned int) size,
  2411. GNUNET_a2s (sa,
  2412. fromlen));
  2413. GNUNET_break_op (0);
  2414. return;
  2415. }
  2416. GNUNET_STATISTICS_update (plugin->env->stats,
  2417. "# UDP, total, bytes, received",
  2418. size,
  2419. GNUNET_NO);
  2420. network_type = plugin->env->get_address_type (plugin->env->cls,
  2421. sa,
  2422. fromlen);
  2423. switch (sa->sa_family)
  2424. {
  2425. case AF_INET:
  2426. sa4 = (const struct sockaddr_in *) &addr;
  2427. v4.options = 0;
  2428. v4.ipv4_addr = sa4->sin_addr.s_addr;
  2429. v4.u4_port = sa4->sin_port;
  2430. int_addr = (union UdpAddress *) &v4;
  2431. int_addr_len = sizeof (v4);
  2432. break;
  2433. case AF_INET6:
  2434. sa6 = (const struct sockaddr_in6 *) &addr;
  2435. v6.options = 0;
  2436. v6.ipv6_addr = sa6->sin6_addr;
  2437. v6.u6_port = sa6->sin6_port;
  2438. int_addr = (union UdpAddress *) &v6;
  2439. int_addr_len = sizeof (v6);
  2440. break;
  2441. default:
  2442. GNUNET_break (0);
  2443. return;
  2444. }
  2445. switch (ntohs (msg->type))
  2446. {
  2447. case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
  2448. if (GNUNET_YES == plugin->enable_broadcasting_receiving)
  2449. udp_broadcast_receive (plugin,
  2450. buf,
  2451. size,
  2452. int_addr,
  2453. int_addr_len,
  2454. network_type);
  2455. return;
  2456. case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
  2457. if (ntohs (msg->size) < sizeof(struct UDPMessage))
  2458. {
  2459. GNUNET_break_op(0);
  2460. return;
  2461. }
  2462. process_udp_message (plugin,
  2463. (const struct UDPMessage *) msg,
  2464. int_addr,
  2465. int_addr_len,
  2466. network_type);
  2467. return;
  2468. case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
  2469. read_process_ack (plugin,
  2470. msg,
  2471. int_addr,
  2472. int_addr_len);
  2473. return;
  2474. case GNUNET_MESSAGE_TYPE_FRAGMENT:
  2475. read_process_fragment (plugin,
  2476. msg,
  2477. int_addr,
  2478. int_addr_len,
  2479. network_type);
  2480. return;
  2481. default:
  2482. GNUNET_break_op(0);
  2483. return;
  2484. }
  2485. }
  2486. /**
  2487. * FIXME.
  2488. */
  2489. static struct UDP_MessageWrapper *
  2490. remove_timeout_messages_and_select (struct UDP_MessageWrapper *head,
  2491. struct GNUNET_NETWORK_Handle *sock)
  2492. {
  2493. struct UDP_MessageWrapper *udpw = NULL;
  2494. struct GNUNET_TIME_Relative remaining;
  2495. struct Session *session;
  2496. struct Plugin *plugin;
  2497. int removed;
  2498. removed = GNUNET_NO;
  2499. udpw = head;
  2500. while (NULL != udpw)
  2501. {
  2502. session = udpw->session;
  2503. plugin = session->plugin;
  2504. /* Find messages with timeout */
  2505. remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
  2506. if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
  2507. {
  2508. /* Message timed out */
  2509. switch (udpw->msg_type)
  2510. {
  2511. case UMT_MSG_UNFRAGMENTED:
  2512. GNUNET_STATISTICS_update (plugin->env->stats,
  2513. "# UDP, total, bytes, sent, timeout",
  2514. udpw->msg_size,
  2515. GNUNET_NO);
  2516. GNUNET_STATISTICS_update (plugin->env->stats,
  2517. "# UDP, total, messages, sent, timeout",
  2518. 1,
  2519. GNUNET_NO);
  2520. GNUNET_STATISTICS_update (plugin->env->stats,
  2521. "# UDP, unfragmented msgs, messages, sent, timeout",
  2522. 1,
  2523. GNUNET_NO);
  2524. GNUNET_STATISTICS_update (plugin->env->stats,
  2525. "# UDP, unfragmented msgs, bytes, sent, timeout",
  2526. udpw->payload_size,
  2527. GNUNET_NO);
  2528. /* Not fragmented message */
  2529. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2530. "Message for peer `%s' with size %u timed out\n",
  2531. GNUNET_i2s (&udpw->session->target),
  2532. udpw->payload_size);
  2533. call_continuation (udpw, GNUNET_SYSERR);
  2534. /* Remove message */
  2535. removed = GNUNET_YES;
  2536. dequeue (plugin, udpw);
  2537. GNUNET_free(udpw);
  2538. break;
  2539. case UMT_MSG_FRAGMENTED:
  2540. /* Fragmented message */
  2541. GNUNET_STATISTICS_update (plugin->env->stats,
  2542. "# UDP, total, bytes, sent, timeout",
  2543. udpw->frag_ctx->on_wire_size,
  2544. GNUNET_NO);
  2545. GNUNET_STATISTICS_update (plugin->env->stats,
  2546. "# UDP, total, messages, sent, timeout",
  2547. 1,
  2548. GNUNET_NO);
  2549. call_continuation (udpw, GNUNET_SYSERR);
  2550. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2551. "Fragment for message for peer `%s' with size %u timed out\n",
  2552. GNUNET_i2s (&udpw->session->target),
  2553. udpw->frag_ctx->payload_size);
  2554. GNUNET_STATISTICS_update (plugin->env->stats,
  2555. "# UDP, fragmented msgs, messages, sent, timeout",
  2556. 1,
  2557. GNUNET_NO);
  2558. GNUNET_STATISTICS_update (plugin->env->stats,
  2559. "# UDP, fragmented msgs, bytes, sent, timeout",
  2560. udpw->frag_ctx->payload_size,
  2561. GNUNET_NO);
  2562. /* Remove fragmented message due to timeout */
  2563. fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR);
  2564. break;
  2565. case UMT_MSG_ACK:
  2566. GNUNET_STATISTICS_update (plugin->env->stats,
  2567. "# UDP, total, bytes, sent, timeout",
  2568. udpw->msg_size,
  2569. GNUNET_NO);
  2570. GNUNET_STATISTICS_update (plugin->env->stats,
  2571. "# UDP, total, messages, sent, timeout",
  2572. 1,
  2573. GNUNET_NO);
  2574. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2575. "ACK Message for peer `%s' with size %u timed out\n",
  2576. GNUNET_i2s (&udpw->session->target),
  2577. udpw->payload_size);
  2578. call_continuation (udpw, GNUNET_SYSERR);
  2579. removed = GNUNET_YES;
  2580. dequeue (plugin, udpw);
  2581. GNUNET_free(udpw);
  2582. break;
  2583. default:
  2584. break;
  2585. }
  2586. if (sock == plugin->sockv4)
  2587. udpw = plugin->ipv4_queue_head;
  2588. else if (sock == plugin->sockv6)
  2589. udpw = plugin->ipv6_queue_head;
  2590. else
  2591. {
  2592. GNUNET_break(0); /* should never happen */
  2593. udpw = NULL;
  2594. }
  2595. GNUNET_STATISTICS_update (plugin->env->stats,
  2596. "# messages discarded due to timeout",
  2597. 1,
  2598. GNUNET_NO);
  2599. }
  2600. else
  2601. {
  2602. /* Message did not time out, check flow delay */
  2603. remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
  2604. if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
  2605. {
  2606. /* this message is not delayed */
  2607. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2608. "Message for peer `%s' (%u bytes) is not delayed \n",
  2609. GNUNET_i2s (&udpw->session->target),
  2610. udpw->payload_size);
  2611. break; /* Found message to send, break */
  2612. }
  2613. else
  2614. {
  2615. /* Message is delayed, try next */
  2616. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2617. "Message for peer `%s' (%u bytes) is delayed for %s\n",
  2618. GNUNET_i2s (&udpw->session->target), udpw->payload_size,
  2619. GNUNET_STRINGS_relative_time_to_string (remaining, GNUNET_YES));
  2620. udpw = udpw->next;
  2621. }
  2622. }
  2623. }
  2624. if (GNUNET_YES == removed)
  2625. notify_session_monitor (session->plugin,
  2626. session,
  2627. GNUNET_TRANSPORT_SS_UPDATE);
  2628. return udpw;
  2629. }
  2630. /**
  2631. * FIXME.
  2632. */
  2633. static void
  2634. analyze_send_error (struct Plugin *plugin,
  2635. const struct sockaddr *sa,
  2636. socklen_t slen, int error)
  2637. {
  2638. enum GNUNET_ATS_Network_Type type;
  2639. type = plugin->env->get_address_type (plugin->env->cls, sa, slen);
  2640. if (((GNUNET_ATS_NET_LAN == type)
  2641. || (GNUNET_ATS_NET_WAN == type))
  2642. && ((ENETUNREACH == errno)|| (ENETDOWN == errno)))
  2643. {
  2644. if (slen == sizeof (struct sockaddr_in))
  2645. {
  2646. /* IPv4: "Network unreachable" or "Network down"
  2647. *
  2648. * This indicates we do not have connectivity
  2649. */
  2650. LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
  2651. _("UDP could not transmit message to `%s': "
  2652. "Network seems down, please check your network configuration\n"),
  2653. GNUNET_a2s (sa, slen));
  2654. }
  2655. if (slen == sizeof (struct sockaddr_in6))
  2656. {
  2657. /* IPv6: "Network unreachable" or "Network down"
  2658. *
  2659. * This indicates that this system is IPv6 enabled, but does not
  2660. * have a valid global IPv6 address assigned or we do not have
  2661. * connectivity
  2662. */
  2663. LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
  2664. _("UDP could not transmit IPv6 message! "
  2665. "Please check your network configuration and disable IPv6 if your "
  2666. "connection does not have a global IPv6 address\n"));
  2667. }
  2668. }
  2669. else
  2670. {
  2671. LOG (GNUNET_ERROR_TYPE_WARNING,
  2672. "UDP could not transmit message to `%s': `%s'\n",
  2673. GNUNET_a2s (sa, slen), STRERROR (error));
  2674. }
  2675. }
  2676. /**
  2677. * FIXME.
  2678. */
  2679. static size_t
  2680. udp_select_send (struct Plugin *plugin,
  2681. struct GNUNET_NETWORK_Handle *sock)
  2682. {
  2683. ssize_t sent;
  2684. socklen_t slen;
  2685. struct sockaddr *a;
  2686. const struct IPv4UdpAddress *u4;
  2687. struct sockaddr_in a4;
  2688. const struct IPv6UdpAddress *u6;
  2689. struct sockaddr_in6 a6;
  2690. struct UDP_MessageWrapper *udpw;
  2691. /* Find message to send */
  2692. udpw = remove_timeout_messages_and_select ((sock == plugin->sockv4)
  2693. ? plugin->ipv4_queue_head
  2694. : plugin->ipv6_queue_head,
  2695. sock);
  2696. if (NULL == udpw)
  2697. return 0; /* No message to send */
  2698. if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
  2699. {
  2700. u4 = udpw->session->address->address;
  2701. memset (&a4, 0, sizeof(a4));
  2702. a4.sin_family = AF_INET;
  2703. #if HAVE_SOCKADDR_IN_SIN_LEN
  2704. a4.sin_len = sizeof (a4);
  2705. #endif
  2706. a4.sin_port = u4->u4_port;
  2707. memcpy (&a4.sin_addr, &u4->ipv4_addr, sizeof(struct in_addr));
  2708. a = (struct sockaddr *) &a4;
  2709. slen = sizeof (a4);
  2710. }
  2711. else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
  2712. {
  2713. u6 = udpw->session->address->address;
  2714. memset (&a6, 0, sizeof(a6));
  2715. a6.sin6_family = AF_INET6;
  2716. #if HAVE_SOCKADDR_IN_SIN_LEN
  2717. a6.sin6_len = sizeof (a6);
  2718. #endif
  2719. a6.sin6_port = u6->u6_port;
  2720. memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
  2721. a = (struct sockaddr *) &a6;
  2722. slen = sizeof (a6);
  2723. }
  2724. else
  2725. {
  2726. call_continuation (udpw, GNUNET_OK);
  2727. dequeue (plugin, udpw);
  2728. notify_session_monitor (plugin,
  2729. udpw->session,
  2730. GNUNET_TRANSPORT_SS_UPDATE);
  2731. GNUNET_free (udpw);
  2732. return GNUNET_SYSERR;
  2733. }
  2734. sent = GNUNET_NETWORK_socket_sendto (sock,
  2735. udpw->msg_buf,
  2736. udpw->msg_size,
  2737. a,
  2738. slen);
  2739. if (GNUNET_SYSERR == sent)
  2740. {
  2741. /* Failure */
  2742. analyze_send_error (plugin, a, slen, errno);
  2743. call_continuation (udpw, GNUNET_SYSERR);
  2744. GNUNET_STATISTICS_update (plugin->env->stats,
  2745. "# UDP, total, bytes, sent, failure", sent, GNUNET_NO);
  2746. GNUNET_STATISTICS_update (plugin->env->stats,
  2747. "# UDP, total, messages, sent, failure", 1, GNUNET_NO);
  2748. }
  2749. else
  2750. {
  2751. /* Success */
  2752. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2753. "UDP transmitted %u-byte message to `%s' `%s' (%d: %s)\n",
  2754. (unsigned int) (udpw->msg_size),
  2755. GNUNET_i2s (&udpw->session->target),
  2756. GNUNET_a2s (a, slen),
  2757. (int ) sent,
  2758. (sent < 0) ? STRERROR (errno) : "ok");
  2759. GNUNET_STATISTICS_update (plugin->env->stats,
  2760. "# UDP, total, bytes, sent, success",
  2761. sent,
  2762. GNUNET_NO);
  2763. GNUNET_STATISTICS_update (plugin->env->stats,
  2764. "# UDP, total, messages, sent, success",
  2765. 1,
  2766. GNUNET_NO);
  2767. if (NULL != udpw->frag_ctx)
  2768. udpw->frag_ctx->on_wire_size += udpw->msg_size;
  2769. call_continuation (udpw, GNUNET_OK);
  2770. }
  2771. dequeue (plugin, udpw);
  2772. notify_session_monitor (plugin,
  2773. udpw->session,
  2774. GNUNET_TRANSPORT_SS_UPDATE);
  2775. GNUNET_free(udpw);
  2776. return sent;
  2777. }
  2778. /**
  2779. * We have been notified that our readset has something to read. We don't
  2780. * know which socket needs to be read, so we have to check each one
  2781. * Then reschedule this function to be called again once more is available.
  2782. *
  2783. * @param cls the plugin handle
  2784. * @param tc the scheduling context (for rescheduling this function again)
  2785. */
  2786. static void
  2787. udp_plugin_select (void *cls,
  2788. const struct GNUNET_SCHEDULER_TaskContext *tc)
  2789. {
  2790. struct Plugin *plugin = cls;
  2791. plugin->select_task = NULL;
  2792. if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
  2793. return;
  2794. if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
  2795. && (NULL != plugin->sockv4)
  2796. && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
  2797. udp_select_read (plugin, plugin->sockv4);
  2798. if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
  2799. && (NULL != plugin->sockv4) && (NULL != plugin->ipv4_queue_head)
  2800. && (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
  2801. udp_select_send (plugin, plugin->sockv4);
  2802. schedule_select (plugin);
  2803. }
  2804. /**
  2805. * We have been notified that our readset has something to read. We don't
  2806. * know which socket needs to be read, so we have to check each one
  2807. * Then reschedule this function to be called again once more is available.
  2808. *
  2809. * @param cls the plugin handle
  2810. * @param tc the scheduling context (for rescheduling this function again)
  2811. */
  2812. static void
  2813. udp_plugin_select_v6 (void *cls,
  2814. const struct GNUNET_SCHEDULER_TaskContext *tc)
  2815. {
  2816. struct Plugin *plugin = cls;
  2817. plugin->select_task_v6 = NULL;
  2818. if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
  2819. return;
  2820. if (((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
  2821. && (NULL != plugin->sockv6)
  2822. && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
  2823. udp_select_read (plugin, plugin->sockv6);
  2824. if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
  2825. && (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL )&&
  2826. (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )udp_select_send (plugin, plugin->sockv6);
  2827. schedule_select (plugin);
  2828. }
  2829. /**
  2830. * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
  2831. *
  2832. * @param plugin the plugin to initialize
  2833. * @param bind_v6 IPv6 address to bind to (can be NULL, for 'any')
  2834. * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
  2835. * @return number of sockets that were successfully bound
  2836. */
  2837. static int
  2838. setup_sockets (struct Plugin *plugin,
  2839. const struct sockaddr_in6 *bind_v6,
  2840. const struct sockaddr_in *bind_v4)
  2841. {
  2842. int tries;
  2843. int sockets_created = 0;
  2844. struct sockaddr_in6 server_addrv6;
  2845. struct sockaddr_in server_addrv4;
  2846. struct sockaddr *server_addr;
  2847. struct sockaddr *addrs[2];
  2848. socklen_t addrlens[2];
  2849. socklen_t addrlen;
  2850. int eno;
  2851. /* Create IPv6 socket */
  2852. eno = EINVAL;
  2853. if (GNUNET_YES == plugin->enable_ipv6)
  2854. {
  2855. plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
  2856. if (NULL == plugin->sockv6)
  2857. {
  2858. LOG(GNUNET_ERROR_TYPE_WARNING,
  2859. "Disabling IPv6 since it is not supported on this system!\n");
  2860. plugin->enable_ipv6 = GNUNET_NO;
  2861. }
  2862. else
  2863. {
  2864. memset (&server_addrv6, '\0', sizeof(struct sockaddr_in6));
  2865. #if HAVE_SOCKADDR_IN_SIN_LEN
  2866. server_addrv6.sin6_len = sizeof (struct sockaddr_in6);
  2867. #endif
  2868. server_addrv6.sin6_family = AF_INET6;
  2869. if (NULL != bind_v6)
  2870. server_addrv6.sin6_addr = bind_v6->sin6_addr;
  2871. else
  2872. server_addrv6.sin6_addr = in6addr_any;
  2873. if (0 == plugin->port) /* autodetect */
  2874. server_addrv6.sin6_port
  2875. = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
  2876. + 32000);
  2877. else
  2878. server_addrv6.sin6_port = htons (plugin->port);
  2879. addrlen = sizeof(struct sockaddr_in6);
  2880. server_addr = (struct sockaddr *) &server_addrv6;
  2881. tries = 0;
  2882. while (tries < 10)
  2883. {
  2884. LOG(GNUNET_ERROR_TYPE_DEBUG,
  2885. "Binding to IPv6 `%s'\n",
  2886. GNUNET_a2s (server_addr, addrlen));
  2887. /* binding */
  2888. if (GNUNET_OK ==
  2889. GNUNET_NETWORK_socket_bind (plugin->sockv6,
  2890. server_addr,
  2891. addrlen))
  2892. break;
  2893. eno = errno;
  2894. if (0 != plugin->port)
  2895. {
  2896. tries = 10; /* fail immediately */
  2897. break; /* bind failed on specific port */
  2898. }
  2899. /* autodetect */
  2900. server_addrv6.sin6_port
  2901. = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
  2902. + 32000);
  2903. tries++;
  2904. }
  2905. if (tries >= 10)
  2906. {
  2907. GNUNET_NETWORK_socket_close (plugin->sockv6);
  2908. plugin->enable_ipv6 = GNUNET_NO;
  2909. plugin->sockv6 = NULL;
  2910. }
  2911. else
  2912. {
  2913. plugin->port = ntohs (server_addrv6.sin6_port);
  2914. }
  2915. if (NULL != plugin->sockv6)
  2916. {
  2917. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2918. "IPv6 socket created on port %s\n",
  2919. GNUNET_a2s (server_addr, addrlen));
  2920. addrs[sockets_created] = (struct sockaddr *) &server_addrv6;
  2921. addrlens[sockets_created] = sizeof(struct sockaddr_in6);
  2922. sockets_created++;
  2923. }
  2924. else
  2925. {
  2926. LOG (GNUNET_ERROR_TYPE_ERROR,
  2927. "Failed to bind UDP socket to %s: %s\n",
  2928. GNUNET_a2s (server_addr, addrlen),
  2929. STRERROR (eno));
  2930. }
  2931. }
  2932. }
  2933. /* Create IPv4 socket */
  2934. eno = EINVAL;
  2935. plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
  2936. if (NULL == plugin->sockv4)
  2937. {
  2938. GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
  2939. "socket");
  2940. LOG (GNUNET_ERROR_TYPE_WARNING,
  2941. "Disabling IPv4 since it is not supported on this system!\n");
  2942. plugin->enable_ipv4 = GNUNET_NO;
  2943. }
  2944. else
  2945. {
  2946. memset (&server_addrv4, '\0', sizeof(struct sockaddr_in));
  2947. #if HAVE_SOCKADDR_IN_SIN_LEN
  2948. server_addrv4.sin_len = sizeof (struct sockaddr_in);
  2949. #endif
  2950. server_addrv4.sin_family = AF_INET;
  2951. if (NULL != bind_v4)
  2952. server_addrv4.sin_addr = bind_v4->sin_addr;
  2953. else
  2954. server_addrv4.sin_addr.s_addr = INADDR_ANY;
  2955. if (0 == plugin->port)
  2956. /* autodetect */
  2957. server_addrv4.sin_port
  2958. = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
  2959. 33537)
  2960. + 32000);
  2961. else
  2962. server_addrv4.sin_port = htons (plugin->port);
  2963. addrlen = sizeof(struct sockaddr_in);
  2964. server_addr = (struct sockaddr *) &server_addrv4;
  2965. tries = 0;
  2966. while (tries < 10)
  2967. {
  2968. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2969. "Binding to IPv4 `%s'\n",
  2970. GNUNET_a2s (server_addr, addrlen));
  2971. /* binding */
  2972. if (GNUNET_OK ==
  2973. GNUNET_NETWORK_socket_bind (plugin->sockv4,
  2974. server_addr,
  2975. addrlen))
  2976. break;
  2977. eno = errno;
  2978. if (0 != plugin->port)
  2979. {
  2980. tries = 10; /* fail */
  2981. break; /* bind failed on specific port */
  2982. }
  2983. /* autodetect */
  2984. server_addrv4.sin_port
  2985. = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
  2986. + 32000);
  2987. tries++;
  2988. }
  2989. if (tries >= 10)
  2990. {
  2991. GNUNET_NETWORK_socket_close (plugin->sockv4);
  2992. plugin->enable_ipv4 = GNUNET_NO;
  2993. plugin->sockv4 = NULL;
  2994. }
  2995. else
  2996. {
  2997. plugin->port = ntohs (server_addrv4.sin_port);
  2998. }
  2999. if (NULL != plugin->sockv4)
  3000. {
  3001. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3002. "IPv4 socket created on port %s\n",
  3003. GNUNET_a2s (server_addr, addrlen));
  3004. addrs[sockets_created] = (struct sockaddr *) &server_addrv4;
  3005. addrlens[sockets_created] = sizeof(struct sockaddr_in);
  3006. sockets_created++;
  3007. }
  3008. else
  3009. {
  3010. LOG (GNUNET_ERROR_TYPE_ERROR,
  3011. _("Failed to bind UDP socket to %s: %s\n"),
  3012. GNUNET_a2s (server_addr, addrlen),
  3013. STRERROR (eno));
  3014. }
  3015. }
  3016. if (0 == sockets_created)
  3017. {
  3018. LOG (GNUNET_ERROR_TYPE_WARNING,
  3019. _("Failed to open UDP sockets\n"));
  3020. return 0; /* No sockets created, return */
  3021. }
  3022. /* Create file descriptors */
  3023. if (plugin->enable_ipv4 == GNUNET_YES)
  3024. {
  3025. plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
  3026. plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
  3027. GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
  3028. GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
  3029. if (NULL != plugin->sockv4)
  3030. {
  3031. GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
  3032. GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
  3033. }
  3034. }
  3035. if (plugin->enable_ipv6 == GNUNET_YES)
  3036. {
  3037. plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
  3038. plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
  3039. GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
  3040. GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
  3041. if (NULL != plugin->sockv6)
  3042. {
  3043. GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
  3044. GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
  3045. }
  3046. }
  3047. schedule_select (plugin);
  3048. plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
  3049. GNUNET_NO,
  3050. plugin->port,
  3051. sockets_created,
  3052. (const struct sockaddr **) addrs,
  3053. addrlens,
  3054. &udp_nat_port_map_callback,
  3055. NULL,
  3056. plugin);
  3057. return sockets_created;
  3058. }
  3059. /**
  3060. * Return information about the given session to the
  3061. * monitor callback.
  3062. *
  3063. * @param cls the `struct Plugin` with the monitor callback (`sic`)
  3064. * @param peer peer we send information about
  3065. * @param value our `struct Session` to send information about
  3066. * @return #GNUNET_OK (continue to iterate)
  3067. */
  3068. static int
  3069. send_session_info_iter (void *cls,
  3070. const struct GNUNET_PeerIdentity *peer,
  3071. void *value)
  3072. {
  3073. struct Plugin *plugin = cls;
  3074. struct Session *session = value;
  3075. notify_session_monitor (plugin,
  3076. session,
  3077. GNUNET_TRANSPORT_SS_INIT);
  3078. notify_session_monitor (plugin,
  3079. session,
  3080. GNUNET_TRANSPORT_SS_UP);
  3081. return GNUNET_OK;
  3082. }
  3083. /**
  3084. * Begin monitoring sessions of a plugin. There can only
  3085. * be one active monitor per plugin (i.e. if there are
  3086. * multiple monitors, the transport service needs to
  3087. * multiplex the generated events over all of them).
  3088. *
  3089. * @param cls closure of the plugin
  3090. * @param sic callback to invoke, NULL to disable monitor;
  3091. * plugin will being by iterating over all active
  3092. * sessions immediately and then enter monitor mode
  3093. * @param sic_cls closure for @a sic
  3094. */
  3095. static void
  3096. udp_plugin_setup_monitor (void *cls,
  3097. GNUNET_TRANSPORT_SessionInfoCallback sic,
  3098. void *sic_cls)
  3099. {
  3100. struct Plugin *plugin = cls;
  3101. plugin->sic = sic;
  3102. plugin->sic_cls = sic_cls;
  3103. if (NULL != sic)
  3104. {
  3105. GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
  3106. &send_session_info_iter,
  3107. plugin);
  3108. /* signal end of first iteration */
  3109. sic (sic_cls, NULL, NULL);
  3110. }
  3111. }
  3112. /**
  3113. * The exported method. Makes the core api available via a global and
  3114. * returns the udp transport API.
  3115. *
  3116. * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
  3117. * @return our `struct GNUNET_TRANSPORT_PluginFunctions`
  3118. */
  3119. void *
  3120. libgnunet_plugin_transport_udp_init (void *cls)
  3121. {
  3122. struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
  3123. struct GNUNET_TRANSPORT_PluginFunctions *api;
  3124. struct Plugin *p;
  3125. unsigned long long port;
  3126. unsigned long long aport;
  3127. unsigned long long udp_max_bps;
  3128. unsigned long long enable_v6;
  3129. unsigned long long enable_broadcasting;
  3130. unsigned long long enable_broadcasting_recv;
  3131. char *bind4_address;
  3132. char *bind6_address;
  3133. char *fancy_interval;
  3134. struct GNUNET_TIME_Relative interval;
  3135. struct sockaddr_in server_addrv4;
  3136. struct sockaddr_in6 server_addrv6;
  3137. int res;
  3138. int have_bind4;
  3139. int have_bind6;
  3140. if (NULL == env->receive)
  3141. {
  3142. /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
  3143. initialze the plugin or the API */
  3144. api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
  3145. api->cls = NULL;
  3146. api->address_pretty_printer = &udp_plugin_address_pretty_printer;
  3147. api->address_to_string = &udp_address_to_string;
  3148. api->string_to_address = &udp_string_to_address;
  3149. return api;
  3150. }
  3151. /* Get port number: port == 0 : autodetect a port,
  3152. * > 0 : use this port, not given : 2086 default */
  3153. if (GNUNET_OK !=
  3154. GNUNET_CONFIGURATION_get_value_number (env->cfg,
  3155. "transport-udp",
  3156. "PORT", &port))
  3157. port = 2086;
  3158. if (GNUNET_OK !=
  3159. GNUNET_CONFIGURATION_get_value_number (env->cfg,
  3160. "transport-udp",
  3161. "ADVERTISED_PORT", &aport))
  3162. aport = port;
  3163. if (port > 65535)
  3164. {
  3165. LOG (GNUNET_ERROR_TYPE_WARNING,
  3166. _("Given `%s' option is out of range: %llu > %u\n"),
  3167. "PORT", port,
  3168. 65535);
  3169. return NULL;
  3170. }
  3171. /* Protocols */
  3172. if (GNUNET_YES ==
  3173. GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat", "DISABLEV6"))
  3174. enable_v6 = GNUNET_NO;
  3175. else
  3176. enable_v6 = GNUNET_YES;
  3177. /* Addresses */
  3178. have_bind4 = GNUNET_NO;
  3179. memset (&server_addrv4, 0, sizeof(server_addrv4));
  3180. if (GNUNET_YES ==
  3181. GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
  3182. "BINDTO", &bind4_address))
  3183. {
  3184. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3185. "Binding udp plugin to specific address: `%s'\n",
  3186. bind4_address);
  3187. if (1 != inet_pton (AF_INET,
  3188. bind4_address,
  3189. &server_addrv4.sin_addr))
  3190. {
  3191. GNUNET_free (bind4_address);
  3192. return NULL;
  3193. }
  3194. have_bind4 = GNUNET_YES;
  3195. }
  3196. GNUNET_free_non_null(bind4_address);
  3197. have_bind6 = GNUNET_NO;
  3198. memset (&server_addrv6, 0, sizeof(server_addrv6));
  3199. if (GNUNET_YES ==
  3200. GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
  3201. "BINDTO6", &bind6_address))
  3202. {
  3203. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3204. "Binding udp plugin to specific address: `%s'\n",
  3205. bind6_address);
  3206. if (1 != inet_pton (AF_INET6,
  3207. bind6_address,
  3208. &server_addrv6.sin6_addr))
  3209. {
  3210. LOG (GNUNET_ERROR_TYPE_ERROR,
  3211. _("Invalid IPv6 address: `%s'\n"),
  3212. bind6_address);
  3213. GNUNET_free (bind6_address);
  3214. return NULL;
  3215. }
  3216. have_bind6 = GNUNET_YES;
  3217. }
  3218. GNUNET_free_non_null (bind6_address);
  3219. /* Enable neighbour discovery */
  3220. enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
  3221. "transport-udp", "BROADCAST");
  3222. if (enable_broadcasting == GNUNET_SYSERR)
  3223. enable_broadcasting = GNUNET_NO;
  3224. enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
  3225. "transport-udp", "BROADCAST_RECEIVE");
  3226. if (enable_broadcasting_recv == GNUNET_SYSERR)
  3227. enable_broadcasting_recv = GNUNET_YES;
  3228. if (GNUNET_SYSERR ==
  3229. GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
  3230. "BROADCAST_INTERVAL",
  3231. &fancy_interval))
  3232. {
  3233. interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
  3234. }
  3235. else
  3236. {
  3237. if (GNUNET_SYSERR ==
  3238. GNUNET_STRINGS_fancy_time_to_relative (fancy_interval, &interval))
  3239. {
  3240. interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
  3241. }
  3242. GNUNET_free(fancy_interval);
  3243. }
  3244. /* Maximum datarate */
  3245. if (GNUNET_OK !=
  3246. GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
  3247. "MAX_BPS", &udp_max_bps))
  3248. {
  3249. udp_max_bps = 1024 * 1024 * 50; /* 50 MB/s == infinity for practical purposes */
  3250. }
  3251. p = GNUNET_new (struct Plugin);
  3252. p->port = port;
  3253. p->aport = aport;
  3254. p->broadcast_interval = interval;
  3255. p->enable_ipv6 = enable_v6;
  3256. p->enable_ipv4 = GNUNET_YES; /* default */
  3257. p->enable_broadcasting = enable_broadcasting;
  3258. p->enable_broadcasting_receiving = enable_broadcasting_recv;
  3259. p->env = env;
  3260. p->sessions = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
  3261. p->defrag_ctxs = GNUNET_CONTAINER_heap_create (
  3262. GNUNET_CONTAINER_HEAP_ORDER_MIN);
  3263. p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages,
  3264. p);
  3265. GNUNET_BANDWIDTH_tracker_init (&p->tracker,
  3266. NULL,
  3267. NULL,
  3268. GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps),
  3269. 30);
  3270. LOG(GNUNET_ERROR_TYPE_DEBUG,
  3271. "Setting up sockets\n");
  3272. res = setup_sockets (p,
  3273. (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
  3274. (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
  3275. if ((res == 0) || ((p->sockv4 == NULL )&& (p->sockv6 == NULL)))
  3276. {
  3277. LOG (GNUNET_ERROR_TYPE_ERROR,
  3278. _("Failed to create network sockets, plugin failed\n"));
  3279. GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
  3280. GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
  3281. GNUNET_SERVER_mst_destroy (p->mst);
  3282. GNUNET_free (p);
  3283. return NULL;
  3284. }
  3285. /* Setup broadcasting and receiving beacons */
  3286. setup_broadcast (p, &server_addrv6, &server_addrv4);
  3287. api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
  3288. api->cls = p;
  3289. api->send = NULL;
  3290. api->disconnect_session = &udp_disconnect_session;
  3291. api->query_keepalive_factor = &udp_query_keepalive_factor;
  3292. api->disconnect_peer = &udp_disconnect;
  3293. api->address_pretty_printer = &udp_plugin_address_pretty_printer;
  3294. api->address_to_string = &udp_address_to_string;
  3295. api->string_to_address = &udp_string_to_address;
  3296. api->check_address = &udp_plugin_check_address;
  3297. api->get_session = &udp_plugin_get_session;
  3298. api->send = &udp_plugin_send;
  3299. api->get_network = &udp_get_network;
  3300. api->update_session_timeout = &udp_plugin_update_session_timeout;
  3301. api->setup_monitor = &udp_plugin_setup_monitor;
  3302. return api;
  3303. }
  3304. /**
  3305. * Function called on each entry in the defragmentation heap to
  3306. * clean it up.
  3307. *
  3308. * @param cls NULL
  3309. * @param node node in the heap (to be removed)
  3310. * @param element a `struct DefragContext` to be cleaned up
  3311. * @param cost unused
  3312. * @return #GNUNET_YES
  3313. */
  3314. static int
  3315. heap_cleanup_iterator (void *cls,
  3316. struct GNUNET_CONTAINER_HeapNode *node,
  3317. void *element,
  3318. GNUNET_CONTAINER_HeapCostType cost)
  3319. {
  3320. struct DefragContext *d_ctx = element;
  3321. GNUNET_CONTAINER_heap_remove_node (node);
  3322. GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
  3323. GNUNET_free (d_ctx);
  3324. return GNUNET_YES;
  3325. }
  3326. /**
  3327. * The exported method. Makes the core api available via a global and
  3328. * returns the udp transport API.
  3329. *
  3330. * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
  3331. * @return NULL
  3332. */
  3333. void *
  3334. libgnunet_plugin_transport_udp_done (void *cls)
  3335. {
  3336. struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
  3337. struct Plugin *plugin = api->cls;
  3338. struct PrettyPrinterContext *cur;
  3339. struct PrettyPrinterContext *next;
  3340. struct UDP_MessageWrapper *udpw;
  3341. if (NULL == plugin)
  3342. {
  3343. GNUNET_free(api);
  3344. return NULL;
  3345. }
  3346. stop_broadcast (plugin);
  3347. if (plugin->select_task != NULL)
  3348. {
  3349. GNUNET_SCHEDULER_cancel (plugin->select_task);
  3350. plugin->select_task = NULL;
  3351. }
  3352. if (plugin->select_task_v6 != NULL)
  3353. {
  3354. GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
  3355. plugin->select_task_v6 = NULL;
  3356. }
  3357. /* Closing sockets */
  3358. if (GNUNET_YES == plugin->enable_ipv4)
  3359. {
  3360. if (NULL != plugin->sockv4)
  3361. {
  3362. GNUNET_break (GNUNET_OK ==
  3363. GNUNET_NETWORK_socket_close (plugin->sockv4));
  3364. plugin->sockv4 = NULL;
  3365. }
  3366. GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
  3367. GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
  3368. }
  3369. if (GNUNET_YES == plugin->enable_ipv6)
  3370. {
  3371. if (NULL != plugin->sockv6)
  3372. {
  3373. GNUNET_break (GNUNET_OK ==
  3374. GNUNET_NETWORK_socket_close (plugin->sockv6));
  3375. plugin->sockv6 = NULL;
  3376. GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
  3377. GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
  3378. }
  3379. }
  3380. if (NULL != plugin->nat)
  3381. {
  3382. GNUNET_NAT_unregister (plugin->nat);
  3383. plugin->nat = NULL;
  3384. }
  3385. if (NULL != plugin->defrag_ctxs)
  3386. {
  3387. GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
  3388. &heap_cleanup_iterator, NULL);
  3389. GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
  3390. plugin->defrag_ctxs = NULL;
  3391. }
  3392. if (NULL != plugin->mst)
  3393. {
  3394. GNUNET_SERVER_mst_destroy (plugin->mst);
  3395. plugin->mst = NULL;
  3396. }
  3397. /* Clean up leftover messages */
  3398. udpw = plugin->ipv4_queue_head;
  3399. while (NULL != udpw)
  3400. {
  3401. struct UDP_MessageWrapper *tmp = udpw->next;
  3402. dequeue (plugin, udpw);
  3403. call_continuation (udpw, GNUNET_SYSERR);
  3404. GNUNET_free(udpw);
  3405. udpw = tmp;
  3406. }
  3407. udpw = plugin->ipv6_queue_head;
  3408. while (NULL != udpw)
  3409. {
  3410. struct UDP_MessageWrapper *tmp = udpw->next;
  3411. dequeue (plugin, udpw);
  3412. call_continuation (udpw, GNUNET_SYSERR);
  3413. GNUNET_free(udpw);
  3414. udpw = tmp;
  3415. }
  3416. /* Clean up sessions */
  3417. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3418. "Cleaning up sessions\n");
  3419. GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
  3420. &disconnect_and_free_it, plugin);
  3421. GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
  3422. next = plugin->ppc_dll_head;
  3423. for (cur = next; NULL != cur; cur = next)
  3424. {
  3425. GNUNET_break(0);
  3426. next = cur->next;
  3427. GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
  3428. plugin->ppc_dll_tail,
  3429. cur);
  3430. GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
  3431. GNUNET_free (cur);
  3432. }
  3433. GNUNET_free (plugin);
  3434. GNUNET_free (api);
  3435. return NULL;
  3436. }
  3437. /* end of plugin_transport_udp.c */