gnunet-communicator-unix.c 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167
  1. /*
  2. This file is part of GNUnet
  3. Copyright (C) 2010-2014, 2018 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file transport/gnunet-communicator-unix.c
  18. * @brief Transport plugin using unix domain sockets (!)
  19. * Clearly, can only be used locally on Unix/Linux hosts...
  20. * ONLY INTENDED FOR TESTING!!!
  21. * @author Christian Grothoff
  22. * @author Nathan Evans
  23. */
  24. #include "platform.h"
  25. #include "gnunet_util_lib.h"
  26. #include "gnunet_protocols.h"
  27. #include "gnunet_constants.h"
  28. #include "gnunet_nt_lib.h"
  29. #include "gnunet_statistics_service.h"
  30. #include "gnunet_transport_communication_service.h"
  31. /**
  32. * How many messages do we keep at most in the queue to the
  33. * transport service before we start to drop (default,
  34. * can be changed via the configuration file).
  35. * Should be _below_ the level of the communicator API, as
  36. * otherwise we may read messages just to have them dropped
  37. * by the communicator API.
  38. */
  39. #define DEFAULT_MAX_QUEUE_LENGTH 8000
  40. /**
  41. * Address prefix used by the communicator.
  42. */
  43. #define COMMUNICATOR_ADDRESS_PREFIX "unix"
  44. /**
  45. * Configuration section used by the communicator.
  46. */
  47. #define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
  48. /**
  49. * Our MTU.
  50. */
  51. #ifndef DARWIN
  52. #define UNIX_MTU UINT16_MAX
  53. #else
  54. #define UNIX_MTU 2048
  55. #endif
  56. GNUNET_NETWORK_STRUCT_BEGIN
  57. /**
  58. * UNIX Message-Packet header.
  59. */
  60. struct UNIXMessage
  61. {
  62. /**
  63. * Message header.
  64. */
  65. struct GNUNET_MessageHeader header;
  66. /**
  67. * What is the identity of the sender (GNUNET_hash of public key)
  68. */
  69. struct GNUNET_PeerIdentity sender;
  70. };
  71. GNUNET_NETWORK_STRUCT_END
  72. /**
  73. * Handle for a queue.
  74. */
  75. struct Queue
  76. {
  77. /**
  78. * Queues with pending messages (!) are kept in a DLL.
  79. */
  80. struct Queue *next;
  81. /**
  82. * Queues with pending messages (!) are kept in a DLL.
  83. */
  84. struct Queue *prev;
  85. /**
  86. * To whom are we talking to.
  87. */
  88. struct GNUNET_PeerIdentity target;
  89. /**
  90. * Address of the other peer.
  91. */
  92. struct sockaddr_un *address;
  93. /**
  94. * Length of the address.
  95. */
  96. socklen_t address_len;
  97. /**
  98. * Message currently scheduled for transmission, non-NULL if and only
  99. * if this queue is in the #queue_head DLL.
  100. */
  101. struct UNIXMessage *msg;
  102. /**
  103. * Message queue we are providing for the #ch.
  104. */
  105. struct GNUNET_MQ_Handle *mq;
  106. /**
  107. * handle for this queue with the #ch.
  108. */
  109. struct GNUNET_TRANSPORT_QueueHandle *qh;
  110. /**
  111. * Number of bytes we currently have in our write queue.
  112. */
  113. unsigned long long bytes_in_queue;
  114. /**
  115. * Timeout for this queue.
  116. */
  117. struct GNUNET_TIME_Absolute timeout;
  118. /**
  119. * Queue timeout task.
  120. */
  121. struct GNUNET_SCHEDULER_Task *timeout_task;
  122. };
  123. /**
  124. * My Peer Identity
  125. */
  126. static struct GNUNET_PeerIdentity my_identity;
  127. /**
  128. * ID of read task
  129. */
  130. static struct GNUNET_SCHEDULER_Task *read_task;
  131. /**
  132. * ID of write task
  133. */
  134. static struct GNUNET_SCHEDULER_Task *write_task;
  135. /**
  136. * Number of messages we currently have in our queues towards the transport service.
  137. */
  138. static unsigned long long delivering_messages;
  139. /**
  140. * Maximum queue length before we stop reading towards the transport service.
  141. */
  142. static unsigned long long max_queue_length;
  143. /**
  144. * For logging statistics.
  145. */
  146. static struct GNUNET_STATISTICS_Handle *stats;
  147. /**
  148. * Our environment.
  149. */
  150. static struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
  151. /**
  152. * Queues (map from peer identity to `struct Queue`)
  153. */
  154. static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
  155. /**
  156. * Head of queue of messages to transmit.
  157. */
  158. static struct Queue *queue_head;
  159. /**
  160. * Tail of queue of messages to transmit.
  161. */
  162. static struct Queue *queue_tail;
  163. /**
  164. * socket that we transmit all data with
  165. */
  166. static struct GNUNET_NETWORK_Handle *unix_sock;
  167. /**
  168. * Handle to the operation that publishes our address.
  169. */
  170. static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
  171. /**
  172. * Functions with this signature are called whenever we need
  173. * to close a queue due to a disconnect or failure to
  174. * establish a connection.
  175. *
  176. * @param queue queue to close down
  177. */
  178. static void
  179. queue_destroy (struct Queue *queue)
  180. {
  181. struct GNUNET_MQ_Handle *mq;
  182. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  183. "Disconnecting queue for peer `%s'\n",
  184. GNUNET_i2s (&queue->target));
  185. if (0 != queue->bytes_in_queue)
  186. {
  187. GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
  188. queue->bytes_in_queue = 0;
  189. }
  190. if (NULL != (mq = queue->mq))
  191. {
  192. queue->mq = NULL;
  193. GNUNET_MQ_destroy (mq);
  194. }
  195. GNUNET_assert (
  196. GNUNET_YES ==
  197. GNUNET_CONTAINER_multipeermap_remove (queue_map, &queue->target, queue));
  198. GNUNET_STATISTICS_set (stats,
  199. "# queues active",
  200. GNUNET_CONTAINER_multipeermap_size (queue_map),
  201. GNUNET_NO);
  202. if (NULL != queue->timeout_task)
  203. {
  204. GNUNET_SCHEDULER_cancel (queue->timeout_task);
  205. queue->timeout_task = NULL;
  206. }
  207. GNUNET_free (queue->address);
  208. GNUNET_free (queue);
  209. }
  210. /**
  211. * Queue was idle for too long, so disconnect it
  212. *
  213. * @param cls the `struct Queue *` to disconnect
  214. */
  215. static void
  216. queue_timeout (void *cls)
  217. {
  218. struct Queue *queue = cls;
  219. struct GNUNET_TIME_Relative left;
  220. queue->timeout_task = NULL;
  221. left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
  222. if (0 != left.rel_value_us)
  223. {
  224. /* not actually our turn yet, but let's at least update
  225. the monitor, it may think we're about to die ... */
  226. queue->timeout_task =
  227. GNUNET_SCHEDULER_add_delayed (left, &queue_timeout, queue);
  228. return;
  229. }
  230. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  231. "Queue %p was idle for %s, disconnecting\n",
  232. queue,
  233. GNUNET_STRINGS_relative_time_to_string (
  234. GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
  235. GNUNET_YES));
  236. queue_destroy (queue);
  237. }
  238. /**
  239. * Increment queue timeout due to activity. We do not immediately
  240. * notify the monitor here as that might generate excessive
  241. * signalling.
  242. *
  243. * @param queue queue for which the timeout should be rescheduled
  244. */
  245. static void
  246. reschedule_queue_timeout (struct Queue *queue)
  247. {
  248. GNUNET_assert (NULL != queue->timeout_task);
  249. queue->timeout =
  250. GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
  251. }
  252. /**
  253. * Convert unix path to a `struct sockaddr_un *`
  254. *
  255. * @param unixpath path to convert
  256. * @param[out] sock_len set to the length of the address
  257. * @param is_abstract is this an abstract @a unixpath
  258. * @return converted unix path
  259. */
  260. static struct sockaddr_un *
  261. unix_address_to_sockaddr (const char *unixpath, socklen_t *sock_len)
  262. {
  263. struct sockaddr_un *un;
  264. size_t slen;
  265. GNUNET_assert (0 < strlen (unixpath)); /* sanity check */
  266. un = GNUNET_new (struct sockaddr_un);
  267. un->sun_family = AF_UNIX;
  268. slen = strlen (unixpath);
  269. if (slen >= sizeof(un->sun_path))
  270. slen = sizeof(un->sun_path) - 1;
  271. GNUNET_memcpy (un->sun_path, unixpath, slen);
  272. un->sun_path[slen] = '\0';
  273. slen = sizeof(struct sockaddr_un);
  274. #if HAVE_SOCKADDR_UN_SUN_LEN
  275. un->sun_len = (u_char) slen;
  276. #endif
  277. (*sock_len) = slen;
  278. if ('@' == un->sun_path[0])
  279. un->sun_path[0] = '\0';
  280. return un;
  281. }
  282. /**
  283. * Closure to #lookup_queue_it().
  284. */
  285. struct LookupCtx
  286. {
  287. /**
  288. * Location to store the queue, if found.
  289. */
  290. struct Queue *res;
  291. /**
  292. * Address we are looking for.
  293. */
  294. const struct sockaddr_un *un;
  295. /**
  296. * Number of bytes in @a un
  297. */
  298. socklen_t un_len;
  299. };
  300. /**
  301. * Function called to find a queue by address.
  302. *
  303. * @param cls the `struct LookupCtx *`
  304. * @param key peer we are looking for (unused)
  305. * @param value a queue
  306. * @return #GNUNET_YES if not found (continue looking), #GNUNET_NO on success
  307. */
  308. static int
  309. lookup_queue_it (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
  310. {
  311. struct LookupCtx *lctx = cls;
  312. struct Queue *queue = value;
  313. if ((queue->address_len == lctx->un_len) &&
  314. (0 == memcmp (lctx->un, queue->address, queue->address_len)))
  315. {
  316. lctx->res = queue;
  317. return GNUNET_NO;
  318. }
  319. return GNUNET_YES;
  320. }
  321. /**
  322. * Find an existing queue by address.
  323. *
  324. * @param plugin the plugin
  325. * @param address the address to find
  326. * @return NULL if queue was not found
  327. */
  328. static struct Queue *
  329. lookup_queue (const struct GNUNET_PeerIdentity *peer,
  330. const struct sockaddr_un *un,
  331. socklen_t un_len)
  332. {
  333. struct LookupCtx lctx;
  334. lctx.un = un;
  335. lctx.un_len = un_len;
  336. lctx.res = NULL;
  337. GNUNET_CONTAINER_multipeermap_get_multiple (queue_map,
  338. peer,
  339. &lookup_queue_it,
  340. &lctx);
  341. return lctx.res;
  342. }
  343. /**
  344. * We have been notified that our socket is ready to write.
  345. * Then reschedule this function to be called again once more is available.
  346. *
  347. * @param cls NULL
  348. */
  349. static void
  350. select_write_cb (void *cls)
  351. {
  352. struct Queue *queue = queue_tail;
  353. const struct GNUNET_MessageHeader *msg = &queue->msg->header;
  354. size_t msg_size = ntohs (msg->size);
  355. ssize_t sent;
  356. /* take queue of the ready list */
  357. write_task = NULL;
  358. resend:
  359. /* Send the data */
  360. sent = GNUNET_NETWORK_socket_sendto (unix_sock,
  361. msg,
  362. msg_size,
  363. (const struct sockaddr *) queue->address,
  364. queue->address_len);
  365. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  366. "UNIX transmitted message to %s (%d/%u: %s)\n",
  367. GNUNET_i2s (&queue->target),
  368. (int) sent,
  369. (unsigned int) msg_size,
  370. (sent < 0) ? strerror (errno) : "ok");
  371. if (-1 != sent)
  372. {
  373. GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
  374. if (NULL != queue_head)
  375. write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
  376. unix_sock,
  377. &select_write_cb,
  378. NULL);
  379. /* send 'msg' */
  380. GNUNET_free (queue->msg);
  381. queue->msg = NULL;
  382. GNUNET_MQ_impl_send_continue (queue->mq);
  383. GNUNET_STATISTICS_update (stats,
  384. "# bytes sent",
  385. (long long) sent,
  386. GNUNET_NO);
  387. reschedule_queue_timeout (queue);
  388. return; /* all good */
  389. }
  390. GNUNET_STATISTICS_update (stats,
  391. "# network transmission failures",
  392. 1,
  393. GNUNET_NO);
  394. write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
  395. unix_sock,
  396. &select_write_cb,
  397. NULL);
  398. switch (errno)
  399. {
  400. case EAGAIN:
  401. case ENOBUFS:
  402. /* We should retry later... */
  403. GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, "send");
  404. return;
  405. case EMSGSIZE: {
  406. socklen_t size = 0;
  407. socklen_t len = sizeof(size);
  408. GNUNET_NETWORK_socket_getsockopt (unix_sock,
  409. SOL_SOCKET,
  410. SO_SNDBUF,
  411. &size,
  412. &len);
  413. if (size > ntohs (msg->size))
  414. {
  415. /* Buffer is bigger than message: error, no retry
  416. * This should never happen!*/
  417. GNUNET_break (0);
  418. return;
  419. }
  420. GNUNET_log (
  421. GNUNET_ERROR_TYPE_WARNING,
  422. "Trying to increase socket buffer size from %u to %u for message size %u\n",
  423. (unsigned int) size,
  424. (unsigned int) ((msg_size / 1000) + 2) * 1000,
  425. (unsigned int) msg_size);
  426. size = ((msg_size / 1000) + 2) * 1000;
  427. if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt (unix_sock,
  428. SOL_SOCKET,
  429. SO_SNDBUF,
  430. &size,
  431. sizeof(size)))
  432. goto resend; /* Increased buffer size, retry sending */
  433. /* Ok, then just try very modest increase */
  434. size = msg_size;
  435. if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt (unix_sock,
  436. SOL_SOCKET,
  437. SO_SNDBUF,
  438. &size,
  439. sizeof(size)))
  440. goto resend; /* Increased buffer size, retry sending */
  441. /* Could not increase buffer size: error, no retry */
  442. GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "setsockopt");
  443. return;
  444. }
  445. default:
  446. GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "send");
  447. return;
  448. }
  449. }
  450. /**
  451. * Signature of functions implementing the sending functionality of a
  452. * message queue.
  453. *
  454. * @param mq the message queue
  455. * @param msg the message to send
  456. * @param impl_state our `struct Queue`
  457. */
  458. static void
  459. mq_send (struct GNUNET_MQ_Handle *mq,
  460. const struct GNUNET_MessageHeader *msg,
  461. void *impl_state)
  462. {
  463. struct Queue *queue = impl_state;
  464. size_t msize = ntohs (msg->size);
  465. GNUNET_assert (mq == queue->mq);
  466. GNUNET_assert (NULL == queue->msg);
  467. // Convert to UNIXMessage
  468. queue->msg = GNUNET_malloc (msize + sizeof (struct UNIXMessage));
  469. queue->msg->header.size = htons (msize + sizeof (struct UNIXMessage));
  470. queue->msg->sender = my_identity;
  471. memcpy (&queue->msg[1], msg, msize);
  472. GNUNET_CONTAINER_DLL_insert (queue_head, queue_tail, queue);
  473. GNUNET_assert (NULL != unix_sock);
  474. if (NULL == write_task)
  475. write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
  476. unix_sock,
  477. &select_write_cb,
  478. NULL);
  479. }
  480. /**
  481. * Signature of functions implementing the destruction of a message
  482. * queue. Implementations must not free @a mq, but should take care
  483. * of @a impl_state.
  484. *
  485. * @param mq the message queue to destroy
  486. * @param impl_state our `struct Queue`
  487. */
  488. static void
  489. mq_destroy (struct GNUNET_MQ_Handle *mq, void *impl_state)
  490. {
  491. struct Queue *queue = impl_state;
  492. if (mq == queue->mq)
  493. {
  494. queue->mq = NULL;
  495. queue_destroy (queue);
  496. }
  497. }
  498. /**
  499. * Implementation function that cancels the currently sent message.
  500. *
  501. * @param mq message queue
  502. * @param impl_state our `struct Queue`
  503. */
  504. static void
  505. mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state)
  506. {
  507. struct Queue *queue = impl_state;
  508. GNUNET_assert (NULL != queue->msg);
  509. queue->msg = NULL;
  510. GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue);
  511. GNUNET_assert (NULL != write_task);
  512. if (NULL == queue_head)
  513. {
  514. GNUNET_SCHEDULER_cancel (write_task);
  515. write_task = NULL;
  516. }
  517. }
  518. /**
  519. * Generic error handler, called with the appropriate
  520. * error code and the same closure specified at the creation of
  521. * the message queue.
  522. * Not every message queue implementation supports an error handler.
  523. *
  524. * @param cls our `struct Queue`
  525. * @param error error code
  526. */
  527. static void
  528. mq_error (void *cls, enum GNUNET_MQ_Error error)
  529. {
  530. struct Queue *queue = cls;
  531. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  532. "UNIX MQ error in queue to %s: %d\n",
  533. GNUNET_i2s (&queue->target),
  534. (int) error);
  535. queue_destroy (queue);
  536. }
  537. /**
  538. * Creates a new outbound queue the transport service will use to send
  539. * data to another peer.
  540. *
  541. * @param peer the target peer
  542. * @param cs inbound or outbound queue
  543. * @param un the address
  544. * @param un_len number of bytes in @a un
  545. * @return the queue or NULL of max connections exceeded
  546. */
  547. static struct Queue *
  548. setup_queue (const struct GNUNET_PeerIdentity *target,
  549. enum GNUNET_TRANSPORT_ConnectionStatus cs,
  550. const struct sockaddr_un *un,
  551. socklen_t un_len)
  552. {
  553. struct Queue *queue;
  554. queue = GNUNET_new (struct Queue);
  555. queue->target = *target;
  556. queue->address = GNUNET_memdup (un, un_len);
  557. queue->address_len = un_len;
  558. (void) GNUNET_CONTAINER_multipeermap_put (
  559. queue_map,
  560. &queue->target,
  561. queue,
  562. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
  563. GNUNET_STATISTICS_set (stats,
  564. "# queues active",
  565. GNUNET_CONTAINER_multipeermap_size (queue_map),
  566. GNUNET_NO);
  567. queue->timeout =
  568. GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
  569. queue->timeout_task =
  570. GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
  571. &queue_timeout,
  572. queue);
  573. queue->mq = GNUNET_MQ_queue_for_callbacks (&mq_send,
  574. &mq_destroy,
  575. &mq_cancel,
  576. queue,
  577. NULL,
  578. &mq_error,
  579. queue);
  580. {
  581. char *foreign_addr;
  582. if ('\0' == un->sun_path[0])
  583. GNUNET_asprintf (&foreign_addr,
  584. "%s-@%s",
  585. COMMUNICATOR_ADDRESS_PREFIX,
  586. &un->sun_path[1]);
  587. else
  588. GNUNET_asprintf (&foreign_addr,
  589. "%s-%s",
  590. COMMUNICATOR_ADDRESS_PREFIX,
  591. un->sun_path);
  592. queue->qh = GNUNET_TRANSPORT_communicator_mq_add (ch,
  593. &queue->target,
  594. foreign_addr,
  595. UNIX_MTU - sizeof (struct
  596. UNIXMessage),
  597. GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED,
  598. 0,
  599. GNUNET_NT_LOOPBACK,
  600. cs,
  601. queue->mq);
  602. GNUNET_free (foreign_addr);
  603. }
  604. return queue;
  605. }
  606. /**
  607. * We have been notified that our socket has something to read. Do the
  608. * read and reschedule this function to be called again once more is
  609. * available.
  610. *
  611. * @param cls NULL
  612. */
  613. static void
  614. select_read_cb (void *cls);
  615. /**
  616. * Function called when message was successfully passed to
  617. * transport service. Continue read activity.
  618. *
  619. * @param cls NULL
  620. * @param success #GNUNET_OK on success
  621. */
  622. static void
  623. receive_complete_cb (void *cls, int success)
  624. {
  625. (void) cls;
  626. delivering_messages--;
  627. if (GNUNET_OK != success)
  628. GNUNET_STATISTICS_update (stats,
  629. "# transport transmission failures",
  630. 1,
  631. GNUNET_NO);
  632. if ((NULL == read_task) && (delivering_messages < max_queue_length) &&
  633. (NULL != unix_sock))
  634. read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
  635. unix_sock,
  636. &select_read_cb,
  637. NULL);
  638. }
  639. /**
  640. * We have been notified that our socket has something to read. Do the
  641. * read and reschedule this function to be called again once more is
  642. * available.
  643. *
  644. * @param cls NULL
  645. */
  646. static void
  647. select_read_cb (void *cls)
  648. {
  649. char buf[65536] GNUNET_ALIGN;
  650. struct Queue *queue;
  651. const struct UNIXMessage *msg;
  652. struct sockaddr_un un;
  653. socklen_t addrlen;
  654. ssize_t ret;
  655. uint16_t msize;
  656. GNUNET_assert (NULL != unix_sock);
  657. read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
  658. unix_sock,
  659. &select_read_cb,
  660. NULL);
  661. addrlen = sizeof(un);
  662. memset (&un, 0, sizeof(un));
  663. ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
  664. buf,
  665. sizeof(buf),
  666. (struct sockaddr *) &un,
  667. &addrlen);
  668. if ((-1 == ret) && ((EAGAIN == errno) || (ENOBUFS == errno)))
  669. return;
  670. if (-1 == ret)
  671. {
  672. GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "recvfrom");
  673. return;
  674. }
  675. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  676. "Read %d bytes from socket %s\n",
  677. (int) ret,
  678. un.sun_path);
  679. GNUNET_assert (AF_UNIX == (un.sun_family));
  680. msg = (struct UNIXMessage *) buf;
  681. msize = ntohs (msg->header.size);
  682. if ((msize < sizeof(struct UNIXMessage)) || (msize > ret))
  683. {
  684. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  685. "Wrong message size: %d bytes\n",
  686. msize);
  687. GNUNET_break_op (0);
  688. return;
  689. }
  690. queue = lookup_queue (&msg->sender, &un, addrlen);
  691. if (NULL == queue)
  692. queue =
  693. setup_queue (&msg->sender, GNUNET_TRANSPORT_CS_INBOUND, &un, addrlen);
  694. else
  695. reschedule_queue_timeout (queue);
  696. if (NULL == queue)
  697. {
  698. GNUNET_log (
  699. GNUNET_ERROR_TYPE_ERROR,
  700. _ (
  701. "Maximum number of UNIX connections exceeded, dropping incoming message\n"));
  702. return;
  703. }
  704. {
  705. uint16_t tsize = msize - sizeof(struct UNIXMessage);
  706. const struct GNUNET_MessageHeader *currhdr;
  707. struct GNUNET_MessageHeader al_hdr;
  708. currhdr = (const struct GNUNET_MessageHeader *) &msg[1];
  709. /* ensure aligned access */
  710. memcpy (&al_hdr, currhdr, sizeof(al_hdr));
  711. if ((tsize < sizeof(struct GNUNET_MessageHeader)) ||
  712. (tsize != ntohs (al_hdr.size)))
  713. {
  714. GNUNET_break_op (0);
  715. return;
  716. }
  717. ret = GNUNET_TRANSPORT_communicator_receive (ch,
  718. &msg->sender,
  719. currhdr,
  720. GNUNET_TIME_UNIT_FOREVER_REL,
  721. &receive_complete_cb,
  722. NULL);
  723. if (GNUNET_SYSERR == ret)
  724. {
  725. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  726. "Transport not up!\n");
  727. return; /* transport not up */
  728. }
  729. if (GNUNET_NO == ret)
  730. {
  731. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  732. "Error sending message to transport\n");
  733. return;
  734. }
  735. delivering_messages++;
  736. }
  737. if (delivering_messages >= max_queue_length)
  738. {
  739. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  740. "Back pressure %llu\n", delivering_messages);
  741. /* we should try to apply 'back pressure' */
  742. GNUNET_SCHEDULER_cancel (read_task);
  743. read_task = NULL;
  744. }
  745. }
  746. /**
  747. * Function called by the transport service to initialize a
  748. * message queue given address information about another peer.
  749. * If and when the communication channel is established, the
  750. * communicator must call #GNUNET_TRANSPORT_communicator_mq_add()
  751. * to notify the service that the channel is now up. It is
  752. * the responsibility of the communicator to manage sane
  753. * retries and timeouts for any @a peer/@a address combination
  754. * provided by the transport service. Timeouts and retries
  755. * do not need to be signalled to the transport service.
  756. *
  757. * @param cls closure
  758. * @param peer identity of the other peer
  759. * @param address where to send the message, human-readable
  760. * communicator-specific format, 0-terminated, UTF-8
  761. * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid
  762. */
  763. static int
  764. mq_init (void *cls, const struct GNUNET_PeerIdentity *peer, const char *address)
  765. {
  766. struct Queue *queue;
  767. const char *path;
  768. struct sockaddr_un *un;
  769. socklen_t un_len;
  770. (void) cls;
  771. if (0 != strncmp (address,
  772. COMMUNICATOR_ADDRESS_PREFIX "-",
  773. strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
  774. {
  775. GNUNET_break_op (0);
  776. return GNUNET_SYSERR;
  777. }
  778. path = &address[strlen (COMMUNICATOR_ADDRESS_PREFIX "-")];
  779. un = unix_address_to_sockaddr (path, &un_len);
  780. queue = lookup_queue (peer, un, un_len);
  781. if (NULL != queue)
  782. {
  783. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  784. "Address `%s' for %s ignored, queue exists\n",
  785. path,
  786. GNUNET_i2s (peer));
  787. GNUNET_free (un);
  788. return GNUNET_OK;
  789. }
  790. queue = setup_queue (peer, GNUNET_TRANSPORT_CS_OUTBOUND, un, un_len);
  791. GNUNET_free (un);
  792. if (NULL == queue)
  793. {
  794. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  795. "Failed to setup queue to %s at `%s'\n",
  796. GNUNET_i2s (peer),
  797. path);
  798. return GNUNET_NO;
  799. }
  800. return GNUNET_OK;
  801. }
  802. /**
  803. * Iterator over all message queues to clean up.
  804. *
  805. * @param cls NULL
  806. * @param target unused
  807. * @param value the queue to destroy
  808. * @return #GNUNET_OK to continue to iterate
  809. */
  810. static int
  811. get_queue_delete_it (void *cls,
  812. const struct GNUNET_PeerIdentity *target,
  813. void *value)
  814. {
  815. struct Queue *queue = value;
  816. (void) cls;
  817. (void) target;
  818. queue_destroy (queue);
  819. return GNUNET_OK;
  820. }
  821. /**
  822. * Shutdown the UNIX communicator.
  823. *
  824. * @param cls NULL (always)
  825. */
  826. static void
  827. do_shutdown (void *cls)
  828. {
  829. if (NULL != read_task)
  830. {
  831. GNUNET_SCHEDULER_cancel (read_task);
  832. read_task = NULL;
  833. }
  834. if (NULL != write_task)
  835. {
  836. GNUNET_SCHEDULER_cancel (write_task);
  837. write_task = NULL;
  838. }
  839. if (NULL != unix_sock)
  840. {
  841. GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (unix_sock));
  842. unix_sock = NULL;
  843. }
  844. GNUNET_CONTAINER_multipeermap_iterate (queue_map, &get_queue_delete_it, NULL);
  845. GNUNET_CONTAINER_multipeermap_destroy (queue_map);
  846. if (NULL != ai)
  847. {
  848. GNUNET_TRANSPORT_communicator_address_remove (ai);
  849. ai = NULL;
  850. }
  851. if (NULL != ch)
  852. {
  853. GNUNET_TRANSPORT_communicator_disconnect (ch);
  854. ch = NULL;
  855. }
  856. if (NULL != stats)
  857. {
  858. GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
  859. stats = NULL;
  860. }
  861. }
  862. /**
  863. * Function called when the transport service has received an
  864. * acknowledgement for this communicator (!) via a different return
  865. * path.
  866. *
  867. * Not applicable for UNIX.
  868. *
  869. * @param cls closure
  870. * @param sender which peer sent the notification
  871. * @param msg payload
  872. */
  873. static void
  874. enc_notify_cb (void *cls,
  875. const struct GNUNET_PeerIdentity *sender,
  876. const struct GNUNET_MessageHeader *msg)
  877. {
  878. (void) cls;
  879. (void) sender;
  880. (void) msg;
  881. GNUNET_break_op (0);
  882. }
  883. /**
  884. * Setup communicator and launch network interactions.
  885. *
  886. * @param cls NULL (always)
  887. * @param args remaining command-line arguments
  888. * @param cfgfile name of the configuration file used (for saving, can be NULL!)
  889. * @param cfg configuration
  890. */
  891. static void
  892. run (void *cls,
  893. char *const *args,
  894. const char *cfgfile,
  895. const struct GNUNET_CONFIGURATION_Handle *cfg)
  896. {
  897. char *unix_socket_path;
  898. struct sockaddr_un *un;
  899. socklen_t un_len;
  900. char *my_addr;
  901. struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key;
  902. (void) cls;
  903. delivering_messages = 0;
  904. my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg);
  905. if (NULL == my_private_key)
  906. {
  907. GNUNET_log (
  908. GNUNET_ERROR_TYPE_ERROR,
  909. _ (
  910. "UNIX communicator is lacking key configuration settings. Exiting.\n"));
  911. GNUNET_SCHEDULER_shutdown ();
  912. return;
  913. }
  914. GNUNET_CRYPTO_eddsa_key_get_public (my_private_key, &my_identity.public_key);
  915. if (GNUNET_OK !=
  916. GNUNET_CONFIGURATION_get_value_filename (cfg,
  917. COMMUNICATOR_CONFIG_SECTION,
  918. "UNIXPATH",
  919. &unix_socket_path))
  920. {
  921. GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
  922. COMMUNICATOR_CONFIG_SECTION,
  923. "UNIXPATH");
  924. return;
  925. }
  926. if (GNUNET_OK !=
  927. GNUNET_CONFIGURATION_get_value_number (cfg,
  928. COMMUNICATOR_CONFIG_SECTION,
  929. "MAX_QUEUE_LENGTH",
  930. &max_queue_length))
  931. max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
  932. un = unix_address_to_sockaddr (unix_socket_path, &un_len);
  933. if (NULL == un)
  934. {
  935. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  936. "Failed to setup UNIX domain socket address with path `%s'\n",
  937. unix_socket_path);
  938. GNUNET_free (unix_socket_path);
  939. return;
  940. }
  941. unix_sock = GNUNET_NETWORK_socket_create (AF_UNIX, SOCK_DGRAM, 0);
  942. if (NULL == unix_sock)
  943. {
  944. GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "socket");
  945. GNUNET_free (un);
  946. GNUNET_free (unix_socket_path);
  947. return;
  948. }
  949. if (('\0' != un->sun_path[0]) &&
  950. (GNUNET_OK != GNUNET_DISK_directory_create_for_file (un->sun_path)))
  951. {
  952. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  953. _ ("Cannot create path to `%s'\n"),
  954. un->sun_path);
  955. GNUNET_NETWORK_socket_close (unix_sock);
  956. unix_sock = NULL;
  957. GNUNET_free (un);
  958. GNUNET_free (unix_socket_path);
  959. return;
  960. }
  961. if (GNUNET_OK != GNUNET_NETWORK_socket_bind (unix_sock,
  962. (const struct sockaddr *) un,
  963. un_len))
  964. {
  965. GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR, "bind", un->sun_path);
  966. GNUNET_NETWORK_socket_close (unix_sock);
  967. unix_sock = NULL;
  968. GNUNET_free (un);
  969. GNUNET_free (unix_socket_path);
  970. return;
  971. }
  972. GNUNET_free (un);
  973. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Bound to `%s'\n", unix_socket_path);
  974. stats = GNUNET_STATISTICS_create ("C-UNIX", cfg);
  975. GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
  976. read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
  977. unix_sock,
  978. &select_read_cb,
  979. NULL);
  980. queue_map = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
  981. ch = GNUNET_TRANSPORT_communicator_connect (cfg,
  982. COMMUNICATOR_CONFIG_SECTION,
  983. COMMUNICATOR_ADDRESS_PREFIX,
  984. GNUNET_TRANSPORT_CC_RELIABLE,
  985. &mq_init,
  986. NULL,
  987. &enc_notify_cb,
  988. NULL);
  989. if (NULL == ch)
  990. {
  991. GNUNET_break (0);
  992. GNUNET_SCHEDULER_shutdown ();
  993. GNUNET_free (unix_socket_path);
  994. return;
  995. }
  996. GNUNET_asprintf (&my_addr,
  997. "%s-%s",
  998. COMMUNICATOR_ADDRESS_PREFIX,
  999. unix_socket_path);
  1000. GNUNET_free (unix_socket_path);
  1001. ai = GNUNET_TRANSPORT_communicator_address_add (ch,
  1002. my_addr,
  1003. GNUNET_NT_LOOPBACK,
  1004. GNUNET_TIME_UNIT_FOREVER_REL);
  1005. GNUNET_free (my_addr);
  1006. }
  1007. /**
  1008. * The main function for the UNIX communicator.
  1009. *
  1010. * @param argc number of arguments from the command line
  1011. * @param argv command line arguments
  1012. * @return 0 ok, 1 on error
  1013. */
  1014. int
  1015. main (int argc, char *const *argv)
  1016. {
  1017. static const struct GNUNET_GETOPT_CommandLineOption options[] = {
  1018. GNUNET_GETOPT_OPTION_END
  1019. };
  1020. int ret;
  1021. if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv))
  1022. return 2;
  1023. ret = (GNUNET_OK ==
  1024. GNUNET_PROGRAM_run (argc,
  1025. argv,
  1026. "gnunet-communicator-unix",
  1027. _ ("GNUnet UNIX domain socket communicator"),
  1028. options,
  1029. &run,
  1030. NULL))
  1031. ? 0
  1032. : 1;
  1033. GNUNET_free_nz ((void *) argv);
  1034. return ret;
  1035. }
  1036. #if defined(__linux__) && defined(__GLIBC__)
  1037. #include <malloc.h>
  1038. /**
  1039. * MINIMIZE heap size (way below 128k) since this process doesn't need much.
  1040. */
  1041. void __attribute__ ((constructor))
  1042. GNUNET_ARM_memory_init ()
  1043. {
  1044. mallopt (M_TRIM_THRESHOLD, 4 * 1024);
  1045. mallopt (M_TOP_PAD, 1 * 1024);
  1046. malloc_trim (0);
  1047. }
  1048. #endif
  1049. /* end of gnunet-communicator-unix.c */