gnunet-service-fs_cadet_client.c 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2012, 2013 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file fs/gnunet-service-fs_cadet_client.c
  18. * @brief non-anonymous file-transfer
  19. * @author Christian Grothoff
  20. *
  21. * TODO:
  22. * - PORT is set to old application type, unsure if we should keep
  23. * it that way (fine for now)
  24. */
  25. #include "platform.h"
  26. #include "gnunet_constants.h"
  27. #include "gnunet_util_lib.h"
  28. #include "gnunet_cadet_service.h"
  29. #include "gnunet_protocols.h"
  30. #include "gnunet_applications.h"
  31. #include "gnunet-service-fs.h"
  32. #include "gnunet-service-fs_indexing.h"
  33. #include "gnunet-service-fs_cadet.h"
  34. /**
  35. * After how long do we reset connections without replies?
  36. */
  37. #define CLIENT_RETRY_TIMEOUT \
  38. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
  39. /**
  40. * Handle for a cadet to another peer.
  41. */
  42. struct CadetHandle;
  43. /**
  44. * Handle for a request that is going out via cadet API.
  45. */
  46. struct GSF_CadetRequest
  47. {
  48. /**
  49. * DLL.
  50. */
  51. struct GSF_CadetRequest *next;
  52. /**
  53. * DLL.
  54. */
  55. struct GSF_CadetRequest *prev;
  56. /**
  57. * Which cadet is this request associated with?
  58. */
  59. struct CadetHandle *mh;
  60. /**
  61. * Function to call with the result.
  62. */
  63. GSF_CadetReplyProcessor proc;
  64. /**
  65. * Closure for @e proc
  66. */
  67. void *proc_cls;
  68. /**
  69. * Query to transmit to the other peer.
  70. */
  71. struct GNUNET_HashCode query;
  72. /**
  73. * Desired type for the reply.
  74. */
  75. enum GNUNET_BLOCK_Type type;
  76. /**
  77. * Did we transmit this request already? #GNUNET_YES if we are
  78. * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL.
  79. */
  80. int was_transmitted;
  81. };
  82. /**
  83. * Handle for a cadet to another peer.
  84. */
  85. struct CadetHandle
  86. {
  87. /**
  88. * Head of DLL of pending requests on this cadet.
  89. */
  90. struct GSF_CadetRequest *pending_head;
  91. /**
  92. * Tail of DLL of pending requests on this cadet.
  93. */
  94. struct GSF_CadetRequest *pending_tail;
  95. /**
  96. * Map from query to `struct GSF_CadetRequest`s waiting for
  97. * a reply.
  98. */
  99. struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
  100. /**
  101. * Channel to the other peer.
  102. */
  103. struct GNUNET_CADET_Channel *channel;
  104. /**
  105. * Which peer does this cadet go to?
  106. */
  107. struct GNUNET_PeerIdentity target;
  108. /**
  109. * Task to kill inactive cadets (we keep them around for
  110. * a few seconds to give the application a chance to give
  111. * us another query).
  112. */
  113. struct GNUNET_SCHEDULER_Task *timeout_task;
  114. /**
  115. * Task to reset cadets that had errors (asynchronously,
  116. * as we may not be able to do it immediately during a
  117. * callback from the cadet API).
  118. */
  119. struct GNUNET_SCHEDULER_Task *reset_task;
  120. };
  121. /**
  122. * Cadet channel for creating outbound channels.
  123. */
  124. struct GNUNET_CADET_Handle *cadet_handle;
  125. /**
  126. * Map from peer identities to 'struct CadetHandles' with cadet
  127. * channels to those peers.
  128. */
  129. struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
  130. /* ********************* client-side code ************************* */
  131. /**
  132. * Transmit pending requests via the cadet.
  133. *
  134. * @param cls `struct CadetHandle` to process
  135. */
  136. static void
  137. transmit_pending (void *cls);
  138. /**
  139. * Iterator called on each entry in a waiting map to
  140. * move it back to the pending list.
  141. *
  142. * @param cls the `struct CadetHandle`
  143. * @param key the key of the entry in the map (the query)
  144. * @param value the `struct GSF_CadetRequest` to move to pending
  145. * @return #GNUNET_YES (continue to iterate)
  146. */
  147. static int
  148. move_to_pending (void *cls, const struct GNUNET_HashCode *key, void *value)
  149. {
  150. struct CadetHandle *mh = cls;
  151. struct GSF_CadetRequest *sr = value;
  152. GNUNET_assert (
  153. GNUNET_YES ==
  154. GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, key, value));
  155. GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr);
  156. sr->was_transmitted = GNUNET_NO;
  157. return GNUNET_YES;
  158. }
  159. /**
  160. * Functions with this signature are called whenever a complete reply
  161. * is received.
  162. *
  163. * @param cls closure with the `struct CadetHandle`
  164. * @param srm the actual message
  165. * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
  166. */
  167. static int
  168. check_reply (void *cls, const struct CadetReplyMessage *srm)
  169. {
  170. /* We check later... */
  171. return GNUNET_OK;
  172. }
  173. /**
  174. * Task called when it is time to reset an cadet.
  175. *
  176. * @param cls the `struct CadetHandle` to tear down
  177. */
  178. static void
  179. reset_cadet_task (void *cls);
  180. /**
  181. * We had a serious error, tear down and re-create cadet from scratch,
  182. * but do so asynchronously.
  183. *
  184. * @param mh cadet to reset
  185. */
  186. static void
  187. reset_cadet_async (struct CadetHandle *mh)
  188. {
  189. if (NULL != mh->reset_task)
  190. GNUNET_SCHEDULER_cancel (mh->reset_task);
  191. mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task, mh);
  192. }
  193. /**
  194. * Closure for handle_reply().
  195. */
  196. struct HandleReplyClosure
  197. {
  198. /**
  199. * Reply payload.
  200. */
  201. const void *data;
  202. /**
  203. * Expiration time for the block.
  204. */
  205. struct GNUNET_TIME_Absolute expiration;
  206. /**
  207. * Number of bytes in @e data.
  208. */
  209. size_t data_size;
  210. /**
  211. * Type of the block.
  212. */
  213. enum GNUNET_BLOCK_Type type;
  214. /**
  215. * Did we have a matching query?
  216. */
  217. int found;
  218. };
  219. /**
  220. * Iterator called on each entry in a waiting map to
  221. * process a result.
  222. *
  223. * @param cls the `struct HandleReplyClosure`
  224. * @param key the key of the entry in the map (the query)
  225. * @param value the `struct GSF_CadetRequest` to handle result for
  226. * @return #GNUNET_YES (continue to iterate)
  227. */
  228. static int
  229. process_reply (void *cls, const struct GNUNET_HashCode *key, void *value)
  230. {
  231. struct HandleReplyClosure *hrc = cls;
  232. struct GSF_CadetRequest *sr = value;
  233. sr->proc (sr->proc_cls,
  234. hrc->type,
  235. hrc->expiration,
  236. hrc->data_size,
  237. hrc->data);
  238. sr->proc = NULL;
  239. GSF_cadet_query_cancel (sr);
  240. hrc->found = GNUNET_YES;
  241. return GNUNET_YES;
  242. }
  243. /**
  244. * Iterator called on each entry in a waiting map to
  245. * call the 'proc' continuation and release associated
  246. * resources.
  247. *
  248. * @param cls the `struct CadetHandle`
  249. * @param key the key of the entry in the map (the query)
  250. * @param value the `struct GSF_CadetRequest` to clean up
  251. * @return #GNUNET_YES (continue to iterate)
  252. */
  253. static int
  254. free_waiting_entry (void *cls, const struct GNUNET_HashCode *key, void *value)
  255. {
  256. struct GSF_CadetRequest *sr = value;
  257. GSF_cadet_query_cancel (sr);
  258. return GNUNET_YES;
  259. }
  260. /**
  261. * Functions with this signature are called whenever a complete reply
  262. * is received.
  263. *
  264. * @param cls closure with the `struct CadetHandle`
  265. * @param srm the actual message
  266. */
  267. static void
  268. handle_reply (void *cls, const struct CadetReplyMessage *srm)
  269. {
  270. struct CadetHandle *mh = cls;
  271. struct HandleReplyClosure hrc;
  272. uint16_t msize;
  273. enum GNUNET_BLOCK_Type type;
  274. struct GNUNET_HashCode query;
  275. msize = ntohs (srm->header.size) - sizeof(struct CadetReplyMessage);
  276. type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
  277. if (GNUNET_YES !=
  278. GNUNET_BLOCK_get_key (GSF_block_ctx, type, &srm[1], msize, &query))
  279. {
  280. GNUNET_break_op (0);
  281. GNUNET_log (
  282. GNUNET_ERROR_TYPE_WARNING,
  283. "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
  284. type,
  285. msize,
  286. GNUNET_i2s (&mh->target));
  287. reset_cadet_async (mh);
  288. return;
  289. }
  290. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  291. "Received reply `%s' via cadet from peer %s\n",
  292. GNUNET_h2s (&query),
  293. GNUNET_i2s (&mh->target));
  294. GNUNET_CADET_receive_done (mh->channel);
  295. GNUNET_STATISTICS_update (GSF_stats,
  296. gettext_noop ("# replies received via cadet"),
  297. 1,
  298. GNUNET_NO);
  299. hrc.data = &srm[1];
  300. hrc.data_size = msize;
  301. hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
  302. hrc.type = type;
  303. hrc.found = GNUNET_NO;
  304. GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
  305. &query,
  306. &process_reply,
  307. &hrc);
  308. if (GNUNET_NO == hrc.found)
  309. {
  310. GNUNET_STATISTICS_update (GSF_stats,
  311. gettext_noop (
  312. "# replies received via cadet dropped"),
  313. 1,
  314. GNUNET_NO);
  315. }
  316. }
  317. /**
  318. * Function called by cadet when a client disconnects.
  319. * Cleans up our `struct CadetClient` of that channel.
  320. *
  321. * @param cls our `struct CadetClient`
  322. * @param channel channel of the disconnecting client
  323. */
  324. static void
  325. disconnect_cb (void *cls, const struct GNUNET_CADET_Channel *channel)
  326. {
  327. struct CadetHandle *mh = cls;
  328. struct GSF_CadetRequest *sr;
  329. if (NULL == mh->channel)
  330. return; /* being destroyed elsewhere */
  331. GNUNET_assert (channel == mh->channel);
  332. mh->channel = NULL;
  333. while (NULL != (sr = mh->pending_head))
  334. GSF_cadet_query_cancel (sr);
  335. /* first remove `mh` from the `cadet_map`, so that if the
  336. callback from `free_waiting_entry()` happens to re-issue
  337. the request, we don't immediately have it back in the
  338. `waiting_map`. */
  339. GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_remove (cadet_map,
  340. &mh->target,
  341. mh));
  342. GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
  343. &free_waiting_entry,
  344. mh);
  345. if (NULL != mh->timeout_task)
  346. GNUNET_SCHEDULER_cancel (mh->timeout_task);
  347. if (NULL != mh->reset_task)
  348. GNUNET_SCHEDULER_cancel (mh->reset_task);
  349. GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
  350. GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
  351. GNUNET_free (mh);
  352. }
  353. /**
  354. * Function called whenever an MQ-channel's transmission window size changes.
  355. *
  356. * The first callback in an outgoing channel will be with a non-zero value
  357. * and will mean the channel is connected to the destination.
  358. *
  359. * For an incoming channel it will be called immediately after the
  360. * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
  361. *
  362. * @param cls Channel closure.
  363. * @param channel Connection to the other end (henceforth invalid).
  364. * @param window_size New window size. If the is more messages than buffer size
  365. * this value will be negative..
  366. */
  367. static void
  368. window_change_cb (void *cls,
  369. const struct GNUNET_CADET_Channel *channel,
  370. int window_size)
  371. {
  372. /* FIXME: for flow control, implement? */
  373. #if 0
  374. /* Something like this instead of the GNUNET_MQ_notify_sent() in
  375. transmit_pending() might be good (once the window change CB works...) */
  376. if (0 < window_size) /* test needed? */
  377. transmit_pending (mh);
  378. #endif
  379. }
  380. /**
  381. * We had a serious error, tear down and re-create cadet from scratch.
  382. *
  383. * @param mh cadet to reset
  384. */
  385. static void
  386. reset_cadet (struct CadetHandle *mh)
  387. {
  388. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  389. "Resetting cadet channel to %s\n",
  390. GNUNET_i2s (&mh->target));
  391. if (NULL != mh->channel)
  392. {
  393. GNUNET_CADET_channel_destroy (mh->channel);
  394. mh->channel = NULL;
  395. }
  396. GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, &move_to_pending, mh);
  397. {
  398. struct GNUNET_MQ_MessageHandler handlers[] =
  399. { GNUNET_MQ_hd_var_size (reply,
  400. GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
  401. struct CadetReplyMessage,
  402. mh),
  403. GNUNET_MQ_handler_end () };
  404. struct GNUNET_HashCode port;
  405. GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
  406. strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
  407. &port);
  408. mh->channel = GNUNET_CADET_channel_create (cadet_handle,
  409. mh,
  410. &mh->target,
  411. &port,
  412. &window_change_cb,
  413. &disconnect_cb,
  414. handlers);
  415. }
  416. transmit_pending (mh);
  417. }
  418. /**
  419. * Task called when it is time to destroy an inactive cadet channel.
  420. *
  421. * @param cls the `struct CadetHandle` to tear down
  422. */
  423. static void
  424. cadet_timeout (void *cls)
  425. {
  426. struct CadetHandle *mh = cls;
  427. struct GNUNET_CADET_Channel *tun;
  428. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  429. "Timeout on cadet channel to %s\n",
  430. GNUNET_i2s (&mh->target));
  431. mh->timeout_task = NULL;
  432. tun = mh->channel;
  433. mh->channel = NULL;
  434. if (NULL != tun)
  435. GNUNET_CADET_channel_destroy (tun);
  436. }
  437. /**
  438. * Task called when it is time to reset an cadet.
  439. *
  440. * @param cls the `struct CadetHandle` to tear down
  441. */
  442. static void
  443. reset_cadet_task (void *cls)
  444. {
  445. struct CadetHandle *mh = cls;
  446. mh->reset_task = NULL;
  447. reset_cadet (mh);
  448. }
  449. /**
  450. * Transmit pending requests via the cadet.
  451. *
  452. * @param cls `struct CadetHandle` to process
  453. */
  454. static void
  455. transmit_pending (void *cls)
  456. {
  457. struct CadetHandle *mh = cls;
  458. struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel);
  459. struct GSF_CadetRequest *sr;
  460. struct GNUNET_MQ_Envelope *env;
  461. struct CadetQueryMessage *sqm;
  462. if ((0 != GNUNET_MQ_get_length (mq)) || (NULL == (sr = mh->pending_head)))
  463. return;
  464. GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr);
  465. GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
  466. mh->waiting_map,
  467. &sr->query,
  468. sr,
  469. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
  470. sr->was_transmitted = GNUNET_YES;
  471. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  472. "Sending query for %s via cadet to %s\n",
  473. GNUNET_h2s (&sr->query),
  474. GNUNET_i2s (&mh->target));
  475. env = GNUNET_MQ_msg (sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
  476. GNUNET_MQ_env_set_options (env,
  477. GNUNET_MQ_PREF_GOODPUT
  478. | GNUNET_MQ_PREF_CORK_ALLOWED
  479. | GNUNET_MQ_PREF_OUT_OF_ORDER);
  480. sqm->type = htonl (sr->type);
  481. sqm->query = sr->query;
  482. GNUNET_MQ_notify_sent (env, &transmit_pending, mh);
  483. GNUNET_MQ_send (mq, env);
  484. }
  485. /**
  486. * Get (or create) a cadet to talk to the given peer.
  487. *
  488. * @param target peer we want to communicate with
  489. */
  490. static struct CadetHandle *
  491. get_cadet (const struct GNUNET_PeerIdentity *target)
  492. {
  493. struct CadetHandle *mh;
  494. mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, target);
  495. if (NULL != mh)
  496. {
  497. if (NULL != mh->timeout_task)
  498. {
  499. GNUNET_SCHEDULER_cancel (mh->timeout_task);
  500. mh->timeout_task = NULL;
  501. }
  502. return mh;
  503. }
  504. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  505. "Creating cadet channel to %s\n",
  506. GNUNET_i2s (target));
  507. mh = GNUNET_new (struct CadetHandle);
  508. mh->reset_task =
  509. GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, &reset_cadet_task, mh);
  510. mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
  511. mh->target = *target;
  512. GNUNET_assert (GNUNET_OK ==
  513. GNUNET_CONTAINER_multipeermap_put (
  514. cadet_map,
  515. &mh->target,
  516. mh,
  517. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  518. {
  519. struct GNUNET_MQ_MessageHandler handlers[] =
  520. { GNUNET_MQ_hd_var_size (reply,
  521. GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
  522. struct CadetReplyMessage,
  523. mh),
  524. GNUNET_MQ_handler_end () };
  525. struct GNUNET_HashCode port;
  526. GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
  527. strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
  528. &port);
  529. mh->channel = GNUNET_CADET_channel_create (cadet_handle,
  530. mh,
  531. &mh->target,
  532. &port,
  533. &window_change_cb,
  534. &disconnect_cb,
  535. handlers);
  536. }
  537. return mh;
  538. }
  539. /**
  540. * Look for a block by directly contacting a particular peer.
  541. *
  542. * @param target peer that should have the block
  543. * @param query hash to query for the block
  544. * @param type desired type for the block
  545. * @param proc function to call with result
  546. * @param proc_cls closure for @a proc
  547. * @return handle to cancel the operation
  548. */
  549. struct GSF_CadetRequest *
  550. GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
  551. const struct GNUNET_HashCode *query,
  552. enum GNUNET_BLOCK_Type type,
  553. GSF_CadetReplyProcessor proc,
  554. void *proc_cls)
  555. {
  556. struct CadetHandle *mh;
  557. struct GSF_CadetRequest *sr;
  558. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  559. "Preparing to send query for %s via cadet to %s\n",
  560. GNUNET_h2s (query),
  561. GNUNET_i2s (target));
  562. mh = get_cadet (target);
  563. sr = GNUNET_new (struct GSF_CadetRequest);
  564. sr->mh = mh;
  565. sr->proc = proc;
  566. sr->proc_cls = proc_cls;
  567. sr->type = type;
  568. sr->query = *query;
  569. GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr);
  570. transmit_pending (mh);
  571. return sr;
  572. }
  573. /**
  574. * Cancel an active request; must not be called after 'proc'
  575. * was calld.
  576. *
  577. * @param sr request to cancel
  578. */
  579. void
  580. GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
  581. {
  582. struct CadetHandle *mh = sr->mh;
  583. GSF_CadetReplyProcessor p;
  584. p = sr->proc;
  585. sr->proc = NULL;
  586. if (NULL != p)
  587. {
  588. /* signal failure / cancellation to callback */
  589. p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, GNUNET_TIME_UNIT_ZERO_ABS, 0, NULL);
  590. }
  591. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  592. "Cancelled query for %s via cadet to %s\n",
  593. GNUNET_h2s (&sr->query),
  594. GNUNET_i2s (&sr->mh->target));
  595. if (GNUNET_YES == sr->was_transmitted)
  596. GNUNET_assert (
  597. GNUNET_OK ==
  598. GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, &sr->query, sr));
  599. else
  600. GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr);
  601. GNUNET_free (sr);
  602. if ((0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
  603. (NULL == mh->pending_head))
  604. mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
  605. &cadet_timeout,
  606. mh);
  607. }
  608. /**
  609. * Function called on each active cadets to shut them down.
  610. *
  611. * @param cls NULL
  612. * @param key target peer, unused
  613. * @param value the `struct CadetHandle` to destroy
  614. * @return #GNUNET_YES (continue to iterate)
  615. */
  616. int
  617. GSF_cadet_release_clients (void *cls,
  618. const struct GNUNET_PeerIdentity *key,
  619. void *value)
  620. {
  621. struct CadetHandle *mh = value;
  622. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  623. "Timeout on cadet channel to %s\n",
  624. GNUNET_i2s (&mh->target));
  625. if (NULL != mh->channel)
  626. {
  627. struct GNUNET_CADET_Channel *channel = mh->channel;
  628. mh->channel = NULL;
  629. GNUNET_CADET_channel_destroy (channel);
  630. }
  631. if (NULL != mh->reset_task)
  632. {
  633. GNUNET_SCHEDULER_cancel (mh->reset_task);
  634. mh->reset_task = NULL;
  635. }
  636. return GNUNET_YES;
  637. }
  638. /* end of gnunet-service-fs_cadet_client.c */