gnunet-communicator-unix.c 28 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186
  1. /*
  2. This file is part of GNUnet
  3. Copyright (C) 2010-2014, 2018 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file transport/gnunet-communicator-unix.c
  18. * @brief Transport plugin using unix domain sockets (!)
  19. * Clearly, can only be used locally on Unix/Linux hosts...
  20. * ONLY INTENDED FOR TESTING!!!
  21. * @author Christian Grothoff
  22. * @author Nathan Evans
  23. */
  24. #include "platform.h"
  25. #include "gnunet_util_lib.h"
  26. #include "gnunet_protocols.h"
  27. #include "gnunet_constants.h"
  28. #include "gnunet_nt_lib.h"
  29. #include "gnunet_statistics_service.h"
  30. #include "gnunet_transport_communication_service.h"
  31. /**
  32. * How many messages do we keep at most in the queue to the
  33. * transport service before we start to drop (default,
  34. * can be changed via the configuration file).
  35. * Should be _below_ the level of the communicator API, as
  36. * otherwise we may read messages just to have them dropped
  37. * by the communicator API.
  38. */
  39. #define DEFAULT_MAX_QUEUE_LENGTH 8
  40. /**
  41. * Address prefix used by the communicator.
  42. */
  43. #define COMMUNICATOR_ADDRESS_PREFIX "unix"
  44. /**
  45. * Configuration section used by the communicator.
  46. */
  47. #define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
  48. /**
  49. * Our MTU.
  50. */
  51. #define UNIX_MTU UINT16_MAX
  52. GNUNET_NETWORK_STRUCT_BEGIN
  53. /**
  54. * UNIX Message-Packet header.
  55. */
  56. struct UNIXMessage
  57. {
  58. /**
  59. * Message header.
  60. */
  61. struct GNUNET_MessageHeader header;
  62. /**
  63. * What is the identity of the sender (GNUNET_hash of public key)
  64. */
  65. struct GNUNET_PeerIdentity sender;
  66. };
  67. GNUNET_NETWORK_STRUCT_END
  68. /**
  69. * Handle for a queue.
  70. */
  71. struct Queue
  72. {
  73. /**
  74. * Queues with pending messages (!) are kept in a DLL.
  75. */
  76. struct Queue *next;
  77. /**
  78. * Queues with pending messages (!) are kept in a DLL.
  79. */
  80. struct Queue *prev;
  81. /**
  82. * To whom are we talking to.
  83. */
  84. struct GNUNET_PeerIdentity target;
  85. /**
  86. * Address of the other peer.
  87. */
  88. struct sockaddr_un *address;
  89. /**
  90. * Length of the address.
  91. */
  92. socklen_t address_len;
  93. /**
  94. * Message currently scheduled for transmission, non-NULL if and only
  95. * if this queue is in the #queue_head DLL.
  96. */
  97. const struct GNUNET_MessageHeader *msg;
  98. /**
  99. * Message queue we are providing for the #ch.
  100. */
  101. struct GNUNET_MQ_Handle *mq;
  102. /**
  103. * handle for this queue with the #ch.
  104. */
  105. struct GNUNET_TRANSPORT_QueueHandle *qh;
  106. /**
  107. * Number of bytes we currently have in our write queue.
  108. */
  109. unsigned long long bytes_in_queue;
  110. /**
  111. * Timeout for this queue.
  112. */
  113. struct GNUNET_TIME_Absolute timeout;
  114. /**
  115. * Queue timeout task.
  116. */
  117. struct GNUNET_SCHEDULER_Task *timeout_task;
  118. };
  119. /**
  120. * ID of read task
  121. */
  122. static struct GNUNET_SCHEDULER_Task *read_task;
  123. /**
  124. * ID of write task
  125. */
  126. static struct GNUNET_SCHEDULER_Task *write_task;
  127. /**
  128. * Number of messages we currently have in our queues towards the transport service.
  129. */
  130. static unsigned long long delivering_messages;
  131. /**
  132. * Maximum queue length before we stop reading towards the transport service.
  133. */
  134. static unsigned long long max_queue_length;
  135. /**
  136. * For logging statistics.
  137. */
  138. static struct GNUNET_STATISTICS_Handle *stats;
  139. /**
  140. * Our environment.
  141. */
  142. static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
  143. /**
  144. * Queues (map from peer identity to `struct Queue`)
  145. */
  146. static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
  147. /**
  148. * Head of queue of messages to transmit.
  149. */
  150. static struct Queue *queue_head;
  151. /**
  152. * Tail of queue of messages to transmit.
  153. */
  154. static struct Queue *queue_tail;
  155. /**
  156. * socket that we transmit all data with
  157. */
  158. static struct GNUNET_NETWORK_Handle *unix_sock;
  159. /**
  160. * Handle to the operation that publishes our address.
  161. */
  162. static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
  163. /**
  164. * Functions with this signature are called whenever we need
  165. * to close a queue due to a disconnect or failure to
  166. * establish a connection.
  167. *
  168. * @param queue queue to close down
  169. */
  170. static void
  171. queue_destroy (struct Queue *queue)
  172. {
  173. struct GNUNET_MQ_Handle *mq;
  174. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  175. "Disconnecting queue for peer `%s'\n",
  176. GNUNET_i2s (&queue->target));
  177. if (0 != queue->bytes_in_queue)
  178. {
  179. GNUNET_CONTAINER_DLL_remove (queue_head,
  180. queue_tail,
  181. queue);
  182. queue->bytes_in_queue = 0;
  183. }
  184. if (NULL != (mq = queue->mq))
  185. {
  186. queue->mq = NULL;
  187. GNUNET_MQ_destroy (mq);
  188. }
  189. GNUNET_assert (GNUNET_YES ==
  190. GNUNET_CONTAINER_multipeermap_remove (queue_map,
  191. &queue->target,
  192. queue));
  193. GNUNET_STATISTICS_set (stats,
  194. "# queues active",
  195. GNUNET_CONTAINER_multipeermap_size (queue_map),
  196. GNUNET_NO);
  197. if (NULL != queue->timeout_task)
  198. {
  199. GNUNET_SCHEDULER_cancel (queue->timeout_task);
  200. queue->timeout_task = NULL;
  201. }
  202. GNUNET_free (queue->address);
  203. GNUNET_free (queue);
  204. }
  205. /**
  206. * Queue was idle for too long, so disconnect it
  207. *
  208. * @param cls the `struct Queue *` to disconnect
  209. */
  210. static void
  211. queue_timeout (void *cls)
  212. {
  213. struct Queue *queue = cls;
  214. struct GNUNET_TIME_Relative left;
  215. queue->timeout_task = NULL;
  216. left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
  217. if (0 != left.rel_value_us)
  218. {
  219. /* not actually our turn yet, but let's at least update
  220. the monitor, it may think we're about to die ... */
  221. queue->timeout_task
  222. = GNUNET_SCHEDULER_add_delayed (left,
  223. &queue_timeout,
  224. queue);
  225. return;
  226. }
  227. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  228. "Queue %p was idle for %s, disconnecting\n",
  229. queue,
  230. GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
  231. GNUNET_YES));
  232. queue_destroy (queue);
  233. }
  234. /**
  235. * Increment queue timeout due to activity. We do not immediately
  236. * notify the monitor here as that might generate excessive
  237. * signalling.
  238. *
  239. * @param queue queue for which the timeout should be rescheduled
  240. */
  241. static void
  242. reschedule_queue_timeout (struct Queue *queue)
  243. {
  244. GNUNET_assert (NULL != queue->timeout_task);
  245. queue->timeout
  246. = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
  247. }
  248. /**
  249. * Convert unix path to a `struct sockaddr_un *`
  250. *
  251. * @param unixpath path to convert
  252. * @param[out] sock_len set to the length of the address
  253. * @param is_abstract is this an abstract @a unixpath
  254. * @return converted unix path
  255. */
  256. static struct sockaddr_un *
  257. unix_address_to_sockaddr (const char *unixpath,
  258. socklen_t *sock_len)
  259. {
  260. struct sockaddr_un *un;
  261. size_t slen;
  262. GNUNET_assert (0 < strlen (unixpath)); /* sanity check */
  263. un = GNUNET_new (struct sockaddr_un);
  264. un->sun_family = AF_UNIX;
  265. slen = strlen (unixpath);
  266. if (slen >= sizeof (un->sun_path))
  267. slen = sizeof (un->sun_path) - 1;
  268. GNUNET_memcpy (un->sun_path,
  269. unixpath,
  270. slen);
  271. un->sun_path[slen] = '\0';
  272. slen = sizeof (struct sockaddr_un);
  273. #if HAVE_SOCKADDR_UN_SUN_LEN
  274. un->sun_len = (u_char) slen;
  275. #endif
  276. (*sock_len) = slen;
  277. if ('@' == un->sun_path[0])
  278. un->sun_path[0] = '\0';
  279. return un;
  280. }
  281. /**
  282. * Closure to #lookup_queue_it().
  283. */
  284. struct LookupCtx
  285. {
  286. /**
  287. * Location to store the queue, if found.
  288. */
  289. struct Queue *res;
  290. /**
  291. * Address we are looking for.
  292. */
  293. const struct sockaddr_un *un;
  294. /**
  295. * Number of bytes in @a un
  296. */
  297. socklen_t un_len;
  298. };
  299. /**
  300. * Function called to find a queue by address.
  301. *
  302. * @param cls the `struct LookupCtx *`
  303. * @param key peer we are looking for (unused)
  304. * @param value a queue
  305. * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success
  306. */
  307. static int
  308. lookup_queue_it (void *cls,
  309. const struct GNUNET_PeerIdentity *key,
  310. void *value)
  311. {
  312. struct LookupCtx *lctx = cls;
  313. struct Queue *queue = value;
  314. if ( (queue->address_len = lctx->un_len) &&
  315. (0 == memcmp (lctx->un,
  316. queue->address,
  317. queue->address_len)) )
  318. {
  319. lctx->res = queue;
  320. return GNUNET_NO;
  321. }
  322. return GNUNET_YES;
  323. }
  324. /**
  325. * Find an existing queue by address.
  326. *
  327. * @param plugin the plugin
  328. * @param address the address to find
  329. * @return NULL if queue was not found
  330. */
  331. static struct Queue *
  332. lookup_queue (const struct GNUNET_PeerIdentity *peer,
  333. const struct sockaddr_un *un,
  334. socklen_t un_len)
  335. {
  336. struct LookupCtx lctx;
  337. lctx.un = un;
  338. lctx.un_len = un_len;
  339. GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
  340. peer,
  341. &lookup_queue_it,
  342. &lctx);
  343. return lctx.res;
  344. }
  345. /**
  346. * We have been notified that our socket is ready to write.
  347. * Then reschedule this function to be called again once more is available.
  348. *
  349. * @param cls NULL
  350. */
  351. static void
  352. select_write_cb (void *cls)
  353. {
  354. struct Queue *queue = queue_tail;
  355. const struct GNUNET_MessageHeader *msg = queue->msg;
  356. size_t msg_size = ntohs (msg->size);
  357. ssize_t sent;
  358. /* take queue of the ready list */
  359. write_task = NULL;
  360. GNUNET_CONTAINER_DLL_remove (queue_head,
  361. queue_tail,
  362. queue);
  363. if (NULL != queue_head)
  364. write_task =
  365. GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
  366. unix_sock,
  367. &select_write_cb,
  368. NULL);
  369. /* send 'msg' */
  370. queue->msg = NULL;
  371. GNUNET_MQ_impl_send_continue (queue->mq);
  372. resend:
  373. /* Send the data */
  374. sent = GNUNET_NETWORK_socket_sendto (unix_sock,
  375. queue->msg,
  376. msg_size,
  377. (const struct sockaddr *) queue->address,
  378. queue->address_len);
  379. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  380. "UNIX transmitted message to %s (%d/%u: %s)\n",
  381. GNUNET_i2s (&queue->target),
  382. (int) sent,
  383. (unsigned int) msg_size,
  384. (sent < 0) ? STRERROR (errno) : "ok");
  385. if (-1 != sent)
  386. {
  387. GNUNET_STATISTICS_update (stats,
  388. "# bytes sent",
  389. (long long) sent,
  390. GNUNET_NO);
  391. reschedule_queue_timeout (queue);
  392. return; /* all good */
  393. }
  394. GNUNET_STATISTICS_update (stats,
  395. "# network transmission failures",
  396. 1,
  397. GNUNET_NO);
  398. switch (errno)
  399. {
  400. case EAGAIN:
  401. case ENOBUFS:
  402. /* We should retry later... */
  403. GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG,
  404. "send");
  405. return;
  406. case EMSGSIZE:
  407. {
  408. socklen_t size = 0;
  409. socklen_t len = sizeof (size);
  410. GNUNET_NETWORK_socket_getsockopt (unix_sock,
  411. SOL_SOCKET,
  412. SO_SNDBUF,
  413. &size,
  414. &len);
  415. if (size > ntohs (msg->size))
  416. {
  417. /* Buffer is bigger than message: error, no retry
  418. * This should never happen!*/
  419. GNUNET_break (0);
  420. return;
  421. }
  422. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  423. "Trying to increase socket buffer size from %u to %u for message size %u\n",
  424. (unsigned int) size,
  425. (unsigned int) ((msg_size / 1000) + 2) * 1000,
  426. (unsigned int) msg_size);
  427. size = ((msg_size / 1000) + 2) * 1000;
  428. if (GNUNET_OK ==
  429. GNUNET_NETWORK_socket_setsockopt (unix_sock,
  430. SOL_SOCKET,
  431. SO_SNDBUF,
  432. &size,
  433. sizeof (size)))
  434. goto resend; /* Increased buffer size, retry sending */
  435. /* Ok, then just try very modest increase */
  436. size = msg_size;
  437. if (GNUNET_OK ==
  438. GNUNET_NETWORK_socket_setsockopt (unix_sock,
  439. SOL_SOCKET,
  440. SO_SNDBUF,
  441. &size,
  442. sizeof (size)))
  443. goto resend; /* Increased buffer size, retry sending */
  444. /* Could not increase buffer size: error, no retry */
  445. GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
  446. "setsockopt");
  447. return;
  448. }
  449. default:
  450. GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
  451. "send");
  452. return;
  453. }
  454. }
  455. /**
  456. * Signature of functions implementing the sending functionality of a
  457. * message queue.
  458. *
  459. * @param mq the message queue
  460. * @param msg the message to send
  461. * @param impl_state our `struct Queue`
  462. */
  463. static void
  464. mq_send (struct GNUNET_MQ_Handle *mq,
  465. const struct GNUNET_MessageHeader *msg,
  466. void *impl_state)
  467. {
  468. struct Queue *queue = impl_state;
  469. GNUNET_assert (mq == queue->mq);
  470. GNUNET_assert (NULL == queue->msg);
  471. queue->msg = msg;
  472. GNUNET_CONTAINER_DLL_insert (queue_head,
  473. queue_tail,
  474. queue);
  475. GNUNET_assert (NULL != unix_sock);
  476. if (NULL == write_task)
  477. write_task =
  478. GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
  479. unix_sock,
  480. &select_write_cb,
  481. NULL);
  482. }
  483. /**
  484. * Signature of functions implementing the destruction of a message
  485. * queue. Implementations must not free @a mq, but should take care
  486. * of @a impl_state.
  487. *
  488. * @param mq the message queue to destroy
  489. * @param impl_state our `struct Queue`
  490. */
  491. static void
  492. mq_destroy (struct GNUNET_MQ_Handle *mq,
  493. void *impl_state)
  494. {
  495. struct Queue *queue = impl_state;
  496. if (mq == queue->mq)
  497. {
  498. queue->mq = NULL;
  499. queue_destroy (queue);
  500. }
  501. }
  502. /**
  503. * Implementation function that cancels the currently sent message.
  504. *
  505. * @param mq message queue
  506. * @param impl_state our `struct Queue`
  507. */
  508. static void
  509. mq_cancel (struct GNUNET_MQ_Handle *mq,
  510. void *impl_state)
  511. {
  512. struct Queue *queue = impl_state;
  513. GNUNET_assert (NULL != queue->msg);
  514. queue->msg = NULL;
  515. GNUNET_CONTAINER_DLL_remove (queue_head,
  516. queue_tail,
  517. queue);
  518. GNUNET_assert (NULL != write_task);
  519. if (NULL == queue_head)
  520. {
  521. GNUNET_SCHEDULER_cancel (write_task);
  522. write_task = NULL;
  523. }
  524. }
  525. /**
  526. * Generic error handler, called with the appropriate
  527. * error code and the same closure specified at the creation of
  528. * the message queue.
  529. * Not every message queue implementation supports an error handler.
  530. *
  531. * @param cls our `struct Queue`
  532. * @param error error code
  533. */
  534. static void
  535. mq_error (void *cls,
  536. enum GNUNET_MQ_Error error)
  537. {
  538. struct Queue *queue = cls;
  539. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  540. "UNIX MQ error in queue to %s: %d\n",
  541. GNUNET_i2s (&queue->target),
  542. (int) error);
  543. queue_destroy (queue);
  544. }
  545. /**
  546. * Creates a new outbound queue the transport service will use to send
  547. * data to another peer.
  548. *
  549. * @param peer the target peer
  550. * @param cs inbound or outbound queue
  551. * @param un the address
  552. * @param un_len number of bytes in @a un
  553. * @return the queue or NULL of max connections exceeded
  554. */
  555. static struct Queue *
  556. setup_queue (const struct GNUNET_PeerIdentity *target,
  557. enum GNUNET_TRANSPORT_ConnectionStatus cs,
  558. const struct sockaddr_un *un,
  559. socklen_t un_len)
  560. {
  561. struct Queue *queue;
  562. queue = GNUNET_new (struct Queue);
  563. queue->target = *target;
  564. queue->address = GNUNET_memdup (un,
  565. un_len);
  566. queue->address_len = un_len;
  567. (void) GNUNET_CONTAINER_multipeermap_put (queue_map,
  568. &queue->target,
  569. queue,
  570. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
  571. GNUNET_STATISTICS_set (stats,
  572. "# queues active",
  573. GNUNET_CONTAINER_multipeermap_size (queue_map),
  574. GNUNET_NO);
  575. queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
  576. queue->timeout_task
  577. = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
  578. &queue_timeout,
  579. queue);
  580. queue->mq
  581. = GNUNET_MQ_queue_for_callbacks (&mq_send,
  582. &mq_destroy,
  583. &mq_cancel,
  584. queue,
  585. NULL,
  586. &mq_error,
  587. queue);
  588. {
  589. char *foreign_addr;
  590. if ('\0' == un->sun_path[0])
  591. GNUNET_asprintf (&foreign_addr,
  592. "%s-@%s",
  593. COMMUNICATOR_ADDRESS_PREFIX,
  594. &un->sun_path[1]);
  595. else
  596. GNUNET_asprintf (&foreign_addr,
  597. "%s-%s",
  598. COMMUNICATOR_ADDRESS_PREFIX,
  599. un->sun_path);
  600. queue->qh
  601. = GNUNET_TRANSPORT_communicator_mq_add (ch,
  602. &queue->target,
  603. foreign_addr,
  604. UNIX_MTU,
  605. GNUNET_NT_LOOPBACK,
  606. cs,
  607. queue->mq);
  608. GNUNET_free (foreign_addr);
  609. }
  610. return queue;
  611. }
  612. /**
  613. * We have been notified that our socket has something to read. Do the
  614. * read and reschedule this function to be called again once more is
  615. * available.
  616. *
  617. * @param cls NULL
  618. */
  619. static void
  620. select_read_cb (void *cls);
  621. /**
  622. * Function called when message was successfully passed to
  623. * transport service. Continue read activity.
  624. *
  625. * @param cls NULL
  626. * @param success #GNUNET_OK on success
  627. */
  628. static void
  629. receive_complete_cb (void *cls,
  630. int success)
  631. {
  632. delivering_messages--;
  633. if (GNUNET_OK != success)
  634. GNUNET_STATISTICS_update (stats,
  635. "# transport transmission failures",
  636. 1,
  637. GNUNET_NO);
  638. GNUNET_assert (NULL != unix_sock);
  639. if ( (NULL == read_task) &&
  640. (delivering_messages < max_queue_length) )
  641. read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
  642. unix_sock,
  643. &select_read_cb,
  644. NULL);
  645. }
  646. /**
  647. * We have been notified that our socket has something to read. Do the
  648. * read and reschedule this function to be called again once more is
  649. * available.
  650. *
  651. * @param cls NULL
  652. */
  653. static void
  654. select_read_cb (void *cls)
  655. {
  656. char buf[65536] GNUNET_ALIGN;
  657. struct Queue *queue;
  658. const struct UNIXMessage *msg;
  659. struct sockaddr_un un;
  660. socklen_t addrlen;
  661. ssize_t ret;
  662. uint16_t msize;
  663. GNUNET_assert (NULL != unix_sock);
  664. read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
  665. unix_sock,
  666. &select_read_cb,
  667. NULL);
  668. addrlen = sizeof (un);
  669. memset (&un,
  670. 0,
  671. sizeof (un));
  672. ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
  673. buf,
  674. sizeof (buf),
  675. (struct sockaddr *) &un,
  676. &addrlen);
  677. if ( (-1 == ret) &&
  678. ( (EAGAIN == errno) ||
  679. (ENOBUFS == errno) ) )
  680. return;
  681. if (-1 == ret)
  682. {
  683. GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
  684. "recvfrom");
  685. return;
  686. }
  687. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  688. "Read %d bytes from socket %s\n",
  689. (int) ret,
  690. un.sun_path);
  691. GNUNET_assert (AF_UNIX == (un.sun_family));
  692. msg = (struct UNIXMessage *) buf;
  693. msize = ntohs (msg->header.size);
  694. if ( (msize < sizeof (struct UNIXMessage)) ||
  695. (msize > ret) )
  696. {
  697. GNUNET_break_op (0);
  698. return;
  699. }
  700. queue = lookup_queue (&msg->sender,
  701. &un,
  702. addrlen);
  703. if (NULL == queue)
  704. queue = setup_queue (&msg->sender,
  705. GNUNET_TRANSPORT_CS_INBOUND,
  706. &un,
  707. addrlen);
  708. else
  709. reschedule_queue_timeout (queue);
  710. if (NULL == queue)
  711. {
  712. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  713. _("Maximum number of UNIX connections exceeded, dropping incoming message\n"));
  714. return;
  715. }
  716. {
  717. uint16_t offset = 0;
  718. uint16_t tsize = msize - sizeof (struct UNIXMessage);
  719. const char *msgbuf = (const char *) &msg[1];
  720. while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
  721. {
  722. const struct GNUNET_MessageHeader *currhdr;
  723. struct GNUNET_MessageHeader al_hdr;
  724. uint16_t csize;
  725. currhdr = (const struct GNUNET_MessageHeader *) &msgbuf[offset];
  726. /* ensure aligned access */
  727. memcpy (&al_hdr,
  728. currhdr,
  729. sizeof (al_hdr));
  730. csize = ntohs (al_hdr.size);
  731. if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
  732. (csize > tsize - offset))
  733. {
  734. GNUNET_break_op (0);
  735. break;
  736. }
  737. ret = GNUNET_TRANSPORT_communicator_receive (ch,
  738. &msg->sender,
  739. currhdr,
  740. &receive_complete_cb,
  741. NULL);
  742. if (GNUNET_SYSERR == ret)
  743. return; /* transport not up */
  744. if (GNUNET_NO == ret)
  745. break;
  746. delivering_messages++;
  747. offset += csize;
  748. }
  749. }
  750. if (delivering_messages >= max_queue_length)
  751. {
  752. /* we should try to apply 'back pressure' */
  753. GNUNET_SCHEDULER_cancel (read_task);
  754. read_task = NULL;
  755. }
  756. }
  757. /**
  758. * Function called by the transport service to initialize a
  759. * message queue given address information about another peer.
  760. * If and when the communication channel is established, the
  761. * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
  762. * to notify the service that the channel is now up. It is
  763. * the responsibility of the communicator to manage sane
  764. * retries and timeouts for any @a peer/@a address combination
  765. * provided by the transport service. Timeouts and retries
  766. * do not need to be signalled to the transport service.
  767. *
  768. * @param cls closure
  769. * @param peer identity of the other peer
  770. * @param address where to send the message, human-readable
  771. * communicator-specific format, 0-terminated, UTF-8
  772. * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid
  773. */
  774. static int
  775. mq_init (void *cls,
  776. const struct GNUNET_PeerIdentity *peer,
  777. const char *address)
  778. {
  779. struct Queue *queue;
  780. const char *path;
  781. struct sockaddr_un *un;
  782. socklen_t un_len;
  783. if (0 != strncmp (address,
  784. COMMUNICATOR_ADDRESS_PREFIX "-",
  785. strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
  786. {
  787. GNUNET_break_op (0);
  788. return GNUNET_SYSERR;
  789. }
  790. path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
  791. un = unix_address_to_sockaddr (path,
  792. &un_len);
  793. queue = lookup_queue (peer,
  794. un,
  795. un_len);
  796. if (NULL != queue)
  797. {
  798. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  799. "Address `%s' for %s ignored, queue exists\n",
  800. path,
  801. GNUNET_i2s (peer));
  802. GNUNET_free (un);
  803. return GNUNET_OK;
  804. }
  805. queue = setup_queue (peer,
  806. GNUNET_TRANSPORT_CS_OUTBOUND,
  807. un,
  808. un_len);
  809. GNUNET_free (un);
  810. if (NULL == queue)
  811. {
  812. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  813. "Failed to setup queue to %s at `%s'\n",
  814. GNUNET_i2s (peer),
  815. path);
  816. return GNUNET_NO;
  817. }
  818. return GNUNET_OK;
  819. }
  820. /**
  821. * Iterator over all message queues to clean up.
  822. *
  823. * @param cls NULL
  824. * @param target unused
  825. * @param value the queue to destroy
  826. * @return #GNUNET_OK to continue to iterate
  827. */
  828. static int
  829. get_queue_delete_it (void *cls,
  830. const struct GNUNET_PeerIdentity *target,
  831. void *value)
  832. {
  833. struct Queue *queue = value;
  834. (void) cls;
  835. (void) target;
  836. queue_destroy (queue);
  837. return GNUNET_OK;
  838. }
  839. /**
  840. * Shutdown the UNIX communicator.
  841. *
  842. * @param cls NULL (always)
  843. */
  844. static void
  845. do_shutdown (void *cls)
  846. {
  847. if (NULL != read_task)
  848. {
  849. GNUNET_SCHEDULER_cancel (read_task);
  850. read_task = NULL;
  851. }
  852. if (NULL != write_task)
  853. {
  854. GNUNET_SCHEDULER_cancel (write_task);
  855. write_task = NULL;
  856. }
  857. if (NULL != unix_sock)
  858. {
  859. GNUNET_break (GNUNET_OK ==
  860. GNUNET_NETWORK_socket_close (unix_sock));
  861. unix_sock = NULL;
  862. }
  863. GNUNET_CONTAINER_multipeermap_iterate (queue_map,
  864. &get_queue_delete_it,
  865. NULL);
  866. GNUNET_CONTAINER_multipeermap_destroy (queue_map);
  867. if (NULL != ai)
  868. {
  869. GNUNET_TRANSPORT_communicator_address_remove (ai);
  870. ai = NULL;
  871. }
  872. if (NULL != ch)
  873. {
  874. GNUNET_TRANSPORT_communicator_disconnect (ch);
  875. ch = NULL;
  876. }
  877. if (NULL != stats)
  878. {
  879. GNUNET_STATISTICS_destroy (stats,
  880. GNUNET_NO);
  881. stats = NULL;
  882. }
  883. }
  884. /**
  885. * Function called when the transport service has received an
  886. * acknowledgement for this communicator (!) via a different return
  887. * path.
  888. *
  889. * Not applicable for UNIX.
  890. *
  891. * @param cls closure
  892. * @param sender which peer sent the notification
  893. * @param msg payload
  894. */
  895. static void
  896. enc_notify_cb (void *cls,
  897. const struct GNUNET_PeerIdentity *sender,
  898. const struct GNUNET_MessageHeader *msg)
  899. {
  900. (void) cls;
  901. (void) sender;
  902. (void) msg;
  903. GNUNET_break_op (0);
  904. }
  905. /**
  906. * Setup communicator and launch network interactions.
  907. *
  908. * @param cls NULL (always)
  909. * @param args remaining command-line arguments
  910. * @param cfgfile name of the configuration file used (for saving, can be NULL!)
  911. * @param cfg configuration
  912. */
  913. static void
  914. run (void *cls,
  915. char *const *args,
  916. const char *cfgfile,
  917. const struct GNUNET_CONFIGURATION_Handle *cfg)
  918. {
  919. char *unix_socket_path;
  920. struct sockaddr_un *un;
  921. socklen_t un_len;
  922. char *my_addr;
  923. (void) cls;
  924. if (GNUNET_OK !=
  925. GNUNET_CONFIGURATION_get_value_filename (cfg,
  926. COMMUNICATOR_CONFIG_SECTION,
  927. "UNIXPATH",
  928. &unix_socket_path))
  929. {
  930. GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
  931. COMMUNICATOR_CONFIG_SECTION,
  932. "UNIXPATH");
  933. return;
  934. }
  935. if (GNUNET_OK !=
  936. GNUNET_CONFIGURATION_get_value_number (cfg,
  937. COMMUNICATOR_CONFIG_SECTION,
  938. "MAX_QUEUE_LENGTH",
  939. &max_queue_length))
  940. max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
  941. un = unix_address_to_sockaddr (unix_socket_path,
  942. &un_len);
  943. if (NULL == un)
  944. {
  945. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  946. "Failed to setup UNIX domain socket address with path `%s'\n",
  947. unix_socket_path);
  948. GNUNET_free (unix_socket_path);
  949. return;
  950. }
  951. unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX,
  952. SOCK_DGRAM,
  953. 0);
  954. if (NULL == unix_sock)
  955. {
  956. GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
  957. "socket");
  958. GNUNET_free (un);
  959. GNUNET_free (unix_socket_path);
  960. return;
  961. }
  962. if ( ('\0' != un->sun_path[0]) &&
  963. (GNUNET_OK !=
  964. GNUNET_DISK_directory_create_for_file (un->sun_path)) )
  965. {
  966. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  967. _("Cannot create path to `%s'\n"),
  968. un->sun_path);
  969. GNUNET_NETWORK_socket_close (unix_sock);
  970. unix_sock = NULL;
  971. GNUNET_free (un);
  972. GNUNET_free (unix_socket_path);
  973. return;
  974. }
  975. if (GNUNET_OK !=
  976. GNUNET_NETWORK_socket_bind (unix_sock,
  977. (const struct sockaddr *) un,
  978. un_len))
  979. {
  980. GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
  981. "bind",
  982. un->sun_path);
  983. GNUNET_NETWORK_socket_close (unix_sock);
  984. unix_sock = NULL;
  985. GNUNET_free (un);
  986. GNUNET_free (unix_socket_path);
  987. return;
  988. }
  989. GNUNET_free (un);
  990. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  991. "Bound to `%s'\n",
  992. unix_socket_path);
  993. stats = GNUNET_STATISTICS_create ("C-UNIX",
  994. cfg);
  995. GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
  996. NULL);
  997. read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
  998. unix_sock,
  999. &select_read_cb,
  1000. NULL);
  1001. queue_map = GNUNET_CONTAINER_multipeermap_create (10,
  1002. GNUNET_NO);
  1003. ch = GNUNET_TRANSPORT_communicator_connect (cfg,
  1004. COMMUNICATOR_CONFIG_SECTION,
  1005. COMMUNICATOR_ADDRESS_PREFIX,
  1006. GNUNET_TRANSPORT_CC_RELIABLE,
  1007. &mq_init,
  1008. NULL,
  1009. &enc_notify_cb,
  1010. NULL);
  1011. if (NULL == ch)
  1012. {
  1013. GNUNET_break (0);
  1014. GNUNET_SCHEDULER_shutdown ();
  1015. GNUNET_free (unix_socket_path);
  1016. return;
  1017. }
  1018. GNUNET_asprintf (&my_addr,
  1019. "%s-%s",
  1020. COMMUNICATOR_ADDRESS_PREFIX,
  1021. unix_socket_path);
  1022. GNUNET_free (unix_socket_path);
  1023. ai = GNUNET_TRANSPORT_communicator_address_add (ch,
  1024. my_addr,
  1025. GNUNET_NT_LOOPBACK,
  1026. GNUNET_TIME_UNIT_FOREVER_REL);
  1027. GNUNET_free (my_addr);
  1028. }
  1029. /**
  1030. * The main function for the UNIX communicator.
  1031. *
  1032. * @param argc number of arguments from the command line
  1033. * @param argv command line arguments
  1034. * @return 0 ok, 1 on error
  1035. */
  1036. int
  1037. main (int argc,
  1038. char *const *argv)
  1039. {
  1040. static const struct GNUNET_GETOPT_CommandLineOption options[] = {
  1041. GNUNET_GETOPT_OPTION_END
  1042. };
  1043. int ret;
  1044. if (GNUNET_OK !=
  1045. GNUNET_STRINGS_get_utf8_args (argc, argv,
  1046. &argc, &argv))
  1047. return 2;
  1048. ret =
  1049. (GNUNET_OK ==
  1050. GNUNET_PROGRAM_run (argc, argv,
  1051. "gnunet-communicator-unix",
  1052. _("GNUnet UNIX domain socket communicator"),
  1053. options,
  1054. &run,
  1055. NULL)) ? 0 : 1;
  1056. GNUNET_free ((void*) argv);
  1057. return ret;
  1058. }
  1059. #if defined(LINUX) && defined(__GLIBC__)
  1060. #include <malloc.h>
  1061. /**
  1062. * MINIMIZE heap size (way below 128k) since this process doesn't need much.
  1063. */
  1064. void __attribute__ ((constructor))
  1065. GNUNET_ARM_memory_init ()
  1066. {
  1067. mallopt (M_TRIM_THRESHOLD, 4 * 1024);
  1068. mallopt (M_TOP_PAD, 1 * 1024);
  1069. malloc_trim (0);
  1070. }
  1071. #endif
  1072. /* end of gnunet-communicator-unix.c */