gnunet-service-fs_pe.c 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2011 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file fs/gnunet-service-fs_pe.c
  18. * @brief API to manage query plan
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet-service-fs.h"
  23. #include "gnunet-service-fs_cp.h"
  24. #include "gnunet-service-fs_pe.h"
  25. #include "gnunet-service-fs_pr.h"
  26. /**
  27. * Collect an insane number of statistics? May cause excessive IPC.
  28. */
  29. #define INSANE_STATISTICS GNUNET_NO
  /**
   * List of GSF_PendingRequests a request plan participates with.
   * NOTE(review): only forward-declared; no definition or use of this
   * type is visible in this file -- confirm whether it is still needed.
   */
  struct PendingRequestList;

  /**
   * Transmission plan for a peer (the full definition follows further
   * down in this file).
   */
  struct PeerPlan;
  /**
   * One edge of the M:N relation between request plans and pending
   * requests.  A pending request may appear in several plans and a
   * plan may serve several pending requests; each object of this type
   * ties exactly one plan entry (@e rp) to one pending request (@e pr).
   *
   * Every edge is a member of two multi-linked lists at once:
   *
   * - the "PE" MDLL, whose head and tail live in the
   *   `struct GSF_RequestPlan` (lets us enumerate all pending
   *   requests behind a plan entry);
   * - the "PR" MDLL, whose head and tail live with the
   *   `struct GSF_PendingRequest` data (lets us enumerate all plan
   *   entries a pending request takes part in).
   */
  struct GSF_PendingRequestPlanBijection
  {
    /**
     * Successor in the "PR" list (edges of the same pending request).
     */
    struct GSF_PendingRequestPlanBijection *next_PR;

    /**
     * Predecessor in the "PR" list.
     */
    struct GSF_PendingRequestPlanBijection *prev_PR;

    /**
     * Successor in the "PE" list (edges of the same plan entry).
     */
    struct GSF_PendingRequestPlanBijection *next_PE;

    /**
     * Predecessor in the "PE" list.
     */
    struct GSF_PendingRequestPlanBijection *prev_PE;

    /**
     * Plan entry of this edge; identifies one of the peers we intend
     * to forward the request to.
     */
    struct GSF_RequestPlan *rp;

    /**
     * Pending request of this edge; carries the request details and
     * identifies one of the origins of the request.
     */
    struct GSF_PendingRequest *pr;
  };
  85. /**
  86. * Information we keep per request per peer. This is a doubly-linked
  87. * list (with head and tail in the `struct GSF_PendingRequestData`)
  88. * with one entry in each heap of each `struct PeerPlan`. Each
  89. * entry tracks information relevant for this request and this peer.
  90. */
  91. struct GSF_RequestPlan
  92. {
  93. /**
  94. * This is a doubly-linked list.
  95. */
  96. struct GSF_RequestPlan *next;
  97. /**
  98. * This is a doubly-linked list.
  99. */
  100. struct GSF_RequestPlan *prev;
  101. /**
  102. * Heap node associated with this request and this peer.
  103. */
  104. struct GNUNET_CONTAINER_HeapNode *hn;
  105. /**
  106. * The transmission plan for a peer that this request is associated with.
  107. */
  108. struct PeerPlan *pp;
  109. /**
  110. * Head of list of associated pending requests. This tells us
  111. * which incoming requests from other peers this plan entry
  112. * corresponds to.
  113. */
  114. struct GSF_PendingRequestPlanBijection *pe_head;
  115. /**
  116. * Tail of list of associated pending requests.
  117. */
  118. struct GSF_PendingRequestPlanBijection *pe_tail;
  119. /**
  120. * Earliest time we'd be happy to (re)transmit this request.
  121. */
  122. struct GNUNET_TIME_Absolute earliest_transmission;
  123. /**
  124. * When was the last time we transmitted this request to this peer? 0 for never.
  125. */
  126. struct GNUNET_TIME_Absolute last_transmission;
  127. /**
  128. * Current priority for this request for this target.
  129. */
  130. uint64_t priority;
  131. /**
  132. * How often did we transmit this request to this peer?
  133. */
  134. unsigned int transmission_counter;
  135. };
  /**
   * Everything we track in order to schedule query transmissions to
   * one particular peer.
   */
  struct PeerPlan
  {
    /**
     * Max-heap of `struct GSF_RequestPlan` entries that are ready to
     * be sent; a larger weight means a higher priority.
     */
    struct GNUNET_CONTAINER_Heap *priority_heap;

    /**
     * Min-heap of `struct GSF_RequestPlan` entries that still have to
     * wait, ordered by earliest transmission time.
     */
    struct GNUNET_CONTAINER_Heap *delay_heap;

    /**
     * Query hash -> plan entry.  Every entry in @e priority_heap or
     * @e delay_heap is expected to be present here as well; the same
     * query may map to more than one entry.
     */
    struct GNUNET_CONTAINER_MultiHashMap *plan_map;

    /**
     * The peer this plan is for.
     */
    struct GSF_ConnectedPeer *cp;

    /**
     * Scheduler task that will run the plan next, NULL if none is
     * pending.
     */
    struct GNUNET_SCHEDULER_Task *task;

    /**
     * Envelope of the message currently being transmitted for this
     * plan, NULL if none is in flight.
     */
    struct GNUNET_MQ_Envelope *env;
  };
  /**
   * Peer identity -> `struct PeerPlan`; created in GSF_plan_init() and
   * destroyed in GSF_plan_done().
   */
  static struct GNUNET_CONTAINER_MultiPeerMap *plans;

  /**
   * Running sum of transmission counters; incremented once for every
   * query transmission and used as the numerator of the "average
   * retransmission delay" statistic.
   */
  static unsigned long long total_delay;

  /**
   * Number of `struct GSF_RequestPlan` entries currently alive.
   */
  static unsigned long long plan_count;
  181. /**
  182. * Return the query (key in the plan_map) for the given request plan.
  183. * Note that this key may change as there can be multiple pending
  184. * requests for the same key and we just return _one_ of them; this
  185. * particular one might complete while another one might still be
  186. * active, hence the lifetime of the returned hash code is NOT
  187. * necessarily identical to that of the `struct GSF_RequestPlan`
  188. * given.
  189. *
  190. * @param rp a request plan
  191. * @return the associated query
  192. */
  193. static const struct GNUNET_HashCode *
  194. get_rp_key (struct GSF_RequestPlan *rp)
  195. {
  196. return &GSF_pending_request_get_data_ (rp->pe_head->pr)->query;
  197. }
  198. /**
  199. * Insert the given request plan into the heap with the appropriate weight.
  200. *
  201. * @param pp associated peer's plan
  202. * @param rp request to plan
  203. */
  204. static void
  205. plan (struct PeerPlan *pp,
  206. struct GSF_RequestPlan *rp)
  207. {
  208. #define N ((double)128.0)
  209. /**
  210. * Running average delay we currently impose.
  211. */
  212. static double avg_delay;
  213. struct GSF_PendingRequestData *prd;
  214. struct GNUNET_TIME_Relative delay;
  215. GNUNET_assert (rp->pp == pp);
  216. GNUNET_STATISTICS_set (GSF_stats,
  217. gettext_noop ("# average retransmission delay (ms)"),
  218. total_delay * 1000LL / plan_count, GNUNET_NO);
  219. prd = GSF_pending_request_get_data_ (rp->pe_head->pr);
  220. if (rp->transmission_counter < 8)
  221. delay =
  222. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  223. rp->transmission_counter);
  224. else if (rp->transmission_counter < 32)
  225. delay =
  226. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  227. 8 +
  228. (1LL << (rp->transmission_counter - 8)));
  229. else
  230. delay =
  231. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  232. 8 + (1LL << 24));
  233. delay.rel_value_us =
  234. GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
  235. delay.rel_value_us + 1);
  236. /* Add 0.01 to avg_delay to avoid division-by-zero later */
  237. avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value_us) / N) + 0.01;
  238. /*
  239. * For the priority, we need to consider a few basic rules:
  240. * 1) if we just started requesting (delay is small), we should
  241. * virtually always have a priority of zero.
  242. * 2) for requests with average latency, our priority should match
  243. * the average priority observed on the network
  244. * 3) even the longest-running requests should not be WAY out of
  245. * the observed average (thus we bound by a factor of 2)
  246. * 4) we add +1 to the observed average priority to avoid everyone
  247. * staying put at zero (2 * 0 = 0...).
  248. *
  249. * Using the specific calculation below, we get:
  250. *
  251. * delay = 0 => priority = 0;
  252. * delay = avg delay => priority = running-average-observed-priority;
  253. * delay >> avg_delay => priority = 2 * running-average-observed-priority;
  254. *
  255. * which satisfies all of the rules above.
  256. *
  257. * Note: M_PI_4 = PI/4 = arctan(1)
  258. */
  259. rp->priority =
  260. round ((GSF_current_priorities +
  261. 1.0) * atan (delay.rel_value_us / avg_delay)) / M_PI_4;
  262. /* Note: usage of 'round' and 'atan' requires -lm */
  263. if (rp->transmission_counter != 0)
  264. delay.rel_value_us += TTL_DECREMENT * 1000;
  265. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  266. "Considering (re)transmission number %u in %s\n",
  267. (unsigned int) rp->transmission_counter,
  268. GNUNET_STRINGS_relative_time_to_string (delay,
  269. GNUNET_YES));
  270. rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
  271. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  272. "Earliest (re)transmission for `%s' in %us\n",
  273. GNUNET_h2s (&prd->query),
  274. rp->transmission_counter);
  275. GNUNET_assert (rp->hn == NULL);
  276. if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us)
  277. rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
  278. rp,
  279. rp->priority);
  280. else
  281. rp->hn =
  282. GNUNET_CONTAINER_heap_insert (pp->delay_heap,
  283. rp,
  284. rp->earliest_transmission.abs_value_us);
  285. GNUNET_assert (GNUNET_YES ==
  286. GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
  287. get_rp_key (rp),
  288. rp));
  289. #undef N
  290. }
  291. /**
  292. * Get the pending request with the highest TTL from the given plan.
  293. *
  294. * @param rp plan to investigate
  295. * @return pending request with highest TTL
  296. */
  297. struct GSF_PendingRequest *
  298. get_latest (const struct GSF_RequestPlan *rp)
  299. {
  300. struct GSF_PendingRequest *ret;
  301. struct GSF_PendingRequestPlanBijection *bi;
  302. const struct GSF_PendingRequestData *rprd;
  303. const struct GSF_PendingRequestData *prd;
  304. bi = rp->pe_head;
  305. if (NULL == bi)
  306. return NULL; /* should never happen */
  307. ret = bi->pr;
  308. rprd = GSF_pending_request_get_data_ (ret);
  309. for (bi = bi->next_PE; NULL != bi; bi = bi->next_PE)
  310. {
  311. GNUNET_break (GNUNET_YES ==
  312. GSF_pending_request_test_active_ (bi->pr));
  313. prd = GSF_pending_request_get_data_ (bi->pr);
  314. if (prd->ttl.abs_value_us > rprd->ttl.abs_value_us)
  315. {
  316. ret = bi->pr;
  317. rprd = prd;
  318. }
  319. }
  320. return ret;
  321. }
  322. /**
  323. * Figure out when and how to transmit to the given peer.
  324. *
  325. * @param cls the `struct PeerPlan`
  326. */
  327. static void
  328. schedule_peer_transmission (void *cls)
  329. {
  330. struct PeerPlan *pp = cls;
  331. struct GSF_RequestPlan *rp;
  332. struct GNUNET_TIME_Relative delay;
  333. if (NULL != pp->task)
  334. {
  335. pp->task = NULL;
  336. }
  337. else
  338. {
  339. GNUNET_assert (NULL != pp->env);
  340. pp->env = NULL;
  341. }
  342. /* move ready requests to priority queue */
  343. while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
  344. (0 == GNUNET_TIME_absolute_get_remaining
  345. (rp->earliest_transmission).rel_value_us))
  346. {
  347. GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
  348. rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
  349. rp,
  350. rp->priority);
  351. }
  352. if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
  353. {
  354. /* priority heap (still) empty, check for delay... */
  355. rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
  356. if (NULL == rp)
  357. {
  358. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  359. "No active requests for plan %p.\n",
  360. pp);
  361. return; /* both queues empty */
  362. }
  363. delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
  364. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  365. "Sleeping for %s before retrying requests on plan %p.\n",
  366. GNUNET_STRINGS_relative_time_to_string (delay,
  367. GNUNET_YES),
  368. pp);
  369. GNUNET_STATISTICS_set (GSF_stats,
  370. gettext_noop ("# delay heap timeout (ms)"),
  371. delay.rel_value_us / 1000LL, GNUNET_NO);
  372. pp->task
  373. = GNUNET_SCHEDULER_add_at (rp->earliest_transmission,
  374. &schedule_peer_transmission,
  375. pp);
  376. return;
  377. }
  378. #if INSANE_STATISTICS
  379. GNUNET_STATISTICS_update (GSF_stats,
  380. gettext_noop ("# query plans executed"),
  381. 1,
  382. GNUNET_NO);
  383. #endif
  384. /* process from priority heap */
  385. rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap);
  386. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  387. "Executing query plan %p\n",
  388. rp);
  389. GNUNET_assert (NULL != rp);
  390. rp->hn = NULL;
  391. rp->last_transmission = GNUNET_TIME_absolute_get ();
  392. rp->transmission_counter++;
  393. total_delay++;
  394. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  395. "Executing plan %p executed %u times, planning retransmission\n",
  396. rp,
  397. rp->transmission_counter);
  398. GNUNET_assert (NULL == pp->env);
  399. pp->env = GSF_pending_request_get_message_ (get_latest (rp));
  400. GNUNET_MQ_notify_sent (pp->env,
  401. &schedule_peer_transmission,
  402. pp);
  403. GSF_peer_transmit_ (pp->cp,
  404. GNUNET_YES,
  405. rp->priority,
  406. pp->env);
  407. GNUNET_STATISTICS_update (GSF_stats,
  408. gettext_noop ("# query messages sent to other peers"),
  409. 1,
  410. GNUNET_NO);
  411. plan (pp,
  412. rp);
  413. }
  /**
   * State handed to merge_pr() while iterating over the plan entries
   * of one query.
   */
  struct MergeContext
  {
    /**
     * The pending request we would like to attach to an existing
     * plan entry.
     */
    struct GSF_PendingRequest *pr;

    /**
     * #GNUNET_YES once @e pr was attached to an existing plan entry;
     * initialised to #GNUNET_NO by the caller.
     */
    int merged;
  };
  428. /**
  429. * Iterator that checks if an equivalent request is already
  430. * present for this peer.
  431. *
  432. * @param cls closure
  433. * @param query the query
  434. * @param element request plan stored at the node
  435. * @return #GNUNET_YES if we should continue to iterate,
  436. * #GNUNET_NO if not (merge success)
  437. */
  438. static int
  439. merge_pr (void *cls,
  440. const struct GNUNET_HashCode *query,
  441. void *element)
  442. {
  443. struct MergeContext *mpr = cls;
  444. struct GSF_RequestPlan *rp = element;
  445. struct GSF_PendingRequestData *prd;
  446. struct GSF_PendingRequestPlanBijection *bi;
  447. struct GSF_PendingRequest *latest;
  448. GNUNET_break (GNUNET_YES ==
  449. GSF_pending_request_test_active_ (mpr->pr));
  450. if (GNUNET_OK !=
  451. GSF_pending_request_is_compatible_ (mpr->pr,
  452. rp->pe_head->pr))
  453. return GNUNET_YES;
  454. /* merge new request with existing request plan */
  455. bi = GNUNET_new (struct GSF_PendingRequestPlanBijection);
  456. bi->rp = rp;
  457. bi->pr = mpr->pr;
  458. prd = GSF_pending_request_get_data_ (mpr->pr);
  459. GNUNET_CONTAINER_MDLL_insert (PR,
  460. prd->pr_head,
  461. prd->pr_tail,
  462. bi);
  463. GNUNET_CONTAINER_MDLL_insert (PE,
  464. rp->pe_head,
  465. rp->pe_tail,
  466. bi);
  467. mpr->merged = GNUNET_YES;
  468. #if INSANE_STATISTICS
  469. GNUNET_STATISTICS_update (GSF_stats,
  470. gettext_noop ("# requests merged"),
  471. 1,
  472. GNUNET_NO);
  473. #endif
  474. latest = get_latest (rp);
  475. if (GSF_pending_request_get_data_ (latest)->ttl.abs_value_us <
  476. prd->ttl.abs_value_us)
  477. {
  478. #if INSANE_STATISTICS
  479. GNUNET_STATISTICS_update (GSF_stats,
  480. gettext_noop ("# requests refreshed"),
  481. 1,
  482. GNUNET_NO);
  483. #endif
  484. rp->transmission_counter = 0; /* reset */
  485. }
  486. return GNUNET_NO;
  487. }
  488. /**
  489. * Create a new query plan entry.
  490. *
  491. * @param cp peer with the entry
  492. * @param pr request with the entry
  493. */
  494. void
  495. GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
  496. struct GSF_PendingRequest *pr)
  497. {
  498. const struct GNUNET_PeerIdentity *id;
  499. struct PeerPlan *pp;
  500. struct GSF_PendingRequestData *prd;
  501. struct GSF_RequestPlan *rp;
  502. struct GSF_PendingRequestPlanBijection *bi;
  503. struct MergeContext mpc;
  504. GNUNET_assert (GNUNET_YES ==
  505. GSF_pending_request_test_active_ (pr));
  506. GNUNET_assert (NULL != cp);
  507. id = GSF_connected_peer_get_identity2_ (cp);
  508. pp = GNUNET_CONTAINER_multipeermap_get (plans, id);
  509. if (NULL == pp)
  510. {
  511. pp = GNUNET_new (struct PeerPlan);
  512. pp->plan_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
  513. pp->priority_heap =
  514. GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
  515. pp->delay_heap =
  516. GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
  517. pp->cp = cp;
  518. GNUNET_assert (GNUNET_OK ==
  519. GNUNET_CONTAINER_multipeermap_put (plans,
  520. id,
  521. pp,
  522. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  523. pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
  524. pp);
  525. }
  526. mpc.merged = GNUNET_NO;
  527. mpc.pr = pr;
  528. prd = GSF_pending_request_get_data_ (pr);
  529. GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
  530. &prd->query,
  531. &merge_pr,
  532. &mpc);
  533. if (GNUNET_NO != mpc.merged)
  534. return;
  535. plan_count++;
  536. GNUNET_STATISTICS_update (GSF_stats,
  537. gettext_noop ("# query plan entries"),
  538. 1,
  539. GNUNET_NO);
  540. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  541. "Planning transmission of query `%s' to peer `%s'\n",
  542. GNUNET_h2s (&prd->query),
  543. GNUNET_i2s (id));
  544. rp = GNUNET_new (struct GSF_RequestPlan);
  545. bi = GNUNET_new (struct GSF_PendingRequestPlanBijection);
  546. bi->rp = rp;
  547. bi->pr = pr;
  548. GNUNET_CONTAINER_MDLL_insert (PR,
  549. prd->pr_head,
  550. prd->pr_tail,
  551. bi);
  552. GNUNET_CONTAINER_MDLL_insert (PE,
  553. rp->pe_head,
  554. rp->pe_tail,
  555. bi);
  556. rp->pp = pp;
  557. GNUNET_assert (GNUNET_YES ==
  558. GNUNET_CONTAINER_multihashmap_put (pp->plan_map,
  559. get_rp_key (rp),
  560. rp,
  561. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
  562. plan (pp,
  563. rp);
  564. }
  565. /**
  566. * Notify the plan about a peer being no longer available;
  567. * destroy all entries associated with this peer.
  568. *
  569. * @param cp connected peer
  570. */
  571. void
  572. GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
  573. {
  574. const struct GNUNET_PeerIdentity *id;
  575. struct PeerPlan *pp;
  576. struct GSF_RequestPlan *rp;
  577. struct GSF_PendingRequestData *prd;
  578. struct GSF_PendingRequestPlanBijection *bi;
  579. id = GSF_connected_peer_get_identity2_ (cp);
  580. pp = GNUNET_CONTAINER_multipeermap_get (plans, id);
  581. if (NULL == pp)
  582. return; /* nothing was ever planned for this peer */
  583. GNUNET_assert (GNUNET_YES ==
  584. GNUNET_CONTAINER_multipeermap_remove (plans, id,
  585. pp));
  586. if (NULL != pp->task)
  587. {
  588. GNUNET_SCHEDULER_cancel (pp->task);
  589. pp->task = NULL;
  590. }
  591. while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
  592. {
  593. GNUNET_break (GNUNET_YES ==
  594. GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
  595. get_rp_key (rp),
  596. rp));
  597. while (NULL != (bi = rp->pe_head))
  598. {
  599. GNUNET_CONTAINER_MDLL_remove (PE,
  600. rp->pe_head,
  601. rp->pe_tail,
  602. bi);
  603. prd = GSF_pending_request_get_data_ (bi->pr);
  604. GNUNET_CONTAINER_MDLL_remove (PR,
  605. prd->pr_head,
  606. prd->pr_tail,
  607. bi);
  608. GNUNET_free (bi);
  609. }
  610. plan_count--;
  611. GNUNET_free (rp);
  612. }
  613. GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
  614. while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
  615. {
  616. GNUNET_break (GNUNET_YES ==
  617. GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
  618. get_rp_key (rp),
  619. rp));
  620. while (NULL != (bi = rp->pe_head))
  621. {
  622. prd = GSF_pending_request_get_data_ (bi->pr);
  623. GNUNET_CONTAINER_MDLL_remove (PE,
  624. rp->pe_head,
  625. rp->pe_tail,
  626. bi);
  627. GNUNET_CONTAINER_MDLL_remove (PR,
  628. prd->pr_head,
  629. prd->pr_tail,
  630. bi);
  631. GNUNET_free (bi);
  632. }
  633. plan_count--;
  634. GNUNET_free (rp);
  635. }
  636. GNUNET_STATISTICS_set (GSF_stats,
  637. gettext_noop ("# query plan entries"),
  638. plan_count,
  639. GNUNET_NO);
  640. GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
  641. GNUNET_CONTAINER_multihashmap_destroy (pp->plan_map);
  642. GNUNET_free (pp);
  643. }
  644. /**
  645. * Get the last transmission attempt time for the request plan list
  646. * referenced by @a pr_head, that was sent to @a sender
  647. *
  648. * @param pr_head request plan reference list to check.
  649. * @param sender the peer that we've sent the request to.
  650. * @param result the timestamp to fill, set to #GNUNET_TIME_UNIT_FOREVER_ABS if never transmitted
  651. * @return #GNUNET_YES if @a result was changed, #GNUNET_NO otherwise.
  652. */
  653. int
  654. GSF_request_plan_reference_get_last_transmission_ (struct GSF_PendingRequestPlanBijection *pr_head,
  655. struct GSF_ConnectedPeer *sender,
  656. struct GNUNET_TIME_Absolute *result)
  657. {
  658. struct GSF_PendingRequestPlanBijection *bi;
  659. for (bi = pr_head; NULL != bi; bi = bi->next_PR)
  660. {
  661. if (bi->rp->pp->cp == sender)
  662. {
  663. if (0 == bi->rp->last_transmission.abs_value_us)
  664. *result = GNUNET_TIME_UNIT_FOREVER_ABS;
  665. else
  666. *result = bi->rp->last_transmission;
  667. return GNUNET_YES;
  668. }
  669. }
  670. return GNUNET_NO;
  671. }
  672. /**
  673. * Notify the plan about a request being done; destroy all entries
  674. * associated with this request.
  675. *
  676. * @param pr request that is done
  677. */
  678. void
  679. GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
  680. {
  681. struct GSF_RequestPlan *rp;
  682. struct GSF_PendingRequestData *prd;
  683. struct GSF_PendingRequestPlanBijection *bi;
  684. prd = GSF_pending_request_get_data_ (pr);
  685. while (NULL != (bi = prd->pr_head))
  686. {
  687. rp = bi->rp;
  688. GNUNET_CONTAINER_MDLL_remove (PR,
  689. prd->pr_head,
  690. prd->pr_tail,
  691. bi);
  692. GNUNET_CONTAINER_MDLL_remove (PE,
  693. rp->pe_head,
  694. rp->pe_tail,
  695. bi);
  696. GNUNET_assert (bi->pr == pr);
  697. if (NULL == rp->pe_head)
  698. {
  699. GNUNET_CONTAINER_heap_remove_node (rp->hn);
  700. plan_count--;
  701. GNUNET_break (GNUNET_YES ==
  702. GNUNET_CONTAINER_multihashmap_remove (rp->pp->plan_map,
  703. &prd->query,
  704. rp));
  705. GNUNET_free (rp);
  706. }
  707. GNUNET_free (bi);
  708. }
  709. GNUNET_STATISTICS_set (GSF_stats,
  710. gettext_noop ("# query plan entries"),
  711. plan_count,
  712. GNUNET_NO);
  713. }
  714. /**
  715. * Initialize plan subsystem.
  716. */
  717. void
  718. GSF_plan_init ()
  719. {
  720. plans = GNUNET_CONTAINER_multipeermap_create (256,
  721. GNUNET_YES);
  722. }
  723. /**
  724. * Shutdown plan subsystem.
  725. */
  726. void
  727. GSF_plan_done ()
  728. {
  729. GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (plans));
  730. GNUNET_CONTAINER_multipeermap_destroy (plans);
  731. }
  732. /* end of gnunet-service-fs_pe.c */