transport_api2_communication.c 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2018 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file transport/transport_api2_communication.c
  18. * @brief implementation of the gnunet_transport_communication_service.h API
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet_util_lib.h"
  23. #include "gnunet_protocols.h"
  24. #include "gnunet_transport_communication_service.h"
  25. #include "gnunet_ats_transport_service.h"
  26. #include "transport.h"
  27. /**
  28. * How many messages do we keep at most in the queue to the
  29. * transport service before we start to drop (default,
  30. * can be changed via the configuration file).
  31. */
  32. #define DEFAULT_MAX_QUEUE_LENGTH 16
  33. /**
  34. * Information we track per packet to enable flow control.
  35. */
  36. struct FlowControl
  37. {
  38. /**
  39. * Kept in a DLL.
  40. */
  41. struct FlowControl *next;
  42. /**
  43. * Kept in a DLL.
  44. */
  45. struct FlowControl *prev;
  46. /**
  47. * Function to call once the message was processed.
  48. */
  49. GNUNET_TRANSPORT_MessageCompletedCallback cb;
  50. /**
  51. * Closure for @e cb
  52. */
  53. void *cb_cls;
  54. /**
  55. * Which peer is this about?
  56. */
  57. struct GNUNET_PeerIdentity sender;
  58. /**
  59. * More-or-less unique ID for the message.
  60. */
  61. uint64_t id;
  62. };
  63. /**
  64. * Information we track per message to tell the transport about
  65. * success or failures.
  66. */
  67. struct AckPending
  68. {
  69. /**
  70. * Kept in a DLL.
  71. */
  72. struct AckPending *next;
  73. /**
  74. * Kept in a DLL.
  75. */
  76. struct AckPending *prev;
  77. /**
  78. * Communicator this entry belongs to.
  79. */
  80. struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
  81. /**
  82. * Which peer is this about?
  83. */
  84. struct GNUNET_PeerIdentity receiver;
  85. /**
  86. * More-or-less unique ID for the message.
  87. */
  88. uint64_t mid;
  89. };
  90. /**
  91. * Opaque handle to the transport service for communicators.
  92. */
  93. struct GNUNET_TRANSPORT_CommunicatorHandle
  94. {
  95. /**
  96. * Head of DLL of addresses this communicator offers to the transport service.
  97. */
  98. struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
  99. /**
  100. * Tail of DLL of addresses this communicator offers to the transport service.
  101. */
  102. struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
  103. /**
  104. * DLL of messages awaiting flow control confirmation (ack).
  105. */
  106. struct FlowControl *fc_head;
  107. /**
  108. * DLL of messages awaiting flow control confirmation (ack).
  109. */
  110. struct FlowControl *fc_tail;
  111. /**
  112. * DLL of messages awaiting transmission confirmation (ack).
  113. */
  114. struct AckPending *ap_head;
  115. /**
  116. * DLL of messages awaiting transmission confirmation (ack).
  117. */
  118. struct AckPending *ap_tail;
  119. /**
  120. * DLL of queues we offer.
  121. */
  122. struct GNUNET_TRANSPORT_QueueHandle *queue_head;
  123. /**
  124. * DLL of queues we offer.
  125. */
  126. struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
  127. /**
  128. * Our configuration.
  129. */
  130. const struct GNUNET_CONFIGURATION_Handle *cfg;
  131. /**
  132. * Config section to use.
  133. */
  134. const char *config_section;
  135. /**
  136. * Address prefix to use.
  137. */
  138. const char *addr_prefix;
  139. /**
  140. * Function to call when the transport service wants us to initiate
  141. * a communication channel with another peer.
  142. */
  143. GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
  144. /**
  145. * Closure for @e mq_init.
  146. */
  147. void *mq_init_cls;
  148. /**
  149. * Function to call when the transport service receives messages
  150. * for a communicator (i.e. for NAT traversal or for non-bidirectional
  151. * communicators).
  152. */
  153. GNUNET_TRANSPORT_CommunicatorNotify notify_cb;
  154. /**
  155. * Closure for @e notify_Cb.
  156. */
  157. void *notify_cb_cls;
  158. /**
  159. * Queue to talk to the transport service.
  160. */
  161. struct GNUNET_MQ_Handle *mq;
  162. /**
  163. * Maximum permissable queue length.
  164. */
  165. unsigned long long max_queue_length;
  166. /**
  167. * Flow-control identifier generator.
  168. */
  169. uint64_t fc_gen;
  170. /**
  171. * Internal UUID for the address used in communication with the
  172. * transport service.
  173. */
  174. uint32_t aid_gen;
  175. /**
  176. * Queue identifier generator.
  177. */
  178. uint32_t queue_gen;
  179. /**
  180. * Characteristics of the communicator.
  181. */
  182. enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
  183. };
  184. /**
  185. * Handle returned to identify the internal data structure the transport
  186. * API has created to manage a message queue to a particular peer.
  187. */
  188. struct GNUNET_TRANSPORT_QueueHandle
  189. {
  190. /**
  191. * Kept in a DLL.
  192. */
  193. struct GNUNET_TRANSPORT_QueueHandle *next;
  194. /**
  195. * Kept in a DLL.
  196. */
  197. struct GNUNET_TRANSPORT_QueueHandle *prev;
  198. /**
  199. * Handle this queue belongs to.
  200. */
  201. struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
  202. /**
  203. * Address used by the communication queue.
  204. */
  205. char *address;
  206. /**
  207. * The queue itself.
  208. */
  209. struct GNUNET_MQ_Handle *mq;
  210. /**
  211. * Which peer we can communciate with.
  212. */
  213. struct GNUNET_PeerIdentity peer;
  214. /**
  215. * Network type of the communciation queue.
  216. */
  217. enum GNUNET_NetworkType nt;
  218. /**
  219. * Communication status of the queue.
  220. */
  221. enum GNUNET_TRANSPORT_ConnectionStatus cs;
  222. /**
  223. * ID for this queue when talking to the transport service.
  224. */
  225. uint32_t queue_id;
  226. /**
  227. * Maximum transmission unit for the queue.
  228. */
  229. uint32_t mtu;
  230. };
  231. /**
  232. * Internal representation of an address a communicator is
  233. * currently providing for the transport service.
  234. */
  235. struct GNUNET_TRANSPORT_AddressIdentifier
  236. {
  237. /**
  238. * Kept in a DLL.
  239. */
  240. struct GNUNET_TRANSPORT_AddressIdentifier *next;
  241. /**
  242. * Kept in a DLL.
  243. */
  244. struct GNUNET_TRANSPORT_AddressIdentifier *prev;
  245. /**
  246. * Transport handle where the address was added.
  247. */
  248. struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
  249. /**
  250. * The actual address.
  251. */
  252. char *address;
  253. /**
  254. * When does the address expire? (Expected lifetime of the
  255. * address.)
  256. */
  257. struct GNUNET_TIME_Relative expiration;
  258. /**
  259. * Internal UUID for the address used in communication with the
  260. * transport service.
  261. */
  262. uint32_t aid;
  263. /**
  264. * Network type for the address.
  265. */
  266. enum GNUNET_NetworkType nt;
  267. };
  268. /**
  269. * (re)connect our communicator to the transport service
  270. *
  271. * @param ch handle to reconnect
  272. */
  273. static void
  274. reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
  275. /**
  276. * Send message to the transport service about address @a ai
  277. * being now available.
  278. *
  279. * @param ai address to add
  280. */
  281. static void
  282. send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
  283. {
  284. struct GNUNET_MQ_Envelope *env;
  285. struct GNUNET_TRANSPORT_AddAddressMessage *aam;
  286. if (NULL == ai->ch->mq)
  287. return;
  288. env = GNUNET_MQ_msg_extra (aam,
  289. strlen (ai->address) + 1,
  290. GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
  291. aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
  292. aam->nt = htonl ((uint32_t) ai->nt);
  293. memcpy (&aam[1],
  294. ai->address,
  295. strlen (ai->address) + 1);
  296. GNUNET_MQ_send (ai->ch->mq,
  297. env);
  298. }
  299. /**
  300. * Send message to the transport service about address @a ai
  301. * being no longer available.
  302. *
  303. * @param ai address to delete
  304. */
  305. static void
  306. send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
  307. {
  308. struct GNUNET_MQ_Envelope *env;
  309. struct GNUNET_TRANSPORT_DelAddressMessage *dam;
  310. if (NULL == ai->ch->mq)
  311. return;
  312. env = GNUNET_MQ_msg (dam,
  313. GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
  314. dam->aid = htonl (ai->aid);
  315. GNUNET_MQ_send (ai->ch->mq,
  316. env);
  317. }
  318. /**
  319. * Send message to the transport service about queue @a qh
  320. * being now available.
  321. *
  322. * @param qh queue to add
  323. */
  324. static void
  325. send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
  326. {
  327. struct GNUNET_MQ_Envelope *env;
  328. struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
  329. if (NULL == qh->ch->mq)
  330. return;
  331. env = GNUNET_MQ_msg_extra (aqm,
  332. strlen (qh->address) + 1,
  333. GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
  334. aqm->qid = htonl (qh->queue_id);
  335. aqm->receiver = qh->peer;
  336. aqm->nt = htonl ((uint32_t) qh->nt);
  337. aqm->mtu = htonl (qh->mtu);
  338. aqm->cs = htonl ((uint32_t) qh->cs);
  339. memcpy (&aqm[1],
  340. qh->address,
  341. strlen (qh->address) + 1);
  342. GNUNET_MQ_send (qh->ch->mq,
  343. env);
  344. }
  345. /**
  346. * Send message to the transport service about queue @a qh
  347. * being no longer available.
  348. *
  349. * @param qh queue to delete
  350. */
  351. static void
  352. send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
  353. {
  354. struct GNUNET_MQ_Envelope *env;
  355. struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
  356. if (NULL == qh->ch->mq)
  357. return;
  358. env = GNUNET_MQ_msg (dqm,
  359. GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
  360. dqm->qid = htonl (qh->queue_id);
  361. dqm->receiver = qh->peer;
  362. GNUNET_MQ_send (qh->ch->mq,
  363. env);
  364. }
  365. /**
  366. * Disconnect from the transport service. Purges
  367. * all flow control entries as we will no longer receive
  368. * the ACKs. Purges the ack pending entries as the
  369. * transport will no longer expect the confirmations.
  370. *
  371. * @param ch service to disconnect from
  372. */
  373. static void
  374. disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
  375. {
  376. struct FlowControl *fcn;
  377. struct AckPending *apn;
  378. for (struct FlowControl *fc = ch->fc_head;
  379. NULL != fc;
  380. fc = fcn)
  381. {
  382. fcn = fc->next;
  383. GNUNET_CONTAINER_DLL_remove (ch->fc_head,
  384. ch->fc_tail,
  385. fc);
  386. fc->cb (fc->cb_cls,
  387. GNUNET_SYSERR);
  388. GNUNET_free (fc);
  389. }
  390. for (struct AckPending *ap = ch->ap_head;
  391. NULL != ap;
  392. ap = apn)
  393. {
  394. apn = ap->next;
  395. GNUNET_CONTAINER_DLL_remove (ch->ap_head,
  396. ch->ap_tail,
  397. ap);
  398. GNUNET_free (ap);
  399. }
  400. if (NULL == ch->mq)
  401. return;
  402. GNUNET_MQ_destroy (ch->mq);
  403. ch->mq = NULL;
  404. }
  405. /**
  406. * Function called on MQ errors.
  407. */
  408. static void
  409. error_handler (void *cls,
  410. enum GNUNET_MQ_Error error)
  411. {
  412. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
  413. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  414. "MQ failure %d, reconnecting to transport service.\n",
  415. error);
  416. disconnect (ch);
  417. /* TODO: maybe do this with exponential backoff/delay */
  418. reconnect (ch);
  419. }
  420. /**
  421. * Transport service acknowledged a message we gave it
  422. * (with flow control enabled). Tell the communicator.
  423. *
  424. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  425. * @param incoming_ack the ack
  426. */
  427. static void
  428. handle_incoming_ack (void *cls,
  429. const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
  430. {
  431. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
  432. for (struct FlowControl *fc = ch->fc_head;
  433. NULL != fc;
  434. fc = fc->next)
  435. {
  436. if ( (fc->id == incoming_ack->fc_id) &&
  437. (0 == memcmp (&fc->sender,
  438. &incoming_ack->sender,
  439. sizeof (struct GNUNET_PeerIdentity))) )
  440. {
  441. GNUNET_CONTAINER_DLL_remove (ch->fc_head,
  442. ch->fc_tail,
  443. fc);
  444. fc->cb (fc->cb_cls,
  445. GNUNET_OK);
  446. GNUNET_free (fc);
  447. return;
  448. }
  449. }
  450. GNUNET_break (0);
  451. disconnect (ch);
  452. /* TODO: maybe do this with exponential backoff/delay */
  453. reconnect (ch);
  454. }
  455. /**
  456. * Transport service wants us to create a queue. Check if @a cq
  457. * is well-formed.
  458. *
  459. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  460. * @param cq the queue creation request
  461. * @return #GNUNET_OK if @a smt is well-formed
  462. */
  463. static int
  464. check_create_queue (void *cls,
  465. const struct GNUNET_TRANSPORT_CreateQueue *cq)
  466. {
  467. uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
  468. const char *addr = (const char *) &cq[1];
  469. (void) cls;
  470. if ( (0 == len) ||
  471. ('\0' != addr[len-1]) )
  472. {
  473. GNUNET_break (0);
  474. return GNUNET_SYSERR;
  475. }
  476. return GNUNET_OK;
  477. }
  478. /**
  479. * Transport service wants us to create a queue. Tell the communicator.
  480. *
  481. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  482. * @param cq the queue creation request
  483. */
  484. static void
  485. handle_create_queue (void *cls,
  486. const struct GNUNET_TRANSPORT_CreateQueue *cq)
  487. {
  488. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
  489. const char *addr = (const char *) &cq[1];
  490. struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
  491. struct GNUNET_MQ_Envelope *env;
  492. if (GNUNET_OK !=
  493. ch->mq_init (ch->mq_init_cls,
  494. &cq->receiver,
  495. addr))
  496. {
  497. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  498. "Address `%s' invalid for this communicator\n",
  499. addr);
  500. env = GNUNET_MQ_msg (cqr,
  501. GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
  502. }
  503. else
  504. {
  505. env = GNUNET_MQ_msg (cqr,
  506. GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
  507. }
  508. cqr->request_id = cq->request_id;
  509. GNUNET_MQ_send (ch->mq,
  510. env);
  511. }
  512. /**
  513. * Transport service wants us to send a message. Check if @a smt
  514. * is well-formed.
  515. *
  516. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  517. * @param smt the transmission request
  518. * @return #GNUNET_OK if @a smt is well-formed
  519. */
  520. static int
  521. check_send_msg (void *cls,
  522. const struct GNUNET_TRANSPORT_SendMessageTo *smt)
  523. {
  524. (void) cls;
  525. GNUNET_MQ_check_boxed_message (smt);
  526. return GNUNET_OK;
  527. }
  528. /**
  529. * Notify transport service about @a status of a message with
  530. * @a mid sent to @a receiver.
  531. *
  532. * @param ch handle
  533. * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
  534. * @param receiver which peer was the receiver
  535. * @param mid message that the ack is about
  536. */
  537. static void
  538. send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
  539. int status,
  540. const struct GNUNET_PeerIdentity *receiver,
  541. uint64_t mid)
  542. {
  543. struct GNUNET_MQ_Envelope *env;
  544. struct GNUNET_TRANSPORT_SendMessageToAck *ack;
  545. env = GNUNET_MQ_msg (ack,
  546. GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
  547. ack->status = htonl (status);
  548. ack->mid = mid;
  549. ack->receiver = *receiver;
  550. GNUNET_MQ_send (ch->mq,
  551. env);
  552. }
  553. /**
  554. * Message queue transmission by communicator was successful,
  555. * notify transport service.
  556. *
  557. * @param cls an `struct AckPending *`
  558. */
  559. static void
  560. send_ack_cb (void *cls)
  561. {
  562. struct AckPending *ap = cls;
  563. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
  564. GNUNET_CONTAINER_DLL_remove (ch->ap_head,
  565. ch->ap_tail,
  566. ap);
  567. send_ack (ch,
  568. GNUNET_OK,
  569. &ap->receiver,
  570. ap->mid);
  571. GNUNET_free (ap);
  572. }
  573. /**
  574. * Transport service wants us to send a message. Tell the communicator.
  575. *
  576. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  577. * @param smt the transmission request
  578. */
  579. static void
  580. handle_send_msg (void *cls,
  581. const struct GNUNET_TRANSPORT_SendMessageTo *smt)
  582. {
  583. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
  584. const struct GNUNET_MessageHeader *mh;
  585. struct GNUNET_MQ_Envelope *env;
  586. struct AckPending *ap;
  587. struct GNUNET_TRANSPORT_QueueHandle *qh;
  588. for (qh = ch->queue_head;NULL != qh; qh = qh->next)
  589. if ( (qh->queue_id == smt->qid) &&
  590. (0 == memcmp (&qh->peer,
  591. &smt->receiver,
  592. sizeof (struct GNUNET_PeerIdentity))) )
  593. break;
  594. if (NULL == qh)
  595. {
  596. /* queue is already gone, tell transport this one failed */
  597. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  598. "Transmission failed, queue no longer exists.\n");
  599. send_ack (ch,
  600. GNUNET_NO,
  601. &smt->receiver,
  602. smt->mid);
  603. return;
  604. }
  605. ap = GNUNET_new (struct AckPending);
  606. ap->ch = ch;
  607. ap->receiver = smt->receiver;
  608. ap->mid = smt->mid;
  609. GNUNET_CONTAINER_DLL_insert (ch->ap_head,
  610. ch->ap_tail,
  611. ap);
  612. mh = (const struct GNUNET_MessageHeader *) &smt[1];
  613. env = GNUNET_MQ_msg_copy (mh);
  614. GNUNET_MQ_notify_sent (env,
  615. &send_ack_cb,
  616. ap);
  617. GNUNET_MQ_send (qh->mq,
  618. env);
  619. }
  620. /**
  621. * Transport service gives us backchannel message. Check if @a bi
  622. * is well-formed.
  623. *
  624. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  625. * @param bi the backchannel message
  626. * @return #GNUNET_OK if @a smt is well-formed
  627. */
  628. static int
  629. check_backchannel_incoming (void *cls,
  630. const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
  631. {
  632. (void) cls;
  633. GNUNET_MQ_check_boxed_message (bi);
  634. return GNUNET_OK;
  635. }
  636. /**
  637. * Transport service gives us backchannel message. Handle it.
  638. *
  639. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  640. * @param bi the backchannel message
  641. */
  642. static void
  643. handle_backchannel_incoming (void *cls,
  644. const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
  645. {
  646. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
  647. if (NULL != ch->notify_cb)
  648. ch->notify_cb (ch->notify_cb_cls,
  649. &bi->pid,
  650. (const struct GNUNET_MessageHeader *) &bi[1]);
  651. else
  652. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  653. _("Dropped backchanel message: handler not provided by communicator\n"));
  654. }
  655. /**
  656. * (re)connect our communicator to the transport service
  657. *
  658. * @param ch handle to reconnect
  659. */
  660. static void
  661. reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
  662. {
  663. struct GNUNET_MQ_MessageHandler handlers[] = {
  664. GNUNET_MQ_hd_fixed_size (incoming_ack,
  665. GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
  666. struct GNUNET_TRANSPORT_IncomingMessageAck,
  667. ch),
  668. GNUNET_MQ_hd_var_size (create_queue,
  669. GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
  670. struct GNUNET_TRANSPORT_CreateQueue,
  671. ch),
  672. GNUNET_MQ_hd_var_size (send_msg,
  673. GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
  674. struct GNUNET_TRANSPORT_SendMessageTo,
  675. ch),
  676. GNUNET_MQ_hd_var_size (backchannel_incoming,
  677. GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING,
  678. struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming,
  679. ch),
  680. GNUNET_MQ_handler_end()
  681. };
  682. struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
  683. struct GNUNET_MQ_Envelope *env;
  684. ch->mq = GNUNET_CLIENT_connect (ch->cfg,
  685. "transport",
  686. handlers,
  687. &error_handler,
  688. ch);
  689. if (NULL == ch->mq)
  690. return;
  691. env = GNUNET_MQ_msg_extra (cam,
  692. strlen (ch->addr_prefix) + 1,
  693. GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
  694. cam->cc = htonl ((uint32_t) ch->cc);
  695. memcpy (&cam[1],
  696. ch->addr_prefix,
  697. strlen (ch->addr_prefix) + 1);
  698. GNUNET_MQ_send (ch->mq,
  699. env);
  700. for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
  701. NULL != ai;
  702. ai = ai->next)
  703. send_add_address (ai);
  704. for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
  705. NULL != qh;
  706. qh = qh->next)
  707. send_add_queue (qh);
  708. }
  709. /**
  710. * Connect to the transport service.
  711. *
  712. * @param cfg configuration to use
  713. * @param config_section section of the configuration to use for options
  714. * @param addr_prefix address prefix for addresses supported by this
  715. * communicator, could be NULL for incoming-only communicators
  716. * @param cc what characteristics does the communicator have?
  717. * @param mtu maximum message size supported by communicator, 0 if
  718. * sending is not supported, SIZE_MAX for no MTU
  719. * @param mq_init function to call to initialize a message queue given
  720. * the address of another peer, can be NULL if the
  721. * communicator only supports receiving messages
  722. * @param mq_init_cls closure for @a mq_init
  723. * @param notify_cb function to pass backchannel messages to communicator
  724. * @param notify_cb_cls closure for @a notify_cb
  725. * @return NULL on error
  726. */
  727. struct GNUNET_TRANSPORT_CommunicatorHandle *
  728. GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
  729. const char *config_section,
  730. const char *addr_prefix,
  731. enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc,
  732. GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
  733. void *mq_init_cls,
  734. GNUNET_TRANSPORT_CommunicatorNotify notify_cb,
  735. void *notify_cb_cls)
  736. {
  737. struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
  738. ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
  739. ch->cfg = cfg;
  740. ch->config_section = config_section;
  741. ch->addr_prefix = addr_prefix;
  742. ch->mq_init = mq_init;
  743. ch->mq_init_cls = mq_init_cls;
  744. ch->notify_cb = notify_cb;
  745. ch->notify_cb_cls = notify_cb_cls;
  746. ch->cc = cc;
  747. reconnect (ch);
  748. if (GNUNET_OK !=
  749. GNUNET_CONFIGURATION_get_value_number (cfg,
  750. config_section,
  751. "MAX_QUEUE_LENGTH",
  752. &ch->max_queue_length))
  753. ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
  754. if (NULL == ch->mq)
  755. {
  756. GNUNET_free (ch);
  757. return NULL;
  758. }
  759. return ch;
  760. }
  761. /**
  762. * Disconnect from the transport service.
  763. *
  764. * @param ch handle returned from connect
  765. */
  766. void
  767. GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
  768. {
  769. disconnect (ch);
  770. while (NULL != ch->ai_head)
  771. {
  772. GNUNET_break (0); /* communicator forgot to remove address, warn! */
  773. GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
  774. }
  775. GNUNET_free (ch);
  776. }
  777. /* ************************* Receiving *************************** */
  778. /**
  779. * Notify transport service that the communicator has received
  780. * a message.
  781. *
  782. * @param ch connection to transport service
  783. * @param sender presumed sender of the message (details to be checked
  784. * by higher layers)
  785. * @param msg the message
  786. * @param cb function to call once handling the message is done, NULL if
  787. * flow control is not supported by this communicator
  788. * @param cb_cls closure for @a cb
  789. * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
  790. * immediately dropped due to memory limitations (communicator
  791. * should try to apply back pressure),
  792. * #GNUNET_SYSERR if the message could not be delivered because
  793. * the tranport service is not yet up
  794. */
  795. int
  796. GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
  797. const struct GNUNET_PeerIdentity *sender,
  798. const struct GNUNET_MessageHeader *msg,
  799. GNUNET_TRANSPORT_MessageCompletedCallback cb,
  800. void *cb_cls)
  801. {
  802. struct GNUNET_MQ_Envelope *env;
  803. struct GNUNET_TRANSPORT_IncomingMessage *im;
  804. uint16_t msize;
  805. if (NULL == ch->mq)
  806. return GNUNET_SYSERR;
  807. if ( (NULL == cb) &&
  808. (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) )
  809. {
  810. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  811. "Dropping message: transprot is too slow, queue length %llu exceeded\n",
  812. ch->max_queue_length);
  813. return GNUNET_NO;
  814. }
  815. msize = ntohs (msg->size);
  816. env = GNUNET_MQ_msg_extra (im,
  817. msize,
  818. GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
  819. if (NULL == env)
  820. {
  821. GNUNET_break (0);
  822. return GNUNET_SYSERR;
  823. }
  824. im->sender = *sender;
  825. memcpy (&im[1],
  826. msg,
  827. msize);
  828. if (NULL != cb)
  829. {
  830. struct FlowControl *fc;
  831. im->fc_on = htonl (GNUNET_YES);
  832. im->fc_id = ch->fc_gen++;
  833. fc = GNUNET_new (struct FlowControl);
  834. fc->sender = *sender;
  835. fc->id = im->fc_id;
  836. fc->cb = cb;
  837. fc->cb_cls = cb_cls;
  838. GNUNET_CONTAINER_DLL_insert (ch->fc_head,
  839. ch->fc_tail,
  840. fc);
  841. }
  842. GNUNET_MQ_send (ch->mq,
  843. env);
  844. return GNUNET_OK;
  845. }
  846. /* ************************* Discovery *************************** */
  847. /**
  848. * Notify transport service that an MQ became available due to an
  849. * "inbound" connection or because the communicator discovered the
  850. * presence of another peer.
  851. *
  852. * @param ch connection to transport service
  853. * @param peer peer with which we can now communicate
  854. * @param address address in human-readable format, 0-terminated, UTF-8
  855. * @param mtu maximum message size supported by queue, 0 if
  856. * sending is not supported, SIZE_MAX for no MTU
  857. * @param nt which network type does the @a address belong to?
  858. * @param cc what characteristics does the communicator have?
  859. * @param cs what is the connection status of the queue?
  860. * @param mq message queue of the @a peer
  861. * @return API handle identifying the new MQ
  862. */
  863. struct GNUNET_TRANSPORT_QueueHandle *
  864. GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
  865. const struct GNUNET_PeerIdentity *peer,
  866. const char *address,
  867. uint32_t mtu,
  868. enum GNUNET_NetworkType nt,
  869. enum GNUNET_TRANSPORT_ConnectionStatus cs,
  870. struct GNUNET_MQ_Handle *mq)
  871. {
  872. struct GNUNET_TRANSPORT_QueueHandle *qh;
  873. qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
  874. qh->ch = ch;
  875. qh->peer = *peer;
  876. qh->address = GNUNET_strdup (address);
  877. qh->nt = nt;
  878. qh->mtu = mtu;
  879. qh->cs = cs;
  880. qh->mq = mq;
  881. qh->queue_id = ch->queue_gen++;
  882. GNUNET_CONTAINER_DLL_insert (ch->queue_head,
  883. ch->queue_tail,
  884. qh);
  885. send_add_queue (qh);
  886. return qh;
  887. }
  888. /**
  889. * Notify transport service that an MQ became unavailable due to a
  890. * disconnect or timeout.
  891. *
  892. * @param qh handle for the queue that must be invalidated
  893. */
  894. void
  895. GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
  896. {
  897. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
  898. send_del_queue (qh);
  899. GNUNET_CONTAINER_DLL_remove (ch->queue_head,
  900. ch->queue_tail,
  901. qh);
  902. GNUNET_MQ_destroy (qh->mq);
  903. GNUNET_free (qh->address);
  904. GNUNET_free (qh);
  905. }
  906. /**
  907. * Notify transport service about an address that this communicator
  908. * provides for this peer.
  909. *
  910. * @param ch connection to transport service
  911. * @param address our address in human-readable format, 0-terminated, UTF-8
  912. * @param nt which network type does the address belong to?
  913. * @param expiration when does the communicator forsee this address expiring?
  914. */
  915. struct GNUNET_TRANSPORT_AddressIdentifier *
  916. GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
  917. const char *address,
  918. enum GNUNET_NetworkType nt,
  919. struct GNUNET_TIME_Relative expiration)
  920. {
  921. struct GNUNET_TRANSPORT_AddressIdentifier *ai;
  922. ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
  923. ai->ch = ch;
  924. ai->address = GNUNET_strdup (address);
  925. ai->nt = nt;
  926. ai->expiration = expiration;
  927. ai->aid = ch->aid_gen++;
  928. GNUNET_CONTAINER_DLL_insert (ch->ai_head,
  929. ch->ai_tail,
  930. ai);
  931. send_add_address (ai);
  932. return ai;
  933. }
  934. /**
  935. * Notify transport service about an address that this communicator no
  936. * longer provides for this peer.
  937. *
  938. * @param ai address that is no longer provided
  939. */
  940. void
  941. GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
  942. {
  943. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
  944. send_del_address (ai);
  945. GNUNET_CONTAINER_DLL_remove (ch->ai_head,
  946. ch->ai_tail,
  947. ai);
  948. GNUNET_free (ai->address);
  949. GNUNET_free (ai);
  950. }
  951. /* ************************* Backchannel *************************** */
  952. /**
  953. * The communicator asks the transport service to route a message via
  954. * a different path to another communicator service at another peer.
  955. * This must only be done for special control traffic (as there is no
  956. * flow control for this API), such as acknowledgements, and generally
  957. * only be done if the communicator is uni-directional (i.e. cannot
  958. * send the message back itself).
  959. *
  960. * @param ch handle of this communicator
  961. * @param pid peer to send the message to
  962. * @param comm name of the communicator to send the message to
  963. * @param header header of the message to transmit and pass via the
  964. * notify-API to @a pid's communicator @a comm
  965. */
  966. void
  967. GNUNET_TRANSPORT_communicator_notify (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
  968. const struct GNUNET_PeerIdentity *pid,
  969. const char *comm,
  970. const struct GNUNET_MessageHeader *header)
  971. {
  972. struct GNUNET_MQ_Envelope *env;
  973. struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
  974. size_t slen = strlen (comm) + 1;
  975. uint16_t mlen = ntohs (header->size);
  976. GNUNET_assert (mlen + slen + sizeof (*cb) < UINT16_MAX);
  977. env = GNUNET_MQ_msg_extra (cb,
  978. slen + mlen,
  979. GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
  980. cb->pid = *pid;
  981. memcpy (&cb[1],
  982. header,
  983. mlen);
  984. memcpy (((char *)&cb[1]) + mlen,
  985. comm,
  986. slen);
  987. GNUNET_MQ_send (ch->mq,
  988. env);
  989. }
  990. /* end of transport_api2_communication.c */