transport_api2_communication.c 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2018 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file transport/transport_api2_communication.c
  18. * @brief implementation of the gnunet_transport_communication_service.h API
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet_util_lib.h"
  23. #include "gnunet_protocols.h"
  24. #include "gnunet_transport_communication_service.h"
  25. #include "gnunet_ats_transport_service.h"
  26. #include "transport.h"
  /**
   * Default limit on the number of messages kept in the queue towards
   * the transport service before further messages are dropped.  The
   * limit can be changed via the configuration file.
   */
  #define DEFAULT_MAX_QUEUE_LENGTH 16
  33. /**
  34. * Information we track per packet to enable flow control.
  35. */
  36. struct FlowControl
  37. {
  38. /**
  39. * Kept in a DLL.
  40. */
  41. struct FlowControl *next;
  42. /**
  43. * Kept in a DLL.
  44. */
  45. struct FlowControl *prev;
  46. /**
  47. * Function to call once the message was processed.
  48. */
  49. GNUNET_TRANSPORT_MessageCompletedCallback cb;
  50. /**
  51. * Closure for @e cb
  52. */
  53. void *cb_cls;
  54. /**
  55. * Which peer is this about?
  56. */
  57. struct GNUNET_PeerIdentity sender;
  58. /**
  59. * More-or-less unique ID for the message.
  60. */
  61. uint64_t id;
  62. };
  63. /**
  64. * Information we track per message to tell the transport about
  65. * success or failures.
  66. */
  67. struct AckPending
  68. {
  69. /**
  70. * Kept in a DLL.
  71. */
  72. struct AckPending *next;
  73. /**
  74. * Kept in a DLL.
  75. */
  76. struct AckPending *prev;
  77. /**
  78. * Communicator this entry belongs to.
  79. */
  80. struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
  81. /**
  82. * Which peer is this about?
  83. */
  84. struct GNUNET_PeerIdentity receiver;
  85. /**
  86. * More-or-less unique ID for the message.
  87. */
  88. uint64_t mid;
  89. };
  90. /**
  91. * Opaque handle to the transport service for communicators.
  92. */
  93. struct GNUNET_TRANSPORT_CommunicatorHandle
  94. {
  95. /**
  96. * Head of DLL of addresses this communicator offers to the transport service.
  97. */
  98. struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
  99. /**
  100. * Tail of DLL of addresses this communicator offers to the transport service.
  101. */
  102. struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
  103. /**
  104. * DLL of messages awaiting flow control confirmation (ack).
  105. */
  106. struct FlowControl *fc_head;
  107. /**
  108. * DLL of messages awaiting flow control confirmation (ack).
  109. */
  110. struct FlowControl *fc_tail;
  111. /**
  112. * DLL of messages awaiting transmission confirmation (ack).
  113. */
  114. struct AckPending *ap_head;
  115. /**
  116. * DLL of messages awaiting transmission confirmation (ack).
  117. */
  118. struct AckPending *ap_tail;
  119. /**
  120. * DLL of queues we offer.
  121. */
  122. struct GNUNET_TRANSPORT_QueueHandle *queue_head;
  123. /**
  124. * DLL of queues we offer.
  125. */
  126. struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
  127. /**
  128. * Our configuration.
  129. */
  130. const struct GNUNET_CONFIGURATION_Handle *cfg;
  131. /**
  132. * Config section to use.
  133. */
  134. const char *config_section;
  135. /**
  136. * Address prefix to use.
  137. */
  138. const char *addr_prefix;
  139. /**
  140. * Function to call when the transport service wants us to initiate
  141. * a communication channel with another peer.
  142. */
  143. GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
  144. /**
  145. * Closure for @e mq_init.
  146. */
  147. void *mq_init_cls;
  148. /**
  149. * Function to call when the transport service receives messages
  150. * for a communicator (i.e. for NAT traversal or for non-bidirectional
  151. * communicators).
  152. */
  153. GNUNET_TRANSPORT_CommunicatorNotify notify_cb;
  154. /**
  155. * Closure for @e notify_Cb.
  156. */
  157. void *notify_cb_cls;
  158. /**
  159. * Queue to talk to the transport service.
  160. */
  161. struct GNUNET_MQ_Handle *mq;
  162. /**
  163. * Maximum permissable queue length.
  164. */
  165. unsigned long long max_queue_length;
  166. /**
  167. * Flow-control identifier generator.
  168. */
  169. uint64_t fc_gen;
  170. /**
  171. * Internal UUID for the address used in communication with the
  172. * transport service.
  173. */
  174. uint32_t aid_gen;
  175. /**
  176. * Queue identifier generator.
  177. */
  178. uint32_t queue_gen;
  179. /**
  180. * Characteristics of the communicator.
  181. */
  182. enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
  183. };
  184. /**
  185. * Handle returned to identify the internal data structure the transport
  186. * API has created to manage a message queue to a particular peer.
  187. */
  188. struct GNUNET_TRANSPORT_QueueHandle
  189. {
  190. /**
  191. * Kept in a DLL.
  192. */
  193. struct GNUNET_TRANSPORT_QueueHandle *next;
  194. /**
  195. * Kept in a DLL.
  196. */
  197. struct GNUNET_TRANSPORT_QueueHandle *prev;
  198. /**
  199. * Handle this queue belongs to.
  200. */
  201. struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
  202. /**
  203. * Address used by the communication queue.
  204. */
  205. char *address;
  206. /**
  207. * The queue itself.
  208. */
  209. struct GNUNET_MQ_Handle *mq;
  210. /**
  211. * Which peer we can communciate with.
  212. */
  213. struct GNUNET_PeerIdentity peer;
  214. /**
  215. * Network type of the communciation queue.
  216. */
  217. enum GNUNET_NetworkType nt;
  218. /**
  219. * Communication status of the queue.
  220. */
  221. enum GNUNET_TRANSPORT_ConnectionStatus cs;
  222. /**
  223. * ID for this queue when talking to the transport service.
  224. */
  225. uint32_t queue_id;
  226. /**
  227. * Maximum transmission unit for the queue.
  228. */
  229. uint32_t mtu;
  230. /**
  231. * Queue length.
  232. */
  233. uint64_t q_len;
  234. /**
  235. * Queue priority.
  236. */
  237. uint32_t priority;
  238. };
  239. /**
  240. * Internal representation of an address a communicator is
  241. * currently providing for the transport service.
  242. */
  243. struct GNUNET_TRANSPORT_AddressIdentifier
  244. {
  245. /**
  246. * Kept in a DLL.
  247. */
  248. struct GNUNET_TRANSPORT_AddressIdentifier *next;
  249. /**
  250. * Kept in a DLL.
  251. */
  252. struct GNUNET_TRANSPORT_AddressIdentifier *prev;
  253. /**
  254. * Transport handle where the address was added.
  255. */
  256. struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
  257. /**
  258. * The actual address.
  259. */
  260. char *address;
  261. /**
  262. * When does the address expire? (Expected lifetime of the
  263. * address.)
  264. */
  265. struct GNUNET_TIME_Relative expiration;
  266. /**
  267. * Internal UUID for the address used in communication with the
  268. * transport service.
  269. */
  270. uint32_t aid;
  271. /**
  272. * Network type for the address.
  273. */
  274. enum GNUNET_NetworkType nt;
  275. };
  /**
   * (Re)connect our communicator to the transport service.  Declared
   * ahead of its definition because the error paths below call it.
   *
   * @param ch handle to reconnect
   */
  static void
  reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
  283. /**
  284. * Send message to the transport service about address @a ai
  285. * being now available.
  286. *
  287. * @param ai address to add
  288. */
  289. static void
  290. send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
  291. {
  292. struct GNUNET_MQ_Envelope *env;
  293. struct GNUNET_TRANSPORT_AddAddressMessage *aam;
  294. if (NULL == ai->ch->mq)
  295. return;
  296. env = GNUNET_MQ_msg_extra (aam,
  297. strlen (ai->address) + 1,
  298. GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
  299. aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
  300. aam->nt = htonl ((uint32_t) ai->nt);
  301. memcpy (&aam[1], ai->address, strlen (ai->address) + 1);
  302. GNUNET_MQ_send (ai->ch->mq, env);
  303. }
  304. /**
  305. * Send message to the transport service about address @a ai
  306. * being no longer available.
  307. *
  308. * @param ai address to delete
  309. */
  310. static void
  311. send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
  312. {
  313. struct GNUNET_MQ_Envelope *env;
  314. struct GNUNET_TRANSPORT_DelAddressMessage *dam;
  315. if (NULL == ai->ch->mq)
  316. return;
  317. env = GNUNET_MQ_msg (dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
  318. dam->aid = htonl (ai->aid);
  319. GNUNET_MQ_send (ai->ch->mq, env);
  320. }
  321. /**
  322. * Send message to the transport service about queue @a qh
  323. * being now available.
  324. *
  325. * @param qh queue to add
  326. */
  327. static void
  328. send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
  329. {
  330. struct GNUNET_MQ_Envelope *env;
  331. struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
  332. if (NULL == qh->ch->mq)
  333. return;
  334. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  335. "Sending `GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP` message\n");
  336. env = GNUNET_MQ_msg_extra (aqm,
  337. strlen (qh->address) + 1,
  338. GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
  339. aqm->qid = htonl (qh->queue_id);
  340. aqm->receiver = qh->peer;
  341. aqm->nt = htonl ((uint32_t) qh->nt);
  342. aqm->mtu = htonl (qh->mtu);
  343. aqm->q_len = GNUNET_htonll (qh->q_len);
  344. aqm->priority = htonl (qh->priority);
  345. aqm->cs = htonl ((uint32_t) qh->cs);
  346. memcpy (&aqm[1], qh->address, strlen (qh->address) + 1);
  347. GNUNET_MQ_send (qh->ch->mq, env);
  348. }
  349. /**
  350. * Send message to the transport service about queue @a qh
  351. * updated.
  352. *
  353. * @param qh queue to add
  354. */
  355. static void
  356. send_update_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
  357. {
  358. struct GNUNET_MQ_Envelope *env;
  359. struct GNUNET_TRANSPORT_UpdateQueueMessage *uqm;
  360. if (NULL == qh->ch->mq)
  361. return;
  362. env = GNUNET_MQ_msg (uqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_UPDATE);
  363. uqm->qid = htonl (qh->queue_id);
  364. uqm->receiver = qh->peer;
  365. uqm->nt = htonl ((uint32_t) qh->nt);
  366. uqm->mtu = htonl (qh->mtu);
  367. uqm->q_len = GNUNET_htonll (qh->q_len);
  368. uqm->priority = htonl (qh->priority);
  369. uqm->cs = htonl ((uint32_t) qh->cs);
  370. GNUNET_MQ_send (qh->ch->mq, env);
  371. }
  372. /**
  373. * Send message to the transport service about queue @a qh
  374. * being no longer available.
  375. *
  376. * @param qh queue to delete
  377. */
  378. static void
  379. send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
  380. {
  381. struct GNUNET_MQ_Envelope *env;
  382. struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
  383. if (NULL == qh->ch->mq)
  384. return;
  385. env = GNUNET_MQ_msg (dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
  386. dqm->qid = htonl (qh->queue_id);
  387. dqm->receiver = qh->peer;
  388. GNUNET_MQ_send (qh->ch->mq, env);
  389. }
  390. /**
  391. * Disconnect from the transport service. Purges
  392. * all flow control entries as we will no longer receive
  393. * the ACKs. Purges the ack pending entries as the
  394. * transport will no longer expect the confirmations.
  395. *
  396. * @param ch service to disconnect from
  397. */
  398. static void
  399. disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
  400. {
  401. struct FlowControl *fcn;
  402. struct AckPending *apn;
  403. for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn)
  404. {
  405. fcn = fc->next;
  406. GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
  407. fc->cb (fc->cb_cls, GNUNET_SYSERR);
  408. GNUNET_free (fc);
  409. }
  410. for (struct AckPending *ap = ch->ap_head; NULL != ap; ap = apn)
  411. {
  412. apn = ap->next;
  413. GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
  414. GNUNET_free (ap);
  415. }
  416. if (NULL == ch->mq)
  417. return;
  418. GNUNET_MQ_destroy (ch->mq);
  419. ch->mq = NULL;
  420. }
  421. /**
  422. * Function called on MQ errors.
  423. */
  424. static void
  425. error_handler (void *cls, enum GNUNET_MQ_Error error)
  426. {
  427. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
  428. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  429. "MQ failure %d, reconnecting to transport service.\n",
  430. error);
  431. disconnect (ch);
  432. /* TODO: maybe do this with exponential backoff/delay */
  433. reconnect (ch);
  434. }
  435. /**
  436. * Transport service acknowledged a message we gave it
  437. * (with flow control enabled). Tell the communicator.
  438. *
  439. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  440. * @param incoming_ack the ack
  441. */
  442. static void
  443. handle_incoming_ack (
  444. void *cls,
  445. const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
  446. {
  447. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
  448. for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next)
  449. {
  450. if ((fc->id == incoming_ack->fc_id) &&
  451. (0 == memcmp (&fc->sender,
  452. &incoming_ack->sender,
  453. sizeof(struct GNUNET_PeerIdentity))))
  454. {
  455. GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
  456. fc->cb (fc->cb_cls, GNUNET_OK);
  457. GNUNET_free (fc);
  458. return;
  459. }
  460. }
  461. GNUNET_break (0);
  462. disconnect (ch);
  463. /* TODO: maybe do this with exponential backoff/delay */
  464. reconnect (ch);
  465. }
  466. /**
  467. * Transport service wants us to create a queue. Check if @a cq
  468. * is well-formed.
  469. *
  470. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  471. * @param cq the queue creation request
  472. * @return #GNUNET_OK if @a smt is well-formed
  473. */
  474. static int
  475. check_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
  476. {
  477. (void) cls;
  478. GNUNET_MQ_check_zero_termination (cq);
  479. return GNUNET_OK;
  480. }
  481. /**
  482. * Transport service wants us to create a queue. Tell the communicator.
  483. *
  484. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  485. * @param cq the queue creation request
  486. */
  487. static void
  488. handle_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
  489. {
  490. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
  491. const char *addr = (const char *) &cq[1];
  492. struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
  493. struct GNUNET_MQ_Envelope *env;
  494. if (GNUNET_OK != ch->mq_init (ch->mq_init_cls, &cq->receiver, addr))
  495. {
  496. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  497. "Address `%s' invalid for this communicator\n",
  498. addr);
  499. env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
  500. }
  501. else
  502. {
  503. env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
  504. }
  505. cqr->request_id = cq->request_id;
  506. GNUNET_MQ_send (ch->mq, env);
  507. }
  508. /**
  509. * Transport service wants us to send a message. Check if @a smt
  510. * is well-formed.
  511. *
  512. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  513. * @param smt the transmission request
  514. * @return #GNUNET_OK if @a smt is well-formed
  515. */
  516. static int
  517. check_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
  518. {
  519. (void) cls;
  520. GNUNET_MQ_check_boxed_message (smt);
  521. return GNUNET_OK;
  522. }
  523. /**
  524. * Notify transport service about @a status of a message with
  525. * @a mid sent to @a receiver.
  526. *
  527. * @param ch handle
  528. * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
  529. * @param receiver which peer was the receiver
  530. * @param mid message that the ack is about
  531. */
  532. static void
  533. send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
  534. int status,
  535. const struct GNUNET_PeerIdentity *receiver,
  536. uint64_t mid)
  537. {
  538. struct GNUNET_MQ_Envelope *env;
  539. struct GNUNET_TRANSPORT_SendMessageToAck *ack;
  540. env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
  541. ack->status = htonl (status);
  542. ack->mid = mid;
  543. ack->receiver = *receiver;
  544. GNUNET_MQ_send (ch->mq, env);
  545. }
  546. /**
  547. * Message queue transmission by communicator was successful,
  548. * notify transport service.
  549. *
  550. * @param cls an `struct AckPending *`
  551. */
  552. static void
  553. send_ack_cb (void *cls)
  554. {
  555. struct AckPending *ap = cls;
  556. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
  557. GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
  558. send_ack (ch, GNUNET_OK, &ap->receiver, ap->mid);
  559. GNUNET_free (ap);
  560. }
  561. /**
  562. * Transport service wants us to send a message. Tell the communicator.
  563. *
  564. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  565. * @param smt the transmission request
  566. */
  567. static void
  568. handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
  569. {
  570. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
  571. const struct GNUNET_MessageHeader *mh;
  572. struct GNUNET_MQ_Envelope *env;
  573. struct AckPending *ap;
  574. struct GNUNET_TRANSPORT_QueueHandle *qh;
  575. for (qh = ch->queue_head; NULL != qh; qh = qh->next)
  576. if ((qh->queue_id == smt->qid) &&
  577. (0 == memcmp (&qh->peer,
  578. &smt->receiver,
  579. sizeof(struct GNUNET_PeerIdentity))))
  580. break;
  581. if (NULL == qh)
  582. {
  583. /* queue is already gone, tell transport this one failed */
  584. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  585. "Transmission failed, queue no longer exists.\n");
  586. send_ack (ch, GNUNET_NO, &smt->receiver, smt->mid);
  587. return;
  588. }
  589. ap = GNUNET_new (struct AckPending);
  590. ap->ch = ch;
  591. ap->receiver = smt->receiver;
  592. ap->mid = smt->mid;
  593. GNUNET_CONTAINER_DLL_insert (ch->ap_head, ch->ap_tail, ap);
  594. mh = (const struct GNUNET_MessageHeader *) &smt[1];
  595. env = GNUNET_MQ_msg_copy (mh);
  596. GNUNET_MQ_notify_sent (env, &send_ack_cb, ap);
  597. GNUNET_MQ_send (qh->mq, env);
  598. }
  599. /**
  600. * Transport service gives us backchannel message. Check if @a bi
  601. * is well-formed.
  602. *
  603. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  604. * @param bi the backchannel message
  605. * @return #GNUNET_OK if @a smt is well-formed
  606. */
  607. static int
  608. check_backchannel_incoming (
  609. void *cls,
  610. const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
  611. {
  612. (void) cls;
  613. GNUNET_MQ_check_boxed_message (bi);
  614. return GNUNET_OK;
  615. }
  616. /**
  617. * Transport service gives us backchannel message. Handle it.
  618. *
  619. * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
  620. * @param bi the backchannel message
  621. */
  622. static void
  623. handle_backchannel_incoming (
  624. void *cls,
  625. const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
  626. {
  627. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
  628. if (NULL != ch->notify_cb)
  629. ch->notify_cb (ch->notify_cb_cls,
  630. &bi->pid,
  631. (const struct GNUNET_MessageHeader *) &bi[1]);
  632. else
  633. GNUNET_log (
  634. GNUNET_ERROR_TYPE_INFO,
  635. _ ("Dropped backchanel message: handler not provided by communicator\n"));
  636. }
  637. /**
  638. * (re)connect our communicator to the transport service
  639. *
  640. * @param ch handle to reconnect
  641. */
  642. static void
  643. reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
  644. {
  645. struct GNUNET_MQ_MessageHandler handlers[] =
  646. { GNUNET_MQ_hd_fixed_size (incoming_ack,
  647. GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
  648. struct GNUNET_TRANSPORT_IncomingMessageAck,
  649. ch),
  650. GNUNET_MQ_hd_var_size (create_queue,
  651. GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
  652. struct GNUNET_TRANSPORT_CreateQueue,
  653. ch),
  654. GNUNET_MQ_hd_var_size (send_msg,
  655. GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
  656. struct GNUNET_TRANSPORT_SendMessageTo,
  657. ch),
  658. GNUNET_MQ_hd_var_size (
  659. backchannel_incoming,
  660. GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING,
  661. struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming,
  662. ch),
  663. GNUNET_MQ_handler_end () };
  664. struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
  665. struct GNUNET_MQ_Envelope *env;
  666. ch->mq =
  667. GNUNET_CLIENT_connect (ch->cfg, "transport", handlers, &error_handler, ch);
  668. if (NULL == ch->mq)
  669. return;
  670. env = GNUNET_MQ_msg_extra (cam,
  671. strlen (ch->addr_prefix) + 1,
  672. GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
  673. cam->cc = htonl ((uint32_t) ch->cc);
  674. memcpy (&cam[1], ch->addr_prefix, strlen (ch->addr_prefix) + 1);
  675. GNUNET_MQ_send (ch->mq, env);
  676. for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai;
  677. ai = ai->next)
  678. send_add_address (ai);
  679. for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; NULL != qh;
  680. qh = qh->next)
  681. send_add_queue (qh);
  682. }
  683. /**
  684. * Connect to the transport service.
  685. *
  686. * @param cfg configuration to use
  687. * @param config_section section of the configuration to use for options
  688. * @param addr_prefix address prefix for addresses supported by this
  689. * communicator, could be NULL for incoming-only communicators
  690. * @param cc what characteristics does the communicator have?
  691. * @param mtu maximum message size supported by communicator, 0 if
  692. * sending is not supported, SIZE_MAX for no MTU
  693. * @param mq_init function to call to initialize a message queue given
  694. * the address of another peer, can be NULL if the
  695. * communicator only supports receiving messages
  696. * @param mq_init_cls closure for @a mq_init
  697. * @param notify_cb function to pass backchannel messages to communicator
  698. * @param notify_cb_cls closure for @a notify_cb
  699. * @return NULL on error
  700. */
  701. struct GNUNET_TRANSPORT_CommunicatorHandle *
  702. GNUNET_TRANSPORT_communicator_connect (
  703. const struct GNUNET_CONFIGURATION_Handle *cfg,
  704. const char *config_section,
  705. const char *addr_prefix,
  706. enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc,
  707. GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
  708. void *mq_init_cls,
  709. GNUNET_TRANSPORT_CommunicatorNotify notify_cb,
  710. void *notify_cb_cls)
  711. {
  712. struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
  713. ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
  714. ch->cfg = cfg;
  715. ch->config_section = config_section;
  716. ch->addr_prefix = addr_prefix;
  717. ch->mq_init = mq_init;
  718. ch->mq_init_cls = mq_init_cls;
  719. ch->notify_cb = notify_cb;
  720. ch->notify_cb_cls = notify_cb_cls;
  721. ch->cc = cc;
  722. reconnect (ch);
  723. if (GNUNET_OK !=
  724. GNUNET_CONFIGURATION_get_value_number (cfg,
  725. config_section,
  726. "MAX_QUEUE_LENGTH",
  727. &ch->max_queue_length))
  728. ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
  729. if (NULL == ch->mq)
  730. {
  731. GNUNET_free (ch);
  732. return NULL;
  733. }
  734. return ch;
  735. }
  736. /**
  737. * Disconnect from the transport service.
  738. *
  739. * @param ch handle returned from connect
  740. */
  741. void
  742. GNUNET_TRANSPORT_communicator_disconnect (
  743. struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
  744. {
  745. disconnect (ch);
  746. while (NULL != ch->ai_head)
  747. {
  748. GNUNET_break (0); /* communicator forgot to remove address, warn! */
  749. GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
  750. }
  751. GNUNET_free (ch);
  752. }
  753. /* ************************* Receiving *************************** */
  754. /**
  755. * Notify transport service that the communicator has received
  756. * a message.
  757. *
  758. * @param ch connection to transport service
  759. * @param sender presumed sender of the message (details to be checked
  760. * by higher layers)
  761. * @param msg the message
  762. * @param expected_addr_validity how long does the communicator believe it
  763. * will continue to be able to receive messages from the same address
  764. * on which it received this message?
  765. * @param cb function to call once handling the message is done, NULL if
  766. * flow control is not supported by this communicator
  767. * @param cb_cls closure for @a cb
  768. * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
  769. * immediately dropped due to memory limitations (communicator
  770. * should try to apply back pressure),
  771. * #GNUNET_SYSERR if the message could not be delivered because
  772. * the tranport service is not yet up
  773. */
  774. int
  775. GNUNET_TRANSPORT_communicator_receive (
  776. struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
  777. const struct GNUNET_PeerIdentity *sender,
  778. const struct GNUNET_MessageHeader *msg,
  779. struct GNUNET_TIME_Relative expected_addr_validity,
  780. GNUNET_TRANSPORT_MessageCompletedCallback cb,
  781. void *cb_cls)
  782. {
  783. struct GNUNET_MQ_Envelope *env;
  784. struct GNUNET_TRANSPORT_IncomingMessage *im;
  785. uint16_t msize;
  786. if (NULL == ch->mq)
  787. return GNUNET_SYSERR;
  788. if ((NULL == cb) && (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length))
  789. {
  790. GNUNET_log (
  791. GNUNET_ERROR_TYPE_WARNING,
  792. "Dropping message: transport is too slow, queue length %llu exceeded\n",
  793. ch->max_queue_length);
  794. return GNUNET_NO;
  795. }
  796. msize = ntohs (msg->size);
  797. env =
  798. GNUNET_MQ_msg_extra (im, msize, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
  799. if (NULL == env)
  800. {
  801. GNUNET_break (0);
  802. return GNUNET_SYSERR;
  803. }
  804. im->expected_address_validity =
  805. GNUNET_TIME_relative_hton (expected_addr_validity);
  806. im->sender = *sender;
  807. // FIXME: this is expensive, would be better if we would
  808. // re-design the API to allow us to create the envelope first,
  809. // and then have the application fill in the body so we do
  810. // not have to memcpy()
  811. memcpy (&im[1], msg, msize);
  812. im->fc_on = htonl (GNUNET_NO);
  813. if (NULL != cb)
  814. {
  815. struct FlowControl *fc;
  816. im->fc_on = htonl (GNUNET_YES);
  817. im->fc_id = ch->fc_gen++;
  818. fc = GNUNET_new (struct FlowControl);
  819. fc->sender = *sender;
  820. fc->id = im->fc_id;
  821. fc->cb = cb;
  822. fc->cb_cls = cb_cls;
  823. GNUNET_CONTAINER_DLL_insert (ch->fc_head, ch->fc_tail, fc);
  824. }
  825. GNUNET_MQ_send (ch->mq, env);
  826. return GNUNET_OK;
  827. }
  828. /* ************************* Discovery *************************** */
  829. /**
  830. * Notify transport service that an MQ became available due to an
  831. * "inbound" connection or because the communicator discovered the
  832. * presence of another peer.
  833. *
  834. * @param ch connection to transport service
  835. * @param peer peer with which we can now communicate
  836. * @param address address in human-readable format, 0-terminated, UTF-8
  837. * @param mtu maximum message size supported by queue, 0 if
  838. * sending is not supported, SIZE_MAX for no MTU
  839. * @param q_len number of messages that can be send through this queue
  840. * @param priority queue priority. Queues with highest priority should be
  841. * used
  842. * @param nt which network type does the @a address belong to?
  843. * @param cc what characteristics does the communicator have?
  844. * @param cs what is the connection status of the queue?
  845. * @param mq message queue of the @a peer
  846. * @return API handle identifying the new MQ
  847. */
  848. struct GNUNET_TRANSPORT_QueueHandle *
  849. GNUNET_TRANSPORT_communicator_mq_add (
  850. struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
  851. const struct GNUNET_PeerIdentity *peer,
  852. const char *address,
  853. uint32_t mtu,
  854. uint64_t q_len,
  855. uint32_t priority,
  856. enum GNUNET_NetworkType nt,
  857. enum GNUNET_TRANSPORT_ConnectionStatus cs,
  858. struct GNUNET_MQ_Handle *mq)
  859. {
  860. struct GNUNET_TRANSPORT_QueueHandle *qh;
  861. qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
  862. qh->ch = ch;
  863. qh->peer = *peer;
  864. qh->address = GNUNET_strdup (address);
  865. qh->nt = nt;
  866. qh->mtu = mtu;
  867. qh->q_len = q_len;
  868. qh->priority = priority;
  869. qh->cs = cs;
  870. qh->mq = mq;
  871. qh->queue_id = ch->queue_gen++;
  872. GNUNET_CONTAINER_DLL_insert (ch->queue_head, ch->queue_tail, qh);
  873. send_add_queue (qh);
  874. return qh;
  875. }
  876. /**
  877. * Notify transport service that an MQ was updated
  878. *
  879. * @param ch connection to transport service
  880. * @param qh the queue to update
  881. * @param q_len number of messages that can be send through this queue
  882. * @param priority queue priority. Queues with highest priority should be
  883. * used
  884. */
  885. void
  886. GNUNET_TRANSPORT_communicator_mq_update (
  887. struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
  888. const struct GNUNET_TRANSPORT_QueueHandle *u_qh,
  889. uint64_t q_len,
  890. uint32_t priority)
  891. {
  892. struct GNUNET_TRANSPORT_QueueHandle *qh;
  893. for (qh = ch->queue_head; NULL != qh; qh = qh->next)
  894. {
  895. if (u_qh == qh)
  896. break;
  897. }
  898. GNUNET_assert (NULL != qh);
  899. qh->q_len = q_len;
  900. qh->priority = priority;
  901. send_update_queue (qh);
  902. }
  903. /**
  904. * Notify transport service that an MQ became unavailable due to a
  905. * disconnect or timeout.
  906. *
  907. * @param qh handle for the queue that must be invalidated
  908. */
  909. void
  910. GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
  911. {
  912. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
  913. send_del_queue (qh);
  914. GNUNET_CONTAINER_DLL_remove (ch->queue_head, ch->queue_tail, qh);
  915. GNUNET_MQ_destroy (qh->mq);
  916. GNUNET_free (qh->address);
  917. GNUNET_free (qh);
  918. }
  919. /**
  920. * Notify transport service about an address that this communicator
  921. * provides for this peer.
  922. *
  923. * @param ch connection to transport service
  924. * @param address our address in human-readable format, 0-terminated, UTF-8
  925. * @param nt which network type does the address belong to?
  926. * @param expiration when does the communicator forsee this address expiring?
  927. */
  928. struct GNUNET_TRANSPORT_AddressIdentifier *
  929. GNUNET_TRANSPORT_communicator_address_add (
  930. struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
  931. const char *address,
  932. enum GNUNET_NetworkType nt,
  933. struct GNUNET_TIME_Relative expiration)
  934. {
  935. struct GNUNET_TRANSPORT_AddressIdentifier *ai;
  936. ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
  937. ai->ch = ch;
  938. ai->address = GNUNET_strdup (address);
  939. ai->nt = nt;
  940. ai->expiration = expiration;
  941. ai->aid = ch->aid_gen++;
  942. GNUNET_CONTAINER_DLL_insert (ch->ai_head, ch->ai_tail, ai);
  943. send_add_address (ai);
  944. return ai;
  945. }
  946. /**
  947. * Notify transport service about an address that this communicator no
  948. * longer provides for this peer.
  949. *
  950. * @param ai address that is no longer provided
  951. */
  952. void
  953. GNUNET_TRANSPORT_communicator_address_remove (
  954. struct GNUNET_TRANSPORT_AddressIdentifier *ai)
  955. {
  956. struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
  957. send_del_address (ai);
  958. GNUNET_CONTAINER_DLL_remove (ch->ai_head, ch->ai_tail, ai);
  959. GNUNET_free (ai->address);
  960. GNUNET_free (ai);
  961. }
  962. /* ************************* Backchannel *************************** */
  963. /**
  964. * The communicator asks the transport service to route a message via
  965. * a different path to another communicator service at another peer.
  966. * This must only be done for special control traffic (as there is no
  967. * flow control for this API), such as acknowledgements, and generally
  968. * only be done if the communicator is uni-directional (i.e. cannot
  969. * send the message back itself).
  970. *
  971. * @param ch handle of this communicator
  972. * @param pid peer to send the message to
  973. * @param comm name of the communicator to send the message to
  974. * @param header header of the message to transmit and pass via the
  975. * notify-API to @a pid's communicator @a comm
  976. */
  977. void
  978. GNUNET_TRANSPORT_communicator_notify (
  979. struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
  980. const struct GNUNET_PeerIdentity *pid,
  981. const char *comm,
  982. const struct GNUNET_MessageHeader *header)
  983. {
  984. struct GNUNET_MQ_Envelope *env;
  985. struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
  986. size_t slen = strlen (comm) + 1;
  987. uint16_t mlen = ntohs (header->size);
  988. GNUNET_assert (mlen + slen + sizeof(*cb) < UINT16_MAX);
  989. env =
  990. GNUNET_MQ_msg_extra (cb,
  991. slen + mlen,
  992. GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
  993. cb->pid = *pid;
  994. memcpy (&cb[1], header, mlen);
  995. memcpy (((char *) &cb[1]) + mlen, comm, slen);
  996. GNUNET_MQ_send (ch->mq, env);
  997. }
  998. /* end of transport_api2_communication.c */