transport_api2_core.c 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2009-2013, 2016, 2018 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file transport/transport_api_core.c
  18. * @brief library to access the transport service for message exchange
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet_util_lib.h"
  23. #include "gnunet_constants.h"
  24. #include "gnunet_arm_service.h"
  25. #include "gnunet_hello_lib.h"
  26. #include "gnunet_protocols.h"
  27. #include "gnunet_transport_core_service.h"
  28. #include "transport.h"
  29. #define LOG(kind, ...) GNUNET_log_from (kind, "transport-api-core", __VA_ARGS__)
  30. /**
  31. * How large to start with for the hashmap of neighbours.
  32. */
  33. #define STARTING_NEIGHBOURS_SIZE 16
  34. /**
  35. * Window size. How many messages to the same target do we pass
  36. * to TRANSPORT without a SEND_OK in between? Small values limit
  37. * throughput, large values will increase latency.
  38. *
  39. * FIXME-OPTIMIZE: find out what good values are experimentally,
  40. * maybe set adaptively (i.e. to observed available bandwidth).
  41. */
  42. #define SEND_WINDOW_SIZE 4
  43. /**
  44. * Entry in hash table of all of our current (connected) neighbours.
  45. */
  46. struct Neighbour
  47. {
  48. /**
  49. * Identity of this neighbour.
  50. */
  51. struct GNUNET_PeerIdentity id;
  52. /**
  53. * Overall transport handle.
  54. */
  55. struct GNUNET_TRANSPORT_CoreHandle *h;
  56. /**
  57. * Active message queue for the peer.
  58. */
  59. struct GNUNET_MQ_Handle *mq;
  60. /**
  61. * Envelope with the message we are currently transmitting (or NULL).
  62. */
  63. struct GNUNET_MQ_Envelope *env;
  64. /**
  65. * Closure for @e mq handlers.
  66. */
  67. void *handlers_cls;
  68. /**
  69. * How many messages can we still send to this peer before we should
  70. * throttle?
  71. */
  72. unsigned int ready_window;
  73. /**
  74. * Used to indicate our status if @e env is non-NULL. Set to
  75. * #GNUNET_YES if we did pass a message to the MQ and are waiting
  76. * for the call to #notify_send_done(). Set to #GNUNET_NO if the @e
  77. * ready_window is 0 and @e env is waiting for a
  78. * #GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK?
  79. */
  80. int16_t awaiting_done;
  81. /**
  82. * Size of the message in @e env.
  83. */
  84. uint16_t env_size;
  85. };
  86. /**
  87. * Handle for the transport service (includes all of the
  88. * state for the transport service).
  89. */
  90. struct GNUNET_TRANSPORT_CoreHandle
  91. {
  92. /**
  93. * Closure for the callbacks.
  94. */
  95. void *cls;
  96. /**
  97. * Functions to call for received data (template for
  98. * new message queues).
  99. */
  100. struct GNUNET_MQ_MessageHandler *handlers;
  101. /**
  102. * function to call on connect events
  103. */
  104. GNUNET_TRANSPORT_NotifyConnect nc_cb;
  105. /**
  106. * function to call on disconnect events
  107. */
  108. GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
  109. /**
  110. * My client connection to the transport service.
  111. */
  112. struct GNUNET_MQ_Handle *mq;
  113. /**
  114. * My configuration.
  115. */
  116. const struct GNUNET_CONFIGURATION_Handle *cfg;
  117. /**
  118. * Hash map of the current connected neighbours of this peer.
  119. * Maps peer identities to `struct Neighbour` entries.
  120. */
  121. struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
  122. /**
  123. * Peer identity as assumed by this process, or all zeros.
  124. */
  125. struct GNUNET_PeerIdentity self;
  126. /**
  127. * ID of the task trying to reconnect to the service.
  128. */
  129. struct GNUNET_SCHEDULER_Task *reconnect_task;
  130. /**
  131. * Delay until we try to reconnect.
  132. */
  133. struct GNUNET_TIME_Relative reconnect_delay;
  134. /**
  135. * Should we check that @e self matches what the service thinks?
  136. * (if #GNUNET_NO, then @e self is all zeros!).
  137. */
  138. int check_self;
  139. };
  /**
   * Drop the current connection to the transport service and schedule
   * the task that will try to connect to it again.
   *
   * @param h handle of the transport service to reconnect to
   */
  static void
  disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h);
  148. /**
  149. * Get the neighbour list entry for the given peer
  150. *
  151. * @param h our context
  152. * @param peer peer to look up
  153. * @return NULL if no such peer entry exists
  154. */
  155. static struct Neighbour *
  156. neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h,
  157. const struct GNUNET_PeerIdentity *peer)
  158. {
  159. return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer);
  160. }
  161. /**
  162. * Iterator over hash map entries, for deleting state of a neighbour.
  163. *
  164. * @param cls the `struct GNUNET_TRANSPORT_CoreHandle *`
  165. * @param key peer identity
  166. * @param value value in the hash map, the neighbour entry to delete
  167. * @return #GNUNET_YES if we should continue to
  168. * iterate,
  169. * #GNUNET_NO if not.
  170. */
  171. static int
  172. neighbour_delete (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
  173. {
  174. struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
  175. struct Neighbour *n = value;
  176. LOG (GNUNET_ERROR_TYPE_DEBUG,
  177. "Dropping entry for neighbour `%s'.\n",
  178. GNUNET_i2s (key));
  179. if (NULL != handle->nd_cb)
  180. handle->nd_cb (handle->cls, &n->id, n->handlers_cls);
  181. if (NULL != n->env)
  182. {
  183. GNUNET_MQ_send_cancel (n->env);
  184. n->env = NULL;
  185. }
  186. GNUNET_MQ_destroy (n->mq);
  187. GNUNET_assert (NULL == n->mq);
  188. GNUNET_assert (
  189. GNUNET_YES ==
  190. GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key, n));
  191. GNUNET_free (n);
  192. return GNUNET_YES;
  193. }
  194. /**
  195. * Generic error handler, called with the appropriate
  196. * error code and the same closure specified at the creation of
  197. * the message queue.
  198. * Not every message queue implementation supports an error handler.
  199. *
  200. * @param cls closure with the `struct GNUNET_TRANSPORT_CoreHandle *`
  201. * @param error error code
  202. */
  203. static void
  204. mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
  205. {
  206. struct GNUNET_TRANSPORT_CoreHandle *h = cls;
  207. LOG (GNUNET_ERROR_TYPE_DEBUG,
  208. "Error receiving from transport service, disconnecting temporarily.\n");
  209. disconnect_and_schedule_reconnect (h);
  210. }
  211. /**
  212. * A message from the handler's message queue to a neighbour was
  213. * transmitted. Now trigger (possibly delayed) notification of the
  214. * neighbour's message queue that we are done and thus ready for
  215. * the next message. Note that the MQ being ready is independent
  216. * of the send window, as we may queue many messages and simply
  217. * not pass them to TRANSPORT if the send window is insufficient.
  218. *
  219. * @param cls the `struct Neighbour` where the message was sent
  220. */
  221. static void
  222. notify_send_done (void *cls)
  223. {
  224. struct Neighbour *n = cls;
  225. n->awaiting_done = GNUNET_NO;
  226. n->env = NULL;
  227. GNUNET_MQ_impl_send_continue (n->mq);
  228. }
  229. /**
  230. * We have an envelope waiting for transmission at @a n, and
  231. * our transmission window is positive. Perform the transmission.
  232. *
  233. * @param n neighbour to perform transmission for
  234. */
  235. static void
  236. do_send (struct Neighbour *n)
  237. {
  238. GNUNET_assert (0 < n->ready_window);
  239. GNUNET_assert (NULL != n->env);
  240. n->ready_window--;
  241. n->awaiting_done = GNUNET_YES;
  242. GNUNET_MQ_notify_sent (n->env, &notify_send_done, n);
  243. GNUNET_MQ_send (n->h->mq, n->env);
  244. LOG (GNUNET_ERROR_TYPE_DEBUG,
  245. "Passed message of type %u for neighbour `%s' to TRANSPORT.\n",
  246. ntohs (GNUNET_MQ_env_get_msg (n->env)->type),
  247. GNUNET_i2s (&n->id));
  248. }
  249. /**
  250. * Implement sending functionality of a message queue.
  251. * Called one message at a time. Should send the @a msg
  252. * to the transport service and then notify the queue
  253. * once we are ready for the next one.
  254. *
  255. * @param mq the message queue
  256. * @param msg the message to send
  257. * @param impl_state state of the implementation
  258. */
  259. static void
  260. mq_send_impl (struct GNUNET_MQ_Handle *mq,
  261. const struct GNUNET_MessageHeader *msg,
  262. void *impl_state)
  263. {
  264. struct Neighbour *n = impl_state;
  265. struct OutboundMessage *obm;
  266. uint16_t msize;
  267. msize = ntohs (msg->size);
  268. if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof(*obm))
  269. {
  270. GNUNET_break (0);
  271. GNUNET_MQ_impl_send_continue (mq);
  272. return;
  273. }
  274. LOG (GNUNET_ERROR_TYPE_DEBUG,
  275. "CORE requested transmission of message of type %u to neighbour `%s'.\n",
  276. ntohs (msg->type),
  277. GNUNET_i2s (&n->id));
  278. GNUNET_assert (NULL == n->env);
  279. n->env =
  280. GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg);
  281. n->env_size = ntohs (msg->size);
  282. {
  283. struct GNUNET_MQ_Envelope *env;
  284. env = GNUNET_MQ_get_current_envelope (mq);
  285. obm->priority = htonl ((uint32_t) GNUNET_MQ_env_get_options (env));
  286. }
  287. obm->peer = n->id;
  288. if (0 == n->ready_window)
  289. {
  290. LOG (GNUNET_ERROR_TYPE_DEBUG,
  291. "Flow control delays transmission to CORE until we see SEND_OK.\n");
  292. return; /* can't send yet, need to wait for SEND_OK */
  293. }
  294. do_send (n);
  295. }
  296. /**
  297. * Handle destruction of a message queue. Implementations must not
  298. * free @a mq, but should take care of @a impl_state.
  299. *
  300. * @param mq the message queue to destroy
  301. * @param impl_state state of the implementation
  302. */
  303. static void
  304. mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
  305. {
  306. struct Neighbour *n = impl_state;
  307. GNUNET_assert (mq == n->mq);
  308. n->mq = NULL;
  309. }
  310. /**
  311. * Implementation function that cancels the currently sent message.
  312. * Should basically undo whatever #mq_send_impl() did.
  313. *
  314. * @param mq message queue
  315. * @param impl_state state specific to the implementation
  316. */
  317. static void
  318. mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
  319. {
  320. struct Neighbour *n = impl_state;
  321. n->ready_window++;
  322. if (GNUNET_YES == n->awaiting_done)
  323. {
  324. GNUNET_MQ_send_cancel (n->env);
  325. n->env = NULL;
  326. n->awaiting_done = GNUNET_NO;
  327. }
  328. else
  329. {
  330. GNUNET_assert (0 == n->ready_window);
  331. n->env = NULL;
  332. }
  333. }
  334. /**
  335. * We had an error processing a message we forwarded from a peer to
  336. * the CORE service. We should just complain about it but otherwise
  337. * continue processing.
  338. *
  339. * @param cls closure
  340. * @param error error code
  341. */
  342. static void
  343. peer_mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
  344. {
  345. /* struct Neighbour *n = cls; */
  346. GNUNET_break_op (0);
  347. }
  348. /**
  349. * Function we use for handling incoming connect messages.
  350. *
  351. * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
  352. * @param cim message received
  353. */
  354. static void
  355. handle_connect (void *cls, const struct ConnectInfoMessage *cim)
  356. {
  357. struct GNUNET_TRANSPORT_CoreHandle *h = cls;
  358. struct Neighbour *n;
  359. LOG (GNUNET_ERROR_TYPE_DEBUG,
  360. "Receiving CONNECT message for `%s'\n",
  361. GNUNET_i2s (&cim->id));
  362. n = neighbour_find (h, &cim->id);
  363. if (NULL != n)
  364. {
  365. GNUNET_break (0);
  366. disconnect_and_schedule_reconnect (h);
  367. return;
  368. }
  369. n = GNUNET_new (struct Neighbour);
  370. n->id = cim->id;
  371. n->h = h;
  372. n->ready_window = SEND_WINDOW_SIZE;
  373. GNUNET_assert (GNUNET_OK ==
  374. GNUNET_CONTAINER_multipeermap_put (
  375. h->neighbours,
  376. &n->id,
  377. n,
  378. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  379. n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl,
  380. &mq_destroy_impl,
  381. &mq_cancel_impl,
  382. n,
  383. h->handlers,
  384. &peer_mq_error_handler,
  385. n);
  386. if (NULL != h->nc_cb)
  387. {
  388. n->handlers_cls = h->nc_cb (h->cls, &n->id, n->mq);
  389. GNUNET_MQ_set_handlers_closure (n->mq, n->handlers_cls);
  390. }
  391. }
  392. /**
  393. * Function we use for handling incoming disconnect messages.
  394. *
  395. * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
  396. * @param dim message received
  397. */
  398. static void
  399. handle_disconnect (void *cls, const struct DisconnectInfoMessage *dim)
  400. {
  401. struct GNUNET_TRANSPORT_CoreHandle *h = cls;
  402. struct Neighbour *n;
  403. GNUNET_break (ntohl (dim->reserved) == 0);
  404. LOG (GNUNET_ERROR_TYPE_DEBUG,
  405. "Receiving DISCONNECT message for `%s'.\n",
  406. GNUNET_i2s (&dim->peer));
  407. n = neighbour_find (h, &dim->peer);
  408. if (NULL == n)
  409. {
  410. GNUNET_break (0);
  411. disconnect_and_schedule_reconnect (h);
  412. return;
  413. }
  414. GNUNET_assert (GNUNET_YES == neighbour_delete (h, &dim->peer, n));
  415. }
  416. /**
  417. * Function we use for handling incoming send-ok messages.
  418. *
  419. * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
  420. * @param okm message received
  421. */
  422. static void
  423. handle_send_ok (void *cls, const struct SendOkMessage *okm)
  424. {
  425. struct GNUNET_TRANSPORT_CoreHandle *h = cls;
  426. struct Neighbour *n;
  427. LOG (GNUNET_ERROR_TYPE_DEBUG,
  428. "Receiving SEND_OK message for transmission to %s\n",
  429. GNUNET_i2s (&okm->peer));
  430. n = neighbour_find (h, &okm->peer);
  431. if (NULL == n)
  432. {
  433. /* We should never get a 'SEND_OK' for a peer that we are not
  434. connected to */
  435. GNUNET_break (0);
  436. disconnect_and_schedule_reconnect (h);
  437. return;
  438. }
  439. n->ready_window++;
  440. if ((NULL != n->env) && (1 == n->ready_window))
  441. do_send (n);
  442. }
  443. /**
  444. * Function we use for checking incoming "inbound" messages.
  445. *
  446. * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
  447. * @param im message received
  448. */
  449. static int
  450. check_recv (void *cls, const struct InboundMessage *im)
  451. {
  452. const struct GNUNET_MessageHeader *imm;
  453. uint16_t size;
  454. size = ntohs (im->header.size) - sizeof(*im);
  455. if (size < sizeof(struct GNUNET_MessageHeader))
  456. {
  457. GNUNET_break (0);
  458. return GNUNET_SYSERR;
  459. }
  460. imm = (const struct GNUNET_MessageHeader *) &im[1];
  461. if (ntohs (imm->size) != size)
  462. {
  463. GNUNET_break (0);
  464. return GNUNET_SYSERR;
  465. }
  466. return GNUNET_OK;
  467. }
  468. /**
  469. * Function we use for handling incoming messages.
  470. *
  471. * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
  472. * @param im message received
  473. */
  474. static void
  475. handle_recv (void *cls, const struct InboundMessage *im)
  476. {
  477. struct GNUNET_TRANSPORT_CoreHandle *h = cls;
  478. const struct GNUNET_MessageHeader *imm =
  479. (const struct GNUNET_MessageHeader *) &im[1];
  480. struct Neighbour *n;
  481. LOG (GNUNET_ERROR_TYPE_DEBUG,
  482. "Received message of type %u with %u bytes from `%s'.\n",
  483. (unsigned int) ntohs (imm->type),
  484. (unsigned int) ntohs (imm->size),
  485. GNUNET_i2s (&im->peer));
  486. n = neighbour_find (h, &im->peer);
  487. if (NULL == n)
  488. {
  489. GNUNET_break (0);
  490. disconnect_and_schedule_reconnect (h);
  491. return;
  492. }
  493. GNUNET_MQ_inject_message (n->mq, imm);
  494. }
  495. /**
  496. * Try again to connect to transport service.
  497. *
  498. * @param cls the handle to the transport service
  499. */
  500. static void
  501. reconnect (void *cls)
  502. {
  503. struct GNUNET_TRANSPORT_CoreHandle *h = cls;
  504. struct GNUNET_MQ_MessageHandler handlers[] =
  505. { GNUNET_MQ_hd_fixed_size (connect,
  506. GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
  507. struct ConnectInfoMessage,
  508. h),
  509. GNUNET_MQ_hd_fixed_size (disconnect,
  510. GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
  511. struct DisconnectInfoMessage,
  512. h),
  513. GNUNET_MQ_hd_fixed_size (send_ok,
  514. GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
  515. struct SendOkMessage,
  516. h),
  517. GNUNET_MQ_hd_var_size (recv,
  518. GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
  519. struct InboundMessage,
  520. h),
  521. GNUNET_MQ_handler_end () };
  522. struct GNUNET_MQ_Envelope *env;
  523. struct StartMessage *s;
  524. uint32_t options;
  525. h->reconnect_task = NULL;
  526. LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
  527. GNUNET_assert (NULL == h->mq);
  528. h->mq =
  529. GNUNET_CLIENT_connect (h->cfg, "transport", handlers, &mq_error_handler, h);
  530. if (NULL == h->mq)
  531. return;
  532. env = GNUNET_MQ_msg (s, GNUNET_MESSAGE_TYPE_TRANSPORT_START);
  533. options = 0;
  534. if (h->check_self)
  535. options |= 1;
  536. if (NULL != h->handlers)
  537. options |= 2;
  538. s->options = htonl (options);
  539. s->self = h->self;
  540. GNUNET_MQ_send (h->mq, env);
  541. }
  542. /**
  543. * Disconnect from the transport service.
  544. *
  545. * @param h transport service to reconnect
  546. */
  547. static void
  548. disconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
  549. {
  550. GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, &neighbour_delete, h);
  551. if (NULL != h->mq)
  552. {
  553. GNUNET_MQ_destroy (h->mq);
  554. h->mq = NULL;
  555. }
  556. }
  557. /**
  558. * Function that will schedule the job that will try
  559. * to connect us again to the client.
  560. *
  561. * @param h transport service to reconnect
  562. */
  563. static void
  564. disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
  565. {
  566. GNUNET_assert (NULL == h->reconnect_task);
  567. disconnect (h);
  568. LOG (GNUNET_ERROR_TYPE_DEBUG,
  569. "Scheduling task to reconnect to transport service in %s.\n",
  570. GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
  571. h->reconnect_task =
  572. GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
  573. h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
  574. }
  575. /**
  576. * Checks if a given peer is connected to us and get the message queue.
  577. *
  578. * @param handle connection to transport service
  579. * @param peer the peer to check
  580. * @return NULL if disconnected, otherwise message queue for @a peer
  581. */
  582. struct GNUNET_MQ_Handle *
  583. GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
  584. const struct GNUNET_PeerIdentity *peer)
  585. {
  586. struct Neighbour *n;
  587. n = neighbour_find (handle, peer);
  588. if (NULL == n)
  589. return NULL;
  590. return n->mq;
  591. }
  592. /**
  593. * Notification from the CORE service to the TRANSPORT service
  594. * that the CORE service has finished processing a message from
  595. * TRANSPORT (via the @code{handlers} of #GNUNET_TRANSPORT_core_connect())
  596. * and that it is thus now OK for TRANSPORT to send more messages
  597. * for @a pid.
  598. *
  599. * Used to provide flow control, this is our equivalent to
  600. * #GNUNET_SERVICE_client_continue() of an ordinary service.
  601. *
  602. * Note that due to the use of a window, TRANSPORT may send multiple
  603. * messages destined for the same peer even without an intermediate
  604. * call to this function. However, CORE must still call this function
  605. * once per message received, as otherwise eventually the window will
  606. * be full and TRANSPORT will stop providing messages to CORE for @a
  607. * pid.
  608. *
  609. * @param ch core handle
  610. * @param pid which peer was the message from that was fully processed by CORE
  611. */
  612. void
  613. GNUNET_TRANSPORT_core_receive_continue (struct GNUNET_TRANSPORT_CoreHandle *ch,
  614. const struct GNUNET_PeerIdentity *pid)
  615. {
  616. struct GNUNET_MQ_Envelope *env;
  617. struct RecvOkMessage *rok;
  618. LOG (GNUNET_ERROR_TYPE_DEBUG,
  619. "Message for %s finished CORE processing, sending RECV_OK.\n",
  620. GNUNET_i2s (pid));
  621. if (NULL == ch->mq)
  622. return;
  623. env = GNUNET_MQ_msg (rok, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK);
  624. rok->increase_window_delta = htonl (1);
  625. rok->peer = *pid;
  626. GNUNET_MQ_send (ch->mq, env);
  627. }
  628. /**
  629. * Connect to the transport service. Note that the connection may
  630. * complete (or fail) asynchronously.
  631. *
  632. * @param cfg configuration to use
  633. * @param self our own identity (API should check that it matches
  634. * the identity found by transport), or NULL (no check)
  635. * @param cls closure for the callbacks
  636. * @param rec receive function to call
  637. * @param nc function to call on connect events
  638. * @param nd function to call on disconnect events
  639. * @return NULL on error
  640. */
  641. struct GNUNET_TRANSPORT_CoreHandle *
  642. GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
  643. const struct GNUNET_PeerIdentity *self,
  644. const struct GNUNET_MQ_MessageHandler *handlers,
  645. void *cls,
  646. GNUNET_TRANSPORT_NotifyConnect nc,
  647. GNUNET_TRANSPORT_NotifyDisconnect nd)
  648. {
  649. struct GNUNET_TRANSPORT_CoreHandle *h;
  650. unsigned int i;
  651. h = GNUNET_new (struct GNUNET_TRANSPORT_CoreHandle);
  652. if (NULL != self)
  653. {
  654. h->self = *self;
  655. h->check_self = GNUNET_YES;
  656. }
  657. h->cfg = cfg;
  658. h->cls = cls;
  659. h->nc_cb = nc;
  660. h->nd_cb = nd;
  661. h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
  662. if (NULL != handlers)
  663. {
  664. for (i = 0; NULL != handlers[i].cb; i++)
  665. ;
  666. h->handlers = GNUNET_new_array (i + 1, struct GNUNET_MQ_MessageHandler);
  667. GNUNET_memcpy (h->handlers,
  668. handlers,
  669. i * sizeof(struct GNUNET_MQ_MessageHandler));
  670. }
  671. LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n");
  672. reconnect (h);
  673. if (NULL == h->mq)
  674. {
  675. GNUNET_free (h->handlers);
  676. GNUNET_free (h);
  677. return NULL;
  678. }
  679. h->neighbours =
  680. GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES);
  681. return h;
  682. }
  683. /**
  684. * Disconnect from the transport service.
  685. *
  686. * @param handle handle to the service as returned from
  687. * #GNUNET_TRANSPORT_core_connect()
  688. */
  689. void
  690. GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
  691. {
  692. LOG (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
  693. /* this disconnects all neighbours... */
  694. disconnect (handle);
  695. /* and now we stop trying to connect again... */
  696. if (NULL != handle->reconnect_task)
  697. {
  698. GNUNET_SCHEDULER_cancel (handle->reconnect_task);
  699. handle->reconnect_task = NULL;
  700. }
  701. GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
  702. handle->neighbours = NULL;
  703. GNUNET_free (handle->handlers);
  704. handle->handlers = NULL;
  705. GNUNET_free (handle);
  706. }
  707. /* end of transport_api_core.c */