transport_api.c 54 KB


  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2009-2013 Christian Grothoff (and other contributing authors)
  4. GNUnet is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published
  6. by the Free Software Foundation; either version 3, or (at your
  7. option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with GNUnet; see the file COPYING. If not, write to the
  14. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  15. Boston, MA 02111-1307, USA.
  16. */
  17. /**
  18. * @file transport/transport_api.c
  19. * @brief library to access the low-level P2P IO service
  20. * @author Christian Grothoff
  21. *
  22. * TODO:
  23. * - test test test
  24. */
  25. #include "platform.h"
  26. #include "gnunet_util_lib.h"
  27. #include "gnunet_constants.h"
  28. #include "gnunet_arm_service.h"
  29. #include "gnunet_hello_lib.h"
  30. #include "gnunet_protocols.h"
  31. #include "gnunet_transport_service.h"
  32. #include "transport.h"
/**
 * Log a message on behalf of this file; every message is tagged with
 * the "transport-api" component name.
 *
 * @param kind kind of log message (e.g. #GNUNET_ERROR_TYPE_DEBUG),
 *             followed by a printf-style format string and arguments
 */
#define LOG(kind,...) GNUNET_log_from (kind, "transport-api",__VA_ARGS__)

/**
 * How large to start with for the hashmap of neighbours.
 */
#define STARTING_NEIGHBOURS_SIZE 16
/**
 * Handle for a message that should be transmitted to the service.
 * Used for both control messages (queued in the control DLL of the
 * transport handle) and normal messages (referenced from the `th`
 * field of the respective `struct Neighbour`).
 */
struct GNUNET_TRANSPORT_TransmitHandle
{

  /**
   * We keep all requests in a DLL (next element).
   */
  struct GNUNET_TRANSPORT_TransmitHandle *next;

  /**
   * We keep all requests in a DLL (previous element).
   */
  struct GNUNET_TRANSPORT_TransmitHandle *prev;

  /**
   * Neighbour for this handle, NULL for control messages.
   */
  struct Neighbour *neighbour;

  /**
   * Function to call when @e notify_size bytes are available
   * for transmission.  It is also called with a size of 0 and a
   * NULL buffer to signal that the request timed out.
   */
  GNUNET_TRANSPORT_TransmitReadyNotify notify;

  /**
   * Closure for @e notify.
   */
  void *notify_cls;

  /**
   * Timeout for this request, 0 for control messages.
   */
  struct GNUNET_TIME_Absolute timeout;

  /**
   * Task to trigger request timeout if the request is stalled due to
   * congestion.  NULL while the request is not stalled.
   */
  struct GNUNET_SCHEDULER_Task * timeout_task;

  /**
   * How many bytes is our notify callback waiting for?
   */
  size_t notify_size;

};
/**
 * Entry in hash table of all of our current (connected) neighbours.
 */
struct Neighbour
{

  /**
   * Overall transport handle.
   */
  struct GNUNET_TRANSPORT_Handle *h;

  /**
   * Active transmit handle or NULL.
   */
  struct GNUNET_TRANSPORT_TransmitHandle *th;

  /**
   * Identity of this neighbour.
   */
  struct GNUNET_PeerIdentity id;

  /**
   * Outbound bandwidth tracker.
   */
  struct GNUNET_BANDWIDTH_Tracker out_tracker;

  /**
   * Entry in our readiness heap (which is sorted by the delay until
   * the next transmission is permitted).  NULL if there is no pending
   * transmission request for this neighbour or if we're waiting for
   * @e is_ready to become true AFTER the @e out_tracker suggested
   * that this peer's quota has been satisfied (so once @e is_ready
   * goes to #GNUNET_YES, we should immediately go back into the heap).
   */
  struct GNUNET_CONTAINER_HeapNode *hn;

  /**
   * Is this peer currently ready to receive a message?
   * Set to #GNUNET_NO when a message was handed to the service and
   * back to #GNUNET_YES once the service confirms with SEND_OK.
   */
  int is_ready;

  /**
   * Sending consumed more bytes on the wire than the payload that
   * was announced.  This overhead is added to the size used when
   * calculating the delay of the next sending operation.
   */
  size_t traffic_overhead;

};
/**
 * Linked list of functions to call whenever our HELLO is updated.
 */
struct GNUNET_TRANSPORT_GetHelloHandle
{

  /**
   * This is a doubly linked list (next element).
   */
  struct GNUNET_TRANSPORT_GetHelloHandle *next;

  /**
   * This is a doubly linked list (previous element).
   */
  struct GNUNET_TRANSPORT_GetHelloHandle *prev;

  /**
   * Transport handle.
   */
  struct GNUNET_TRANSPORT_Handle *handle;

  /**
   * Callback to call once we got our HELLO (and again each time a
   * new HELLO is received from the service).
   */
  GNUNET_TRANSPORT_HelloUpdateCallback rec;

  /**
   * Task for calling the HelloUpdateCallback when we already have a HELLO.
   */
  struct GNUNET_SCHEDULER_Task * notify_task;

  /**
   * Closure for @e rec.
   */
  void *rec_cls;

};
/**
 * Entry in linked list for a try-connect request.
 */
struct GNUNET_TRANSPORT_TryConnectHandle
{

  /**
   * For the DLL (previous element).
   */
  struct GNUNET_TRANSPORT_TryConnectHandle *prev;

  /**
   * For the DLL (next element).
   */
  struct GNUNET_TRANSPORT_TryConnectHandle *next;

  /**
   * Peer we should try to connect to.
   */
  struct GNUNET_PeerIdentity pid;

  /**
   * Transport service handle this request is part of.
   */
  struct GNUNET_TRANSPORT_Handle *th;

  /**
   * Message transmission request to communicate to service.
   */
  struct GNUNET_TRANSPORT_TransmitHandle *tth;

  /**
   * Function to call upon completion (of request transmission).
   */
  GNUNET_TRANSPORT_TryConnectCallback cb;

  /**
   * Closure for @e cb.
   */
  void *cb_cls;

};
/**
 * Entry in linked list for all try-disconnect requests.
 */
struct GNUNET_TRANSPORT_TryDisconnectHandle
{

  /**
   * For the DLL (previous element).
   */
  struct GNUNET_TRANSPORT_TryDisconnectHandle *prev;

  /**
   * For the DLL (next element).
   */
  struct GNUNET_TRANSPORT_TryDisconnectHandle *next;

  /**
   * Peer we should try to disconnect from.
   */
  struct GNUNET_PeerIdentity pid;

  /**
   * Transport service handle this request is part of.
   */
  struct GNUNET_TRANSPORT_Handle *th;

  /**
   * Message transmission request to communicate to service.
   */
  struct GNUNET_TRANSPORT_TransmitHandle *tth;

  /**
   * Function to call upon completion (of request transmission).
   */
  GNUNET_TRANSPORT_TryDisconnectCallback cb;

  /**
   * Closure for @e cb.
   */
  void *cb_cls;

};
/**
 * Entry in linked list for all offer-HELLO requests.
 */
struct GNUNET_TRANSPORT_OfferHelloHandle
{

  /**
   * For the DLL (previous element).
   */
  struct GNUNET_TRANSPORT_OfferHelloHandle *prev;

  /**
   * For the DLL (next element).
   */
  struct GNUNET_TRANSPORT_OfferHelloHandle *next;

  /**
   * Transport service handle we use for transmission.
   */
  struct GNUNET_TRANSPORT_Handle *th;

  /**
   * Transmission handle for this request.
   */
  struct GNUNET_TRANSPORT_TransmitHandle *tth;

  /**
   * Function to call once we are done.
   */
  GNUNET_SCHEDULER_TaskCallback cont;

  /**
   * Closure for @e cont.
   */
  void *cls;

  /**
   * The HELLO message to be transmitted.
   */
  struct GNUNET_MessageHeader *msg;

};
/**
 * Handle for the transport service (includes all of the
 * state for the transport service).
 */
struct GNUNET_TRANSPORT_Handle
{

  /**
   * Closure for the callbacks (@e rec, @e nc_cb, @e nd_cb, @e neb_cb).
   */
  void *cls;

  /**
   * Function to call for received data, may be NULL.
   */
  GNUNET_TRANSPORT_ReceiveCallback rec;

  /**
   * Function to call on connect events, may be NULL.
   */
  GNUNET_TRANSPORT_NotifyConnect nc_cb;

  /**
   * Function to call on disconnect events, may be NULL.
   */
  GNUNET_TRANSPORT_NotifyDisconnect nd_cb;

  /**
   * Function to call on excess bandwidth events, may be NULL.
   */
  GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;

  /**
   * Head of DLL of control messages.
   */
  struct GNUNET_TRANSPORT_TransmitHandle *control_head;

  /**
   * Tail of DLL of control messages.
   */
  struct GNUNET_TRANSPORT_TransmitHandle *control_tail;

  /**
   * The current HELLO message for this peer.  Updated
   * whenever transports change their addresses.
   */
  struct GNUNET_MessageHeader *my_hello;

  /**
   * My client connection to the transport service.
   */
  struct GNUNET_CLIENT_Connection *client;

  /**
   * Handle to our registration with the client for notification;
   * NULL if no transmission request to the service is pending.
   */
  struct GNUNET_CLIENT_TransmitHandle *cth;

  /**
   * Linked list of pending requests for our HELLO (head).
   */
  struct GNUNET_TRANSPORT_GetHelloHandle *hwl_head;

  /**
   * Linked list of pending requests for our HELLO (tail).
   */
  struct GNUNET_TRANSPORT_GetHelloHandle *hwl_tail;

  /**
   * Linked list of pending try connect requests head
   */
  struct GNUNET_TRANSPORT_TryConnectHandle *tc_head;

  /**
   * Linked list of pending try connect requests tail
   */
  struct GNUNET_TRANSPORT_TryConnectHandle *tc_tail;

  /**
   * Linked list of pending try disconnect requests head
   */
  struct GNUNET_TRANSPORT_TryDisconnectHandle *td_head;

  /**
   * Linked list of pending try disconnect requests tail
   */
  struct GNUNET_TRANSPORT_TryDisconnectHandle *td_tail;

  /**
   * Linked list of pending offer HELLO requests head
   */
  struct GNUNET_TRANSPORT_OfferHelloHandle *oh_head;

  /**
   * Linked list of pending offer HELLO requests tail
   */
  struct GNUNET_TRANSPORT_OfferHelloHandle *oh_tail;

  /**
   * My configuration.
   */
  const struct GNUNET_CONFIGURATION_Handle *cfg;

  /**
   * Hash map of the current connected neighbours of this peer.
   * Maps peer identities to 'struct Neighbour' entries.
   */
  struct GNUNET_CONTAINER_MultiPeerMap *neighbours;

  /**
   * Heap sorting peers with pending messages by the timestamps that
   * specify when we could next send a message to the respective peer.
   * Excludes control messages (which can always go out immediately).
   * Maps time stamps to 'struct Neighbour' entries.
   */
  struct GNUNET_CONTAINER_Heap *ready_heap;

  /**
   * Peer identity as assumed by this process, or all zeros.
   */
  struct GNUNET_PeerIdentity self;

  /**
   * ID of the task trying to reconnect to the service.
   */
  struct GNUNET_SCHEDULER_Task * reconnect_task;

  /**
   * ID of the task trying to trigger transmission for a peer while
   * maintaining bandwidth quotas.  In use if there are no control
   * messages and the smallest entry in the 'ready_heap' has a time
   * stamp in the future.
   */
  struct GNUNET_SCHEDULER_Task * quota_task;

  /**
   * Delay until we try to reconnect.
   */
  struct GNUNET_TIME_Relative reconnect_delay;

  /**
   * Should we check that @e self matches what the service thinks?
   * (if #GNUNET_NO, then @e self is all zeros!).
   */
  int check_self;

  /**
   * Reconnect in progress; while this is #GNUNET_YES, incoming
   * messages from the service are ignored.
   */
  int reconnecting;

};
/**
 * Schedule the task to send one message, either from the control
 * list or the peer message queues to the service.
 * (Forward declaration; the definition follows further below.)
 *
 * @param h transport service to schedule a transmission for
 */
static void
schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
/**
 * Function that will disconnect from the transport service and
 * schedule the job that will try to connect us again to the service.
 * (Forward declaration; the definition follows further below.)
 *
 * @param h transport service to reconnect
 */
static void
disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h);
  391. /**
  392. * Get the neighbour list entry for the given peer
  393. *
  394. * @param h our context
  395. * @param peer peer to look up
  396. * @return NULL if no such peer entry exists
  397. */
  398. static struct Neighbour *
  399. neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
  400. const struct GNUNET_PeerIdentity *peer)
  401. {
  402. return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer);
  403. }
  404. /**
  405. * The outbound quota has changed in a way that may require
  406. * us to reset the timeout. Update the timeout.
  407. *
  408. * @param cls the `struct Neighbour` for which the timeout changed
  409. */
  410. static void
  411. outbound_bw_tracker_update (void *cls)
  412. {
  413. struct Neighbour *n = cls;
  414. struct GNUNET_TIME_Relative delay;
  415. if (NULL == n->hn)
  416. return;
  417. delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
  418. n->th->notify_size + n->traffic_overhead);
  419. LOG (GNUNET_ERROR_TYPE_DEBUG,
  420. "New outbound delay %llu us\n",
  421. GNUNET_STRINGS_relative_time_to_string (delay,
  422. GNUNET_NO));
  423. GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
  424. n->hn, delay.rel_value_us);
  425. schedule_transmission (n->h);
  426. }
  427. /**
  428. * Function called by the bandwidth tracker if we have excess
  429. * bandwidth.
  430. *
  431. * @param cls the `struct Neighbour` that has excess bandwidth
  432. */
  433. static void
  434. notify_excess_cb (void *cls)
  435. {
  436. struct Neighbour *n = cls;
  437. struct GNUNET_TRANSPORT_Handle *h = n->h;
  438. if (NULL != h->neb_cb)
  439. h->neb_cb (h->cls,
  440. &n->id);
  441. }
  442. /**
  443. * Add neighbour to our list
  444. *
  445. * @return NULL if this API is currently disconnecting from the service
  446. */
  447. static struct Neighbour *
  448. neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
  449. const struct GNUNET_PeerIdentity *pid)
  450. {
  451. struct Neighbour *n;
  452. LOG (GNUNET_ERROR_TYPE_DEBUG,
  453. "Creating entry for neighbour `%4s'.\n",
  454. GNUNET_i2s (pid));
  455. n = GNUNET_new (struct Neighbour);
  456. n->id = *pid;
  457. n->h = h;
  458. n->is_ready = GNUNET_YES;
  459. n->traffic_overhead = 0;
  460. GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
  461. &outbound_bw_tracker_update, n,
  462. GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
  463. MAX_BANDWIDTH_CARRY_S,
  464. &notify_excess_cb,
  465. n);
  466. GNUNET_assert (GNUNET_OK ==
  467. GNUNET_CONTAINER_multipeermap_put (h->neighbours,
  468. &n->id, n,
  469. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  470. return n;
  471. }
  472. /**
  473. * Iterator over hash map entries, for deleting state of a neighbour.
  474. *
  475. * @param cls the `struct GNUNET_TRANSPORT_Handle *`
  476. * @param key peer identity
  477. * @param value value in the hash map, the neighbour entry to delete
  478. * @return #GNUNET_YES if we should continue to
  479. * iterate,
  480. * #GNUNET_NO if not.
  481. */
  482. static int
  483. neighbour_delete (void *cls,
  484. const struct GNUNET_PeerIdentity *key, void *value)
  485. {
  486. struct GNUNET_TRANSPORT_Handle *handle = cls;
  487. struct Neighbour *n = value;
  488. if (NULL != handle->nd_cb)
  489. handle->nd_cb (handle->cls, &n->id);
  490. GNUNET_assert (NULL == n->th);
  491. GNUNET_assert (NULL == n->hn);
  492. GNUNET_assert (GNUNET_YES ==
  493. GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key,
  494. n));
  495. GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
  496. GNUNET_free (n);
  497. return GNUNET_YES;
  498. }
/**
 * Function we use for handling incoming messages from the transport
 * service.  After basic checks the receive handler is re-registered
 * and the message is dispatched by type:
 * - HELLO: cache a copy as our own HELLO and notify all HELLO waiters
 * - CONNECT: create the neighbour entry, set its quota, notify client
 * - DISCONNECT: destroy the neighbour entry (notifies client)
 * - SEND_OK: mark the neighbour as ready again, resume stalled request
 * - RECV: hand the encapsulated message to the client
 * - SET_QUOTA: update the outbound quota of the neighbour
 * Malformed messages are reported via GNUNET_break() and dropped.
 *
 * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
 * @param msg message received, NULL on timeout or fatal error
 */
static void
demultiplexer (void *cls,
               const struct GNUNET_MessageHeader *msg)
{
  struct GNUNET_TRANSPORT_Handle *h = cls;
  const struct DisconnectInfoMessage *dim;
  const struct ConnectInfoMessage *cim;
  const struct InboundMessage *im;
  const struct GNUNET_MessageHeader *imm;
  const struct SendOkMessage *okm;
  const struct QuotaSetMessage *qm;
  struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
  struct GNUNET_TRANSPORT_GetHelloHandle *next_hwl;
  struct Neighbour *n;
  struct GNUNET_PeerIdentity me;
  uint16_t size;
  uint32_t bytes_msg;
  uint32_t bytes_physical;

  GNUNET_assert (NULL != h->client);
  /* ignore anything that arrives while we are tearing down the
     connection for a reconnect */
  if (GNUNET_YES == h->reconnecting)
  {
    return;
  }
  if (NULL == msg)
  {
    LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Error receiving from transport service, disconnecting temporarily.\n");
    h->reconnecting = GNUNET_YES;
    disconnect_and_schedule_reconnect (h);
    return;
  }
  /* ask for the next message before processing this one */
  GNUNET_CLIENT_receive (h->client, &demultiplexer, h,
                         GNUNET_TIME_UNIT_FOREVER_REL);
  size = ntohs (msg->size);
  switch (ntohs (msg->type))
  {
  case GNUNET_MESSAGE_TYPE_HELLO:
    if (GNUNET_OK !=
        GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, &me))
    {
      GNUNET_break (0);
      break;
    }
    LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Receiving (my own) HELLO message (%u bytes), I am `%4s'.\n",
         (unsigned int) size,
         GNUNET_i2s (&me));
    /* replace the cached HELLO with a copy of the new one */
    GNUNET_free_non_null (h->my_hello);
    h->my_hello = NULL;
    if (size < sizeof (struct GNUNET_MessageHeader))
    {
      GNUNET_break (0);
      break;
    }
    h->my_hello = GNUNET_copy_message (msg);
    /* notify everybody waiting for our HELLO; the successor is
       fetched first, so the current entry is not touched again
       after its callback ran */
    hwl = h->hwl_head;
    while (NULL != hwl)
    {
      next_hwl = hwl->next;
      hwl->rec (hwl->rec_cls,
                h->my_hello);
      hwl = next_hwl;
    }
    break;
  case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
    if (size < sizeof (struct ConnectInfoMessage))
    {
      GNUNET_break (0);
      break;
    }
    cim = (const struct ConnectInfoMessage *) msg;
    /* NOTE(review): together with the check above this requires the
       size to match exactly; the first check alone is redundant */
    if (size !=
        sizeof (struct ConnectInfoMessage))
    {
      GNUNET_break (0);
      break;
    }
    LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Receiving CONNECT message for `%4s'.\n",
         GNUNET_i2s (&cim->id));
    n = neighbour_find (h, &cim->id);
    if (NULL != n)
    {
      /* we already have an entry for this peer: unexpected */
      GNUNET_break (0);
      break;
    }
    n = neighbour_add (h, &cim->id);
    LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Receiving CONNECT message for `%4s' with quota %u\n",
         GNUNET_i2s (&cim->id),
         ntohl (cim->quota_out.value__));
    GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
                                           cim->quota_out);
    if (h->nc_cb != NULL)
      h->nc_cb (h->cls, &n->id);
    break;
  case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
    if (size != sizeof (struct DisconnectInfoMessage))
    {
      GNUNET_break (0);
      break;
    }
    dim = (const struct DisconnectInfoMessage *) msg;
    GNUNET_break (ntohl (dim->reserved) == 0);
    LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Receiving DISCONNECT message for `%4s'.\n",
         GNUNET_i2s (&dim->peer));
    n = neighbour_find (h, &dim->peer);
    if (NULL == n)
    {
      GNUNET_break (0);
      break;
    }
    /* notifies the client and releases the entry */
    neighbour_delete (h, &dim->peer, n);
    break;
  case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
    if (size != sizeof (struct SendOkMessage))
    {
      GNUNET_break (0);
      break;
    }
    okm = (const struct SendOkMessage *) msg;
    bytes_msg = ntohl (okm->bytes_msg);
    bytes_physical = ntohl (okm->bytes_physical);
    LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Receiving SEND_OK message, transmission %s.\n",
         ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
    n = neighbour_find (h, &okm->peer);
    if (NULL == n)
      break;
    /* remember how much more than the payload went over the wire;
       it is charged when the next delay is calculated */
    if (bytes_physical >= bytes_msg)
    {
      LOG (GNUNET_ERROR_TYPE_DEBUG,
           "Overhead for %u byte message: %u\n",
           bytes_msg,
           bytes_physical - bytes_msg);
      n->traffic_overhead += bytes_physical - bytes_msg;
    }
    /* a SEND_OK is only expected for a peer we marked as busy */
    GNUNET_break (GNUNET_NO == n->is_ready);
    n->is_ready = GNUNET_YES;
    if ((NULL != n->th) && (NULL == n->hn))
    {
      GNUNET_assert (NULL != n->th->timeout_task);
      GNUNET_SCHEDULER_cancel (n->th->timeout_task);
      n->th->timeout_task = NULL;
      /* we've been waiting for this (congestion, not quota,
       * caused delayed transmission) */
      n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap, n, 0);
      schedule_transmission (h);
    }
    break;
  case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
    if (size <
        sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
    {
      GNUNET_break (0);
      break;
    }
    im = (const struct InboundMessage *) msg;
    /* the encapsulated message directly follows the InboundMessage */
    imm = (const struct GNUNET_MessageHeader *) &im[1];
    if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
    {
      GNUNET_break (0);
      break;
    }
    LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Received message of type %u from `%4s'.\n",
         ntohs (imm->type), GNUNET_i2s (&im->peer));
    n = neighbour_find (h, &im->peer);
    if (NULL == n)
    {
      /* message from a peer we do not consider connected */
      GNUNET_break (0);
      break;
    }
    if (NULL != h->rec)
      h->rec (h->cls, &im->peer, imm);
    break;
  case GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA:
    if (size != sizeof (struct QuotaSetMessage))
    {
      GNUNET_break (0);
      break;
    }
    qm = (const struct QuotaSetMessage *) msg;
    n = neighbour_find (h, &qm->peer);
    if (NULL == n)
      break;
    LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Receiving SET_QUOTA message for `%4s' with quota %u\n",
         GNUNET_i2s (&qm->peer),
         ntohl (qm->quota.value__));
    GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
                                           qm->quota);
    break;
  default:
    LOG (GNUNET_ERROR_TYPE_ERROR,
         _("Received unexpected message of type %u in %s:%u\n"),
         ntohs (msg->type),
         __FILE__,
         __LINE__);
    GNUNET_break (0);
    break;
  }
}
  709. /**
  710. * A transmission request could not be satisfied because of
  711. * network congestion. Notify the initiator and clean up.
  712. *
  713. * @param cls the `struct GNUNET_TRANSPORT_TransmitHandle`
  714. * @param tc scheduler context
  715. */
  716. static void
  717. timeout_request_due_to_congestion (void *cls,
  718. const struct GNUNET_SCHEDULER_TaskContext *tc)
  719. {
  720. struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
  721. struct Neighbour *n = th->neighbour;
  722. n->th->timeout_task = NULL;
  723. GNUNET_assert (th == n->th);
  724. GNUNET_assert (NULL == n->hn);
  725. n->th = NULL;
  726. th->notify (th->notify_cls, 0, NULL);
  727. GNUNET_free (th);
  728. }
  729. /**
  730. * Transmit message(s) to service.
  731. *
  732. * @param cls handle to transport
  733. * @param size number of bytes available in @a buf
  734. * @param buf where to copy the message
  735. * @return number of bytes copied to @a buf
  736. */
  737. static size_t
  738. transport_notify_ready (void *cls, size_t size, void *buf)
  739. {
  740. struct GNUNET_TRANSPORT_Handle *h = cls;
  741. struct GNUNET_TRANSPORT_TransmitHandle *th;
  742. struct Neighbour *n;
  743. char *cbuf;
  744. struct OutboundMessage obm;
  745. size_t ret;
  746. size_t nret;
  747. size_t mret;
  748. GNUNET_assert (NULL != h->client);
  749. h->cth = NULL;
  750. if (NULL == buf)
  751. {
  752. /* transmission failed */
  753. disconnect_and_schedule_reconnect (h);
  754. return 0;
  755. }
  756. cbuf = buf;
  757. ret = 0;
  758. /* first send control messages */
  759. while ((NULL != (th = h->control_head)) && (th->notify_size <= size))
  760. {
  761. GNUNET_CONTAINER_DLL_remove (h->control_head,
  762. h->control_tail,
  763. th);
  764. nret = th->notify (th->notify_cls, size, &cbuf[ret]);
  765. LOG (GNUNET_ERROR_TYPE_DEBUG,
  766. "Added %u bytes of control message at %u\n",
  767. nret,
  768. ret);
  769. GNUNET_free (th);
  770. ret += nret;
  771. size -= nret;
  772. }
  773. /* then, if possible and no control messages pending, send data messages */
  774. while ((NULL == h->control_head) &&
  775. (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))))
  776. {
  777. if (GNUNET_YES != n->is_ready)
  778. {
  779. /* peer not ready, wait for notification! */
  780. GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
  781. n->hn = NULL;
  782. GNUNET_assert (NULL == n->th->timeout_task);
  783. n->th->timeout_task =
  784. GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
  785. (n->th->timeout),
  786. &timeout_request_due_to_congestion,
  787. n->th);
  788. continue;
  789. }
  790. th = n->th;
  791. if (th->notify_size + sizeof (struct OutboundMessage) > size)
  792. break; /* does not fit */
  793. if (GNUNET_BANDWIDTH_tracker_get_delay
  794. (&n->out_tracker, th->notify_size).rel_value_us > 0)
  795. break; /* too early */
  796. GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
  797. n->hn = NULL;
  798. n->th = NULL;
  799. n->is_ready = GNUNET_NO;
  800. GNUNET_assert (size >= sizeof (struct OutboundMessage));
  801. mret =
  802. th->notify (th->notify_cls, size - sizeof (struct OutboundMessage),
  803. &cbuf[ret + sizeof (struct OutboundMessage)]);
  804. GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
  805. if (mret != 0)
  806. {
  807. GNUNET_assert (mret + sizeof (struct OutboundMessage) <
  808. GNUNET_SERVER_MAX_MESSAGE_SIZE);
  809. obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
  810. obm.header.size = htons (mret + sizeof (struct OutboundMessage));
  811. obm.reserved = htonl (0);
  812. obm.timeout =
  813. GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
  814. (th->timeout));
  815. obm.peer = n->id;
  816. memcpy (&cbuf[ret], &obm, sizeof (struct OutboundMessage));
  817. ret += (mret + sizeof (struct OutboundMessage));
  818. size -= (mret + sizeof (struct OutboundMessage));
  819. GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, mret);
  820. }
  821. GNUNET_free (th);
  822. }
  823. /* if there are more pending messages, try to schedule those */
  824. schedule_transmission (h);
  825. LOG (GNUNET_ERROR_TYPE_DEBUG,
  826. "Transmitting %u bytes to transport service\n",
  827. ret);
  828. return ret;
  829. }
  830. /**
  831. * Schedule the task to send one message, either from the control
  832. * list or the peer message queues to the service.
  833. *
  834. * @param cls transport service to schedule a transmission for
  835. * @param tc scheduler context
  836. */
  837. static void
  838. schedule_transmission_task (void *cls,
  839. const struct GNUNET_SCHEDULER_TaskContext *tc)
  840. {
  841. struct GNUNET_TRANSPORT_Handle *h = cls;
  842. size_t size;
  843. struct GNUNET_TRANSPORT_TransmitHandle *th;
  844. struct Neighbour *n;
  845. h->quota_task = NULL;
  846. GNUNET_assert (NULL != h->client);
  847. /* destroy all requests that have timed out */
  848. while ((NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
  849. (0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us))
  850. {
  851. /* notify client that the request could not be satisfied within
  852. * the given time constraints */
  853. th = n->th;
  854. n->th = NULL;
  855. GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
  856. n->hn = NULL;
  857. LOG (GNUNET_ERROR_TYPE_DEBUG,
  858. "Signalling timeout for transmission to peer %s due to congestion\n",
  859. GNUNET_i2s (&n->id));
  860. GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
  861. GNUNET_free (th);
  862. }
  863. if (NULL != h->cth)
  864. return;
  865. if (NULL != h->control_head)
  866. {
  867. size = h->control_head->notify_size;
  868. }
  869. else
  870. {
  871. n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
  872. if (NULL == n)
  873. return; /* no pending messages */
  874. size = n->th->notify_size + sizeof (struct OutboundMessage);
  875. }
  876. LOG (GNUNET_ERROR_TYPE_DEBUG,
  877. "Calling notify_transmit_ready\n");
  878. h->cth =
  879. GNUNET_CLIENT_notify_transmit_ready (h->client, size,
  880. GNUNET_TIME_UNIT_FOREVER_REL,
  881. GNUNET_NO, &transport_notify_ready,
  882. h);
  883. GNUNET_assert (NULL != h->cth);
  884. }
  885. /**
  886. * Schedule the task to send one message, either from the control
  887. * list or the peer message queues to the service.
  888. *
  889. * @param h transport service to schedule a transmission for
  890. */
  891. static void
  892. schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
  893. {
  894. struct GNUNET_TIME_Relative delay;
  895. struct Neighbour *n;
  896. GNUNET_assert (NULL != h->client);
  897. if (h->quota_task != NULL)
  898. {
  899. GNUNET_SCHEDULER_cancel (h->quota_task);
  900. h->quota_task = NULL;
  901. }
  902. if (NULL != h->control_head)
  903. delay = GNUNET_TIME_UNIT_ZERO;
  904. else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
  905. {
  906. delay =
  907. GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
  908. n->th->notify_size + n->traffic_overhead);
  909. n->traffic_overhead = 0;
  910. }
  911. else
  912. return; /* no work to be done */
  913. LOG (GNUNET_ERROR_TYPE_DEBUG,
  914. "Scheduling next transmission to service in %s\n",
  915. GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
  916. h->quota_task =
  917. GNUNET_SCHEDULER_add_delayed (delay, &schedule_transmission_task, h);
  918. }
  919. /**
  920. * Queue control request for transmission to the transport
  921. * service.
  922. *
  923. * @param h handle to the transport service
  924. * @param size number of bytes to be transmitted
  925. * @param notify function to call to get the content
  926. * @param notify_cls closure for @a notify
  927. * @return a `struct GNUNET_TRANSPORT_TransmitHandle`
  928. */
  929. static struct GNUNET_TRANSPORT_TransmitHandle *
  930. schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
  931. size_t size,
  932. GNUNET_TRANSPORT_TransmitReadyNotify notify,
  933. void *notify_cls)
  934. {
  935. struct GNUNET_TRANSPORT_TransmitHandle *th;
  936. LOG (GNUNET_ERROR_TYPE_DEBUG,
  937. "Control transmit of %u bytes requested\n",
  938. size);
  939. th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
  940. th->notify = notify;
  941. th->notify_cls = notify_cls;
  942. th->notify_size = size;
  943. GNUNET_CONTAINER_DLL_insert_tail (h->control_head, h->control_tail, th);
  944. schedule_transmission (h);
  945. return th;
  946. }
  947. /**
  948. * Transmit START message to service.
  949. *
  950. * @param cls unused
  951. * @param size number of bytes available in @a buf
  952. * @param buf where to copy the message
  953. * @return number of bytes copied to @a buf
  954. */
  955. static size_t
  956. send_start (void *cls, size_t size, void *buf)
  957. {
  958. struct GNUNET_TRANSPORT_Handle *h = cls;
  959. struct StartMessage s;
  960. uint32_t options;
  961. if (NULL == buf)
  962. {
  963. /* Can only be shutdown, just give up */
  964. LOG (GNUNET_ERROR_TYPE_DEBUG,
  965. "Shutdown while trying to transmit START request.\n");
  966. return 0;
  967. }
  968. LOG (GNUNET_ERROR_TYPE_DEBUG,
  969. "Transmitting START request.\n");
  970. GNUNET_assert (size >= sizeof (struct StartMessage));
  971. s.header.size = htons (sizeof (struct StartMessage));
  972. s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
  973. options = 0;
  974. if (h->check_self)
  975. options |= 1;
  976. if (h->rec != NULL)
  977. options |= 2;
  978. s.options = htonl (options);
  979. s.self = h->self;
  980. memcpy (buf, &s, sizeof (struct StartMessage));
  981. GNUNET_CLIENT_receive (h->client, &demultiplexer, h,
  982. GNUNET_TIME_UNIT_FOREVER_REL);
  983. return sizeof (struct StartMessage);
  984. }
  985. /**
  986. * Try again to connect to transport service.
  987. *
  988. * @param cls the handle to the transport service
  989. * @param tc scheduler context
  990. */
  991. static void
  992. reconnect (void *cls,
  993. const struct GNUNET_SCHEDULER_TaskContext *tc)
  994. {
  995. struct GNUNET_TRANSPORT_Handle *h = cls;
  996. h->reconnect_task = NULL;
  997. if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
  998. {
  999. /* shutdown, just give up */
  1000. return;
  1001. }
  1002. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1003. "Connecting to transport service.\n");
  1004. GNUNET_assert (NULL == h->client);
  1005. GNUNET_assert (NULL == h->control_head);
  1006. GNUNET_assert (NULL == h->control_tail);
  1007. h->reconnecting = GNUNET_NO;
  1008. h->client = GNUNET_CLIENT_connect ("transport", h->cfg);
  1009. GNUNET_assert (NULL != h->client);
  1010. schedule_control_transmit (h, sizeof (struct StartMessage),
  1011. &send_start, h);
  1012. }
  1013. /**
  1014. * Function that will schedule the job that will try
  1015. * to connect us again to the client.
  1016. *
  1017. * @param h transport service to reconnect
  1018. */
  1019. static void
  1020. disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
  1021. {
  1022. struct GNUNET_TRANSPORT_TransmitHandle *th;
  1023. GNUNET_assert (h->reconnect_task == NULL);
  1024. if (NULL != h->cth)
  1025. {
  1026. GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
  1027. h->cth = NULL;
  1028. }
  1029. if (NULL != h->client)
  1030. {
  1031. GNUNET_CLIENT_disconnect (h->client);
  1032. h->client = NULL;
  1033. /* LOG (GNUNET_ERROR_TYPE_ERROR,
  1034. "Client disconnect done \n");*/
  1035. }
  1036. /* Forget about all neighbours that we used to be connected to */
  1037. GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
  1038. &neighbour_delete, h);
  1039. if (h->quota_task != NULL)
  1040. {
  1041. GNUNET_SCHEDULER_cancel (h->quota_task);
  1042. h->quota_task = NULL;
  1043. }
  1044. while ((NULL != (th = h->control_head)))
  1045. {
  1046. GNUNET_CONTAINER_DLL_remove (h->control_head, h->control_tail, th);
  1047. th->notify (th->notify_cls, 0, NULL);
  1048. GNUNET_free (th);
  1049. }
  1050. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1051. "Scheduling task to reconnect to transport service in %s.\n",
  1052. GNUNET_STRINGS_relative_time_to_string(h->reconnect_delay, GNUNET_YES));
  1053. h->reconnect_task =
  1054. GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
  1055. h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
  1056. }
  1057. /**
  1058. * Cancel control request for transmission to the transport service.
  1059. *
  1060. * @param th handle to the transport service
  1061. * @param tth transmit handle to cancel
  1062. */
  1063. static void
  1064. cancel_control_transmit (struct GNUNET_TRANSPORT_Handle *th,
  1065. struct GNUNET_TRANSPORT_TransmitHandle *tth)
  1066. {
  1067. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1068. "Canceling transmit of contral transmission requested\n");
  1069. GNUNET_CONTAINER_DLL_remove (th->control_head, th->control_tail, tth);
  1070. GNUNET_free (tth);
  1071. }
  1072. /**
  1073. * Send #GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_CONNECT message to the
  1074. * service.
  1075. *
  1076. * @param cls the `struct GNUNET_TRANSPORT_TryConnectHandle`
  1077. * @param size number of bytes available in @a buf
  1078. * @param buf where to copy the message
  1079. * @return number of bytes copied to @a buf
  1080. */
  1081. static size_t
  1082. send_try_connect (void *cls,
  1083. size_t size,
  1084. void *buf)
  1085. {
  1086. struct GNUNET_TRANSPORT_TryConnectHandle *tch = cls;
  1087. struct TransportRequestConnectMessage msg;
  1088. tch->tth = NULL;
  1089. if (NULL == buf)
  1090. {
  1091. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1092. "Discarding `%s' request to `%4s' due to error in transport service connection.\n",
  1093. "REQUEST_CONNECT",
  1094. GNUNET_i2s (&tch->pid));
  1095. if (NULL != tch->cb)
  1096. tch->cb (tch->cb_cls,
  1097. GNUNET_SYSERR);
  1098. GNUNET_TRANSPORT_try_connect_cancel (tch);
  1099. return 0;
  1100. }
  1101. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1102. "Transmitting `%s' request with respect to `%4s'.\n",
  1103. "REQUEST_CONNECT",
  1104. GNUNET_i2s (&tch->pid));
  1105. GNUNET_assert (size >= sizeof (struct TransportRequestConnectMessage));
  1106. msg.header.size = htons (sizeof (struct TransportRequestConnectMessage));
  1107. msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_CONNECT);
  1108. msg.reserved = htonl (0);
  1109. msg.peer = tch->pid;
  1110. memcpy (buf, &msg, sizeof (msg));
  1111. if (NULL != tch->cb)
  1112. tch->cb (tch->cb_cls, GNUNET_OK);
  1113. GNUNET_TRANSPORT_try_connect_cancel (tch);
  1114. return sizeof (struct TransportRequestConnectMessage);
  1115. }
  1116. /**
  1117. * Ask the transport service to establish a connection to
  1118. * the given peer.
  1119. *
  1120. * @param handle connection to transport service
  1121. * @param target who we should try to connect to
  1122. * @param cb callback to be called when request was transmitted to transport
  1123. * service
  1124. * @param cb_cls closure for the callback
  1125. * @return a `struct GNUNET_TRANSPORT_TryConnectHandle` handle or
  1126. * NULL on failure (cb will not be called)
  1127. */
  1128. struct GNUNET_TRANSPORT_TryConnectHandle *
  1129. GNUNET_TRANSPORT_try_connect (struct GNUNET_TRANSPORT_Handle *handle,
  1130. const struct GNUNET_PeerIdentity *target,
  1131. GNUNET_TRANSPORT_TryConnectCallback cb,
  1132. void *cb_cls)
  1133. {
  1134. struct GNUNET_TRANSPORT_TryConnectHandle *tch;
  1135. if (NULL == handle->client)
  1136. return NULL;
  1137. tch = GNUNET_new (struct GNUNET_TRANSPORT_TryConnectHandle);
  1138. tch->th = handle;
  1139. tch->pid = *target;
  1140. tch->cb = cb;
  1141. tch->cb_cls = cb_cls;
  1142. tch->tth = schedule_control_transmit (handle,
  1143. sizeof (struct TransportRequestConnectMessage),
  1144. &send_try_connect, tch);
  1145. GNUNET_CONTAINER_DLL_insert (handle->tc_head,
  1146. handle->tc_tail,
  1147. tch);
  1148. return tch;
  1149. }
  1150. /**
  1151. * Cancel the request to transport to try a connect
  1152. * Callback will not be called
  1153. *
  1154. * @param tch the handle to cancel
  1155. */
  1156. void
  1157. GNUNET_TRANSPORT_try_connect_cancel (struct GNUNET_TRANSPORT_TryConnectHandle *tch)
  1158. {
  1159. struct GNUNET_TRANSPORT_Handle *th;
  1160. th = tch->th;
  1161. if (NULL != tch->tth)
  1162. cancel_control_transmit (th, tch->tth);
  1163. GNUNET_CONTAINER_DLL_remove (th->tc_head,
  1164. th->tc_tail,
  1165. tch);
  1166. GNUNET_free (tch);
  1167. }
  1168. /**
  1169. * Send #GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_DISCONNECT message to the
  1170. * service.
  1171. *
  1172. * @param cls the `struct GNUNET_TRANSPORT_TryDisconnectHandle`
  1173. * @param size number of bytes available in @a buf
  1174. * @param buf where to copy the message
  1175. * @return number of bytes copied to @a buf
  1176. */
  1177. static size_t
  1178. send_try_disconnect (void *cls,
  1179. size_t size,
  1180. void *buf)
  1181. {
  1182. struct GNUNET_TRANSPORT_TryDisconnectHandle *tdh = cls;
  1183. struct TransportRequestConnectMessage msg;
  1184. tdh->th = NULL;
  1185. if (NULL == buf)
  1186. {
  1187. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1188. "Discarding `%s' request to `%4s' due to error in transport service connection.\n",
  1189. "REQUEST_DISCONNECT",
  1190. GNUNET_i2s (&tdh->pid));
  1191. if (NULL != tdh->cb)
  1192. tdh->cb (tdh->cb_cls,
  1193. GNUNET_SYSERR);
  1194. GNUNET_TRANSPORT_try_disconnect_cancel (tdh);
  1195. return 0;
  1196. }
  1197. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1198. "Transmitting `%s' request with respect to `%4s'.\n",
  1199. "REQUEST_DISCONNECT",
  1200. GNUNET_i2s (&tdh->pid));
  1201. GNUNET_assert (size >= sizeof (struct TransportRequestDisconnectMessage));
  1202. msg.header.size = htons (sizeof (struct TransportRequestDisconnectMessage));
  1203. msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_DISCONNECT);
  1204. msg.reserved = htonl (0);
  1205. msg.peer = tdh->pid;
  1206. memcpy (buf, &msg, sizeof (msg));
  1207. if (NULL != tdh->cb)
  1208. tdh->cb (tdh->cb_cls, GNUNET_OK);
  1209. GNUNET_TRANSPORT_try_disconnect_cancel (tdh);
  1210. return sizeof (struct TransportRequestDisconnectMessage);
  1211. }
  1212. /**
  1213. * Ask the transport service to shutdown a connection to
  1214. * the given peer.
  1215. *
  1216. * @param handle connection to transport service
  1217. * @param target who we should try to connect to
  1218. * @param cb callback to be called when request was transmitted to transport
  1219. * service
  1220. * @param cb_cls closure for the callback @a cb
  1221. * @return a `struct GNUNET_TRANSPORT_TryDisconnectHandle` handle or
  1222. * NULL on failure (cb will not be called)
  1223. */
  1224. struct GNUNET_TRANSPORT_TryDisconnectHandle *
  1225. GNUNET_TRANSPORT_try_disconnect (struct GNUNET_TRANSPORT_Handle *handle,
  1226. const struct GNUNET_PeerIdentity *target,
  1227. GNUNET_TRANSPORT_TryDisconnectCallback cb,
  1228. void *cb_cls)
  1229. {
  1230. struct GNUNET_TRANSPORT_TryDisconnectHandle *tdh;
  1231. if (NULL == handle->client)
  1232. return NULL;
  1233. tdh = GNUNET_new (struct GNUNET_TRANSPORT_TryDisconnectHandle);
  1234. tdh->th = handle;
  1235. tdh->pid = *target;
  1236. tdh->cb = cb;
  1237. tdh->cb_cls = cb_cls;
  1238. tdh->tth = schedule_control_transmit (handle,
  1239. sizeof (struct TransportRequestDisconnectMessage),
  1240. &send_try_disconnect, tdh);
  1241. GNUNET_CONTAINER_DLL_insert (handle->td_head,
  1242. handle->td_tail,
  1243. tdh);
  1244. return tdh;
  1245. }
  1246. /**
  1247. * Cancel the request to transport to try a disconnect
  1248. * Callback will not be called
  1249. *
  1250. * @param tdh the handle to cancel
  1251. */
  1252. void
  1253. GNUNET_TRANSPORT_try_disconnect_cancel (struct GNUNET_TRANSPORT_TryDisconnectHandle *tdh)
  1254. {
  1255. struct GNUNET_TRANSPORT_Handle *th;
  1256. th = tdh->th;
  1257. if (NULL != tdh->tth)
  1258. cancel_control_transmit (th, tdh->tth);
  1259. GNUNET_CONTAINER_DLL_remove (th->td_head,
  1260. th->td_tail,
  1261. tdh);
  1262. GNUNET_free (tdh);
  1263. }
  1264. /**
  1265. * Send HELLO message to the service.
  1266. *
  1267. * @param cls the HELLO message to send
  1268. * @param size number of bytes available in @a buf
  1269. * @param buf where to copy the message
  1270. * @return number of bytes copied to @a buf
  1271. */
  1272. static size_t
  1273. send_hello (void *cls, size_t size, void *buf)
  1274. {
  1275. struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls;
  1276. struct GNUNET_MessageHeader *msg = ohh->msg;
  1277. uint16_t ssize;
  1278. struct GNUNET_SCHEDULER_TaskContext tc;
  1279. tc.read_ready = NULL;
  1280. tc.write_ready = NULL;
  1281. tc.reason = GNUNET_SCHEDULER_REASON_TIMEOUT;
  1282. if (NULL == buf)
  1283. {
  1284. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1285. "Timeout while trying to transmit `%s' request.\n",
  1286. "HELLO");
  1287. if (NULL != ohh->cont)
  1288. ohh->cont (ohh->cls, &tc);
  1289. GNUNET_free (msg);
  1290. GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head, ohh->th->oh_tail, ohh);
  1291. GNUNET_free (ohh);
  1292. return 0;
  1293. }
  1294. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1295. "Transmitting `%s' request.\n",
  1296. "HELLO");
  1297. ssize = ntohs (msg->size);
  1298. GNUNET_assert (size >= ssize);
  1299. memcpy (buf, msg, ssize);
  1300. GNUNET_free (msg);
  1301. tc.reason = GNUNET_SCHEDULER_REASON_READ_READY;
  1302. if (NULL != ohh->cont)
  1303. ohh->cont (ohh->cls, &tc);
  1304. GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head, ohh->th->oh_tail, ohh);
  1305. GNUNET_free (ohh);
  1306. return ssize;
  1307. }
  1308. /**
  1309. * Send traffic metric message to the service.
  1310. *
  1311. * @param cls the message to send
  1312. * @param size number of bytes available in @a buf
  1313. * @param buf where to copy the message
  1314. * @return number of bytes copied to @a buf
  1315. */
  1316. static size_t
  1317. send_metric (void *cls,
  1318. size_t size,
  1319. void *buf)
  1320. {
  1321. struct TrafficMetricMessage *msg = cls;
  1322. uint16_t ssize;
  1323. if (NULL == buf)
  1324. {
  1325. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1326. "Timeout while trying to transmit TRAFFIC_METRIC request.\n");
  1327. GNUNET_free (msg);
  1328. return 0;
  1329. }
  1330. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1331. "Transmitting TRAFFIC_METRIC request.\n");
  1332. ssize = ntohs (msg->header.size);
  1333. GNUNET_assert (size >= ssize);
  1334. memcpy (buf, msg, ssize);
  1335. GNUNET_free (msg);
  1336. return ssize;
  1337. }
  1338. /**
  1339. * Set transport metrics for a peer and a direction
  1340. *
  1341. * @param handle transport handle
  1342. * @param peer the peer to set the metric for
  1343. * @param prop the performance metrics to set
  1344. * @param delay_in inbound delay to introduce
  1345. * @param delay_out outbound delay to introduce
  1346. *
  1347. * Note: Delay restrictions in receiving direction will be enforced
  1348. * with one message delay.
  1349. */
  1350. void
  1351. GNUNET_TRANSPORT_set_traffic_metric (struct GNUNET_TRANSPORT_Handle *handle,
  1352. const struct GNUNET_PeerIdentity *peer,
  1353. const struct GNUNET_ATS_Properties *prop,
  1354. struct GNUNET_TIME_Relative delay_in,
  1355. struct GNUNET_TIME_Relative delay_out)
  1356. {
  1357. struct TrafficMetricMessage *msg;
  1358. msg = GNUNET_new (struct TrafficMetricMessage);
  1359. msg->header.size = htons (sizeof (struct TrafficMetricMessage));
  1360. msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
  1361. msg->reserved = htonl (0);
  1362. msg->peer = *peer;
  1363. GNUNET_ATS_properties_hton (&msg->properties,
  1364. prop);
  1365. msg->delay_in = GNUNET_TIME_relative_hton (delay_in);
  1366. msg->delay_out = GNUNET_TIME_relative_hton (delay_out);
  1367. schedule_control_transmit (handle,
  1368. sizeof (struct TrafficMetricMessage),
  1369. &send_metric,
  1370. msg);
  1371. }
  1372. /**
  1373. * Offer the transport service the HELLO of another peer. Note that
  1374. * the transport service may just ignore this message if the HELLO is
  1375. * malformed or useless due to our local configuration.
  1376. *
  1377. * @param handle connection to transport service
  1378. * @param hello the hello message
  1379. * @param cont continuation to call when HELLO has been sent,
  1380. * tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
  1381. * tc reasong #GNUNET_SCHEDULER_REASON_READ_READY for success
  1382. * @param cls closure for continuation
  1383. * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on failure,
  1384. * in case of failure cont will not be called
  1385. *
  1386. */
  1387. struct GNUNET_TRANSPORT_OfferHelloHandle *
  1388. GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
  1389. const struct GNUNET_MessageHeader *hello,
  1390. GNUNET_SCHEDULER_TaskCallback cont, void *cls)
  1391. {
  1392. struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
  1393. struct GNUNET_MessageHeader *msg;
  1394. struct GNUNET_PeerIdentity peer;
  1395. uint16_t size;
  1396. if (NULL == handle->client)
  1397. return NULL;
  1398. GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
  1399. size = ntohs (hello->size);
  1400. GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
  1401. if (GNUNET_OK !=
  1402. GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello, &peer))
  1403. {
  1404. GNUNET_break (0);
  1405. return NULL;
  1406. }
  1407. msg = GNUNET_malloc (size);
  1408. memcpy (msg, hello, size);
  1409. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1410. "Offering `%s' message of `%4s' to transport for validation.\n", "HELLO",
  1411. GNUNET_i2s (&peer));
  1412. ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
  1413. ohh->th = handle;
  1414. ohh->cont = cont;
  1415. ohh->cls = cls;
  1416. ohh->msg = msg;
  1417. ohh->tth = schedule_control_transmit (handle, size,
  1418. &send_hello, ohh);
  1419. GNUNET_CONTAINER_DLL_insert (handle->oh_head, handle->oh_tail, ohh);
  1420. return ohh;
  1421. }
  1422. /**
  1423. * Cancel the request to transport to offer the HELLO message
  1424. *
  1425. * @param ohh the GNUNET_TRANSPORT_OfferHelloHandle to cancel
  1426. */
  1427. void
  1428. GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh)
  1429. {
  1430. struct GNUNET_TRANSPORT_Handle *th = ohh->th;
  1431. cancel_control_transmit (ohh->th, ohh->tth);
  1432. GNUNET_CONTAINER_DLL_remove (th->oh_head, th->oh_tail, ohh);
  1433. GNUNET_free (ohh->msg);
  1434. GNUNET_free (ohh);
  1435. }
  1436. /**
  1437. * Checks if a given peer is connected to us
  1438. *
  1439. * @param handle connection to transport service
  1440. * @param peer the peer to check
  1441. * @return #GNUNET_YES (connected) or #GNUNET_NO (disconnected)
  1442. */
  1443. int
  1444. GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle,
  1445. const struct GNUNET_PeerIdentity *peer)
  1446. {
  1447. if (GNUNET_YES ==
  1448. GNUNET_CONTAINER_multipeermap_contains (handle->neighbours,
  1449. peer))
  1450. return GNUNET_YES;
  1451. return GNUNET_NO;
  1452. }
  1453. /**
  1454. * Task to call the HelloUpdateCallback of the GetHelloHandle
  1455. *
  1456. * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle`
  1457. * @param tc the scheduler task context
  1458. */
  1459. static void
  1460. call_hello_update_cb_async (void *cls,
  1461. const struct GNUNET_SCHEDULER_TaskContext *tc)
  1462. {
  1463. struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
  1464. GNUNET_assert (NULL != ghh->handle->my_hello);
  1465. GNUNET_assert (NULL != ghh->notify_task);
  1466. ghh->notify_task = NULL;
  1467. ghh->rec (ghh->rec_cls,
  1468. ghh->handle->my_hello);
  1469. }
  1470. /**
  1471. * Obtain the HELLO message for this peer. The callback given in this function
  1472. * is never called synchronously.
  1473. *
  1474. * @param handle connection to transport service
  1475. * @param rec function to call with the HELLO, sender will be our peer
  1476. * identity; message and sender will be NULL on timeout
  1477. * (handshake with transport service pending/failed).
  1478. * cost estimate will be 0.
  1479. * @param rec_cls closure for @a rec
  1480. * @return handle to cancel the operation
  1481. */
  1482. struct GNUNET_TRANSPORT_GetHelloHandle *
  1483. GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
  1484. GNUNET_TRANSPORT_HelloUpdateCallback rec,
  1485. void *rec_cls)
  1486. {
  1487. struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
  1488. hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
  1489. hwl->rec = rec;
  1490. hwl->rec_cls = rec_cls;
  1491. hwl->handle = handle;
  1492. GNUNET_CONTAINER_DLL_insert (handle->hwl_head, handle->hwl_tail, hwl);
  1493. if (NULL != handle->my_hello)
  1494. hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async,
  1495. hwl);
  1496. return hwl;
  1497. }
  1498. /**
  1499. * Stop receiving updates about changes to our HELLO message.
  1500. *
  1501. * @param ghh handle to cancel
  1502. */
  1503. void
  1504. GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
  1505. {
  1506. struct GNUNET_TRANSPORT_Handle *handle = ghh->handle;
  1507. if (NULL != ghh->notify_task)
  1508. GNUNET_SCHEDULER_cancel (ghh->notify_task);
  1509. GNUNET_CONTAINER_DLL_remove (handle->hwl_head, handle->hwl_tail, ghh);
  1510. GNUNET_free (ghh);
  1511. }
  1512. /**
  1513. * Connect to the transport service. Note that the connection may
  1514. * complete (or fail) asynchronously.
  1515. *
  1516. * @param cfg configuration to use
  1517. * @param self our own identity (API should check that it matches
  1518. * the identity found by transport), or NULL (no check)
  1519. * @param cls closure for the callbacks
  1520. * @param rec receive function to call
  1521. * @param nc function to call on connect events
  1522. * @param nd function to call on disconnect events
  1523. * @return NULL on error
  1524. */
  1525. struct GNUNET_TRANSPORT_Handle *
  1526. GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
  1527. const struct GNUNET_PeerIdentity *self, void *cls,
  1528. GNUNET_TRANSPORT_ReceiveCallback rec,
  1529. GNUNET_TRANSPORT_NotifyConnect nc,
  1530. GNUNET_TRANSPORT_NotifyDisconnect nd)
  1531. {
  1532. return GNUNET_TRANSPORT_connect2 (cfg, self, cls,
  1533. rec, nc, nd, NULL);
  1534. }
  1535. /**
  1536. * Connect to the transport service. Note that the connection may
  1537. * complete (or fail) asynchronously.
  1538. *
  1539. * @param cfg configuration to use
  1540. * @param self our own identity (API should check that it matches
  1541. * the identity found by transport), or NULL (no check)
  1542. * @param cls closure for the callbacks
  1543. * @param rec receive function to call
  1544. * @param nc function to call on connect events
  1545. * @param nd function to call on disconnect events
  1546. * @param neb function to call if we have excess bandwidth to a peer
  1547. * @return NULL on error
  1548. */
  1549. struct GNUNET_TRANSPORT_Handle *
  1550. GNUNET_TRANSPORT_connect2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
  1551. const struct GNUNET_PeerIdentity *self, void *cls,
  1552. GNUNET_TRANSPORT_ReceiveCallback rec,
  1553. GNUNET_TRANSPORT_NotifyConnect nc,
  1554. GNUNET_TRANSPORT_NotifyDisconnect nd,
  1555. GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
  1556. {
  1557. struct GNUNET_TRANSPORT_Handle *ret;
  1558. ret = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
  1559. if (NULL != self)
  1560. {
  1561. ret->self = *self;
  1562. ret->check_self = GNUNET_YES;
  1563. }
  1564. ret->cfg = cfg;
  1565. ret->cls = cls;
  1566. ret->rec = rec;
  1567. ret->nc_cb = nc;
  1568. ret->nd_cb = nd;
  1569. ret->neb_cb = neb;
  1570. ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
  1571. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1572. "Connecting to transport service.\n");
  1573. ret->client = GNUNET_CLIENT_connect ("transport", cfg);
  1574. if (NULL == ret->client)
  1575. {
  1576. GNUNET_free (ret);
  1577. return NULL;
  1578. }
  1579. ret->neighbours =
  1580. GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
  1581. GNUNET_YES);
  1582. ret->ready_heap =
  1583. GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
  1584. schedule_control_transmit (ret, sizeof (struct StartMessage),
  1585. &send_start, ret);
  1586. return ret;
  1587. }
  1588. /**
  1589. * Disconnect from the transport service.
  1590. *
  1591. * @param handle handle to the service as returned from #GNUNET_TRANSPORT_connect()
  1592. */
  1593. void
  1594. GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
  1595. {
  1596. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1597. "Transport disconnect called!\n");
  1598. /* this disconnects all neighbours... */
  1599. if (handle->reconnect_task == NULL)
  1600. disconnect_and_schedule_reconnect (handle);
  1601. /* and now we stop trying to connect again... */
  1602. if (handle->reconnect_task != NULL)
  1603. {
  1604. GNUNET_SCHEDULER_cancel (handle->reconnect_task);
  1605. handle->reconnect_task = NULL;
  1606. }
  1607. GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
  1608. handle->neighbours = NULL;
  1609. if (handle->quota_task != NULL)
  1610. {
  1611. GNUNET_SCHEDULER_cancel (handle->quota_task);
  1612. handle->quota_task = NULL;
  1613. }
  1614. GNUNET_free_non_null (handle->my_hello);
  1615. handle->my_hello = NULL;
  1616. GNUNET_assert (NULL == handle->tc_head);
  1617. GNUNET_assert (NULL == handle->tc_tail);
  1618. GNUNET_assert (NULL == handle->hwl_head);
  1619. GNUNET_assert (NULL == handle->hwl_tail);
  1620. GNUNET_CONTAINER_heap_destroy (handle->ready_heap);
  1621. handle->ready_heap = NULL;
  1622. GNUNET_free (handle);
  1623. }
  1624. /**
  1625. * Check if we could queue a message of the given size for
  1626. * transmission. The transport service will take both its
  1627. * internal buffers and bandwidth limits imposed by the
  1628. * other peer into consideration when answering this query.
  1629. *
  1630. * @param handle connection to transport service
  1631. * @param target who should receive the message
  1632. * @param size how big is the message we want to transmit?
  1633. * @param timeout after how long should we give up (and call
  1634. * notify with buf NULL and size 0)?
  1635. * @param notify function to call when we are ready to
  1636. * send such a message
  1637. * @param notify_cls closure for @a notify
  1638. * @return NULL if someone else is already waiting to be notified
  1639. * non-NULL if the notify callback was queued (can be used to cancel
  1640. * using #GNUNET_TRANSPORT_notify_transmit_ready_cancel)
  1641. */
  1642. struct GNUNET_TRANSPORT_TransmitHandle *
  1643. GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
  1644. const struct GNUNET_PeerIdentity *target,
  1645. size_t size,
  1646. struct GNUNET_TIME_Relative timeout,
  1647. GNUNET_TRANSPORT_TransmitReadyNotify notify,
  1648. void *notify_cls)
  1649. {
  1650. struct Neighbour *n;
  1651. struct GNUNET_TRANSPORT_TransmitHandle *th;
  1652. struct GNUNET_TIME_Relative delay;
  1653. n = neighbour_find (handle, target);
  1654. if (NULL == n)
  1655. {
  1656. /* use GNUNET_TRANSPORT_try_connect first, only use this function
  1657. * once a connection has been established */
  1658. GNUNET_assert (0);
  1659. return NULL;
  1660. }
  1661. if (NULL != n->th)
  1662. {
  1663. /* attempt to send two messages at the same time to the same peer */
  1664. GNUNET_assert (0);
  1665. return NULL;
  1666. }
  1667. GNUNET_assert (NULL == n->hn);
  1668. th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
  1669. th->neighbour = n;
  1670. th->notify = notify;
  1671. th->notify_cls = notify_cls;
  1672. th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
  1673. th->notify_size = size;
  1674. n->th = th;
  1675. /* calculate when our transmission should be ready */
  1676. delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, size + n->traffic_overhead);
  1677. n->traffic_overhead = 0;
  1678. if (delay.rel_value_us > timeout.rel_value_us)
  1679. delay.rel_value_us = 0; /* notify immediately (with failure) */
  1680. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1681. "Bandwidth tracker allows next transmission to peer %s in %s\n",
  1682. GNUNET_i2s (target),
  1683. GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
  1684. n->hn = GNUNET_CONTAINER_heap_insert (handle->ready_heap, n, delay.rel_value_us);
  1685. schedule_transmission (handle);
  1686. return th;
  1687. }
  1688. /**
  1689. * Cancel the specified transmission-ready notification.
  1690. *
  1691. * @param th handle returned from #GNUNET_TRANSPORT_notify_transmit_ready()
  1692. */
  1693. void
  1694. GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th)
  1695. {
  1696. struct Neighbour *n;
  1697. GNUNET_assert (NULL == th->next);
  1698. GNUNET_assert (NULL == th->prev);
  1699. n = th->neighbour;
  1700. GNUNET_assert (th == n->th);
  1701. n->th = NULL;
  1702. if (NULL != n->hn)
  1703. {
  1704. GNUNET_CONTAINER_heap_remove_node (n->hn);
  1705. n->hn = NULL;
  1706. }
  1707. else
  1708. {
  1709. GNUNET_assert (NULL != th->timeout_task);
  1710. GNUNET_SCHEDULER_cancel (th->timeout_task);
  1711. th->timeout_task = NULL;
  1712. }
  1713. GNUNET_free (th);
  1714. }
  1715. /* end of transport_api.c */