gnunet-service-fs_pe.c 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2011 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file fs/gnunet-service-fs_pe.c
  18. * @brief API to manage query plan
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet-service-fs.h"
  23. #include "gnunet-service-fs_cp.h"
  24. #include "gnunet-service-fs_pe.h"
  25. #include "gnunet-service-fs_pr.h"
  26. /**
  27. * Collect an instane number of statistics? May cause excessive IPC.
  28. */
  29. #define INSANE_STATISTICS GNUNET_NO
  30. /**
  31. * List of GSF_PendingRequests this request plan
  32. * participates with.
  33. */
  34. struct PendingRequestList;
  35. /**
  36. * Transmission plan for a peer.
  37. */
  38. struct PeerPlan;
  39. /**
  40. * M:N binding of plans to pending requests.
  41. * Each pending request can be in a number of plans,
  42. * and each plan can have a number of pending requests.
  43. * Objects of this type indicate a mapping of a plan to
  44. * a particular pending request.
  45. *
  46. * The corresponding head and tail of the "PE" MDLL
  47. * are stored in a `struct GSF_RequestPlan`. (We need
  48. * to be able to lookup all pending requests corresponding
  49. * to a given plan entry.)
  50. *
  51. * Similarly head and tail of the "PR" MDLL are stored
  52. * with the `struct GSF_PendingRequest`. (We need
  53. * to be able to lookup all plan entries corresponding
  54. * to a given pending request.)
  55. */
  56. struct GSF_PendingRequestPlanBijection
  57. {
  58. /**
  59. * This is a doubly-linked list.
  60. */
  61. struct GSF_PendingRequestPlanBijection *next_PR;
  62. /**
  63. * This is a doubly-linked list.
  64. */
  65. struct GSF_PendingRequestPlanBijection *prev_PR;
  66. /**
  67. * This is a doubly-linked list.
  68. */
  69. struct GSF_PendingRequestPlanBijection *next_PE;
  70. /**
  71. * This is a doubly-linked list.
  72. */
  73. struct GSF_PendingRequestPlanBijection *prev_PE;
  74. /**
  75. * Associated request plan (tells us one of the peers that
  76. * we plan to forward the request to).
  77. */
  78. struct GSF_RequestPlan *rp;
  79. /**
  80. * Associated pending request (identifies request details
  81. * and one of the origins of the request).
  82. */
  83. struct GSF_PendingRequest *pr;
  84. };
  85. /**
  86. * Information we keep per request per peer. This is a doubly-linked
  87. * list (with head and tail in the `struct GSF_PendingRequestData`)
  88. * with one entry in each heap of each `struct PeerPlan`. Each
  89. * entry tracks information relevant for this request and this peer.
  90. */
  91. struct GSF_RequestPlan
  92. {
  93. /**
  94. * This is a doubly-linked list.
  95. */
  96. struct GSF_RequestPlan *next;
  97. /**
  98. * This is a doubly-linked list.
  99. */
  100. struct GSF_RequestPlan *prev;
  101. /**
  102. * Heap node associated with this request and this peer.
  103. */
  104. struct GNUNET_CONTAINER_HeapNode *hn;
  105. /**
  106. * The transmission plan for a peer that this request is associated with.
  107. */
  108. struct PeerPlan *pp;
  109. /**
  110. * Head of list of associated pending requests. This tells us
  111. * which incoming requests from other peers this plan entry
  112. * corresponds to.
  113. */
  114. struct GSF_PendingRequestPlanBijection *pe_head;
  115. /**
  116. * Tail of list of associated pending requests.
  117. */
  118. struct GSF_PendingRequestPlanBijection *pe_tail;
  119. /**
  120. * Earliest time we'd be happy to (re)transmit this request.
  121. */
  122. struct GNUNET_TIME_Absolute earliest_transmission;
  123. /**
  124. * When was the last time we transmitted this request to this peer? 0 for never.
  125. */
  126. struct GNUNET_TIME_Absolute last_transmission;
  127. /**
  128. * Current priority for this request for this target.
  129. */
  130. uint64_t priority;
  131. /**
  132. * How often did we transmit this request to this peer?
  133. */
  134. unsigned int transmission_counter;
  135. };
  136. /**
  137. * Transmission plan for a peer.
  138. */
  139. struct PeerPlan
  140. {
  141. /**
  142. * Heap with pending queries (`struct GSF_RequestPlan`), higher weights mean higher priority.
  143. */
  144. struct GNUNET_CONTAINER_Heap *priority_heap;
  145. /**
  146. * Heap with pending queries (`struct GSF_RequestPlan`), by transmission time, lowest first.
  147. */
  148. struct GNUNET_CONTAINER_Heap *delay_heap;
  149. /**
  150. * Map of queries to plan entries. All entries in the @e priority_heap
  151. * or @e delay_heap should be in the @e plan_map. Note that it is
  152. * possible for the @e plan_map to have multiple entries for the same
  153. * query.
  154. */
  155. struct GNUNET_CONTAINER_MultiHashMap *plan_map;
  156. /**
  157. * Peer for which this is the plan.
  158. */
  159. struct GSF_ConnectedPeer *cp;
  160. /**
  161. * Current task for executing the plan.
  162. */
  163. struct GNUNET_SCHEDULER_Task *task;
  164. /**
  165. * Current message under transmission for the plan.
  166. */
  167. struct GNUNET_MQ_Envelope *env;
  168. };
  169. /**
  170. * Hash map from peer identities to PeerPlans.
  171. */
  172. static struct GNUNET_CONTAINER_MultiPeerMap *plans;
  173. /**
  174. * Sum of all transmission counters (equals total delay for all plan entries).
  175. */
  176. static unsigned long long total_delay;
  177. /**
  178. * Number of plan entries.
  179. */
  180. static unsigned long long plan_count;
  181. /**
  182. * Return the query (key in the plan_map) for the given request plan.
  183. * Note that this key may change as there can be multiple pending
  184. * requests for the same key and we just return _one_ of them; this
  185. * particular one might complete while another one might still be
  186. * active, hence the lifetime of the returned hash code is NOT
  187. * necessarily identical to that of the `struct GSF_RequestPlan`
  188. * given.
  189. *
  190. * @param rp a request plan
  191. * @return the associated query
  192. */
  193. static const struct GNUNET_HashCode *
  194. get_rp_key (struct GSF_RequestPlan *rp)
  195. {
  196. return &GSF_pending_request_get_data_ (rp->pe_head->pr)->query;
  197. }
  198. /**
  199. * Insert the given request plan into the heap with the appropriate weight.
  200. *
  201. * @param pp associated peer's plan
  202. * @param rp request to plan
  203. */
  204. static void
  205. plan (struct PeerPlan *pp,
  206. struct GSF_RequestPlan *rp)
  207. {
  208. #define N ((double) 128.0)
  209. /**
  210. * Running average delay we currently impose.
  211. */
  212. static double avg_delay;
  213. struct GSF_PendingRequestData *prd;
  214. struct GNUNET_TIME_Relative delay;
  215. GNUNET_assert (rp->pp == pp);
  216. GNUNET_STATISTICS_set (GSF_stats,
  217. gettext_noop ("# average retransmission delay (ms)"),
  218. total_delay * 1000LL / plan_count, GNUNET_NO);
  219. prd = GSF_pending_request_get_data_ (rp->pe_head->pr);
  220. if (rp->transmission_counter < 8)
  221. delay =
  222. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  223. rp->transmission_counter);
  224. else if (rp->transmission_counter < 32)
  225. delay =
  226. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  227. 8
  228. + (1LL << (rp->transmission_counter - 8)));
  229. else
  230. delay =
  231. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  232. 8 + (1LL << 24));
  233. delay.rel_value_us =
  234. GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
  235. delay.rel_value_us + 1);
  236. /* Add 0.01 to avg_delay to avoid division-by-zero later */
  237. avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value_us) / N) + 0.01;
  238. /*
  239. * For the priority, we need to consider a few basic rules:
  240. * 1) if we just started requesting (delay is small), we should
  241. * virtually always have a priority of zero.
  242. * 2) for requests with average latency, our priority should match
  243. * the average priority observed on the network
  244. * 3) even the longest-running requests should not be WAY out of
  245. * the observed average (thus we bound by a factor of 2)
  246. * 4) we add +1 to the observed average priority to avoid everyone
  247. * staying put at zero (2 * 0 = 0...).
  248. *
  249. * Using the specific calculation below, we get:
  250. *
  251. * delay = 0 => priority = 0;
  252. * delay = avg delay => priority = running-average-observed-priority;
  253. * delay >> avg_delay => priority = 2 * running-average-observed-priority;
  254. *
  255. * which satisfies all of the rules above.
  256. *
  257. * Note: M_PI_4 = PI/4 = arctan(1)
  258. */
  259. rp->priority =
  260. round ((GSF_current_priorities
  261. + 1.0) * atan (delay.rel_value_us / avg_delay)) / M_PI_4;
  262. /* Note: usage of 'round' and 'atan' requires -lm */
  263. if (rp->transmission_counter != 0)
  264. delay.rel_value_us += TTL_DECREMENT * 1000;
  265. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  266. "Considering (re)transmission number %u in %s\n",
  267. (unsigned int) rp->transmission_counter,
  268. GNUNET_STRINGS_relative_time_to_string (delay,
  269. GNUNET_YES));
  270. rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
  271. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  272. "Earliest (re)transmission for `%s' in %us\n",
  273. GNUNET_h2s (&prd->query),
  274. rp->transmission_counter);
  275. GNUNET_assert (rp->hn == NULL);
  276. if (0 == GNUNET_TIME_absolute_get_remaining (
  277. rp->earliest_transmission).rel_value_us)
  278. rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
  279. rp,
  280. rp->priority);
  281. else
  282. rp->hn =
  283. GNUNET_CONTAINER_heap_insert (pp->delay_heap,
  284. rp,
  285. rp->earliest_transmission.abs_value_us);
  286. GNUNET_assert (GNUNET_YES ==
  287. GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
  288. get_rp_key (rp),
  289. rp));
  290. #undef N
  291. }
  292. /**
  293. * Get the pending request with the highest TTL from the given plan.
  294. *
  295. * @param rp plan to investigate
  296. * @return pending request with highest TTL
  297. */
  298. struct GSF_PendingRequest *
  299. get_latest (const struct GSF_RequestPlan *rp)
  300. {
  301. struct GSF_PendingRequest *ret;
  302. struct GSF_PendingRequestPlanBijection *bi;
  303. const struct GSF_PendingRequestData *rprd;
  304. const struct GSF_PendingRequestData *prd;
  305. bi = rp->pe_head;
  306. if (NULL == bi)
  307. return NULL; /* should never happen */
  308. ret = bi->pr;
  309. rprd = GSF_pending_request_get_data_ (ret);
  310. for (bi = bi->next_PE; NULL != bi; bi = bi->next_PE)
  311. {
  312. GNUNET_break (GNUNET_YES ==
  313. GSF_pending_request_test_active_ (bi->pr));
  314. prd = GSF_pending_request_get_data_ (bi->pr);
  315. if (prd->ttl.abs_value_us > rprd->ttl.abs_value_us)
  316. {
  317. ret = bi->pr;
  318. rprd = prd;
  319. }
  320. }
  321. return ret;
  322. }
  323. /**
  324. * Figure out when and how to transmit to the given peer.
  325. *
  326. * @param cls the `struct PeerPlan`
  327. */
  328. static void
  329. schedule_peer_transmission (void *cls)
  330. {
  331. struct PeerPlan *pp = cls;
  332. struct GSF_RequestPlan *rp;
  333. struct GNUNET_TIME_Relative delay;
  334. if (NULL != pp->task)
  335. {
  336. pp->task = NULL;
  337. }
  338. else
  339. {
  340. GNUNET_assert (NULL != pp->env);
  341. pp->env = NULL;
  342. }
  343. /* move ready requests to priority queue */
  344. while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
  345. (0 == GNUNET_TIME_absolute_get_remaining
  346. (rp->earliest_transmission).rel_value_us))
  347. {
  348. GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
  349. rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
  350. rp,
  351. rp->priority);
  352. }
  353. if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
  354. {
  355. /* priority heap (still) empty, check for delay... */
  356. rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
  357. if (NULL == rp)
  358. {
  359. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  360. "No active requests for plan %p.\n",
  361. pp);
  362. return; /* both queues empty */
  363. }
  364. delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
  365. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  366. "Sleeping for %s before retrying requests on plan %p.\n",
  367. GNUNET_STRINGS_relative_time_to_string (delay,
  368. GNUNET_YES),
  369. pp);
  370. GNUNET_STATISTICS_set (GSF_stats,
  371. gettext_noop ("# delay heap timeout (ms)"),
  372. delay.rel_value_us / 1000LL, GNUNET_NO);
  373. pp->task
  374. = GNUNET_SCHEDULER_add_at (rp->earliest_transmission,
  375. &schedule_peer_transmission,
  376. pp);
  377. return;
  378. }
  379. #if INSANE_STATISTICS
  380. GNUNET_STATISTICS_update (GSF_stats,
  381. gettext_noop ("# query plans executed"),
  382. 1,
  383. GNUNET_NO);
  384. #endif
  385. /* process from priority heap */
  386. rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap);
  387. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  388. "Executing query plan %p\n",
  389. rp);
  390. GNUNET_assert (NULL != rp);
  391. rp->hn = NULL;
  392. rp->last_transmission = GNUNET_TIME_absolute_get ();
  393. rp->transmission_counter++;
  394. total_delay++;
  395. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  396. "Executing plan %p executed %u times, planning retransmission\n",
  397. rp,
  398. rp->transmission_counter);
  399. GNUNET_assert (NULL == pp->env);
  400. pp->env = GSF_pending_request_get_message_ (get_latest (rp));
  401. GNUNET_MQ_notify_sent (pp->env,
  402. &schedule_peer_transmission,
  403. pp);
  404. GSF_peer_transmit_ (pp->cp,
  405. GNUNET_YES,
  406. rp->priority,
  407. pp->env);
  408. GNUNET_STATISTICS_update (GSF_stats,
  409. gettext_noop (
  410. "# query messages sent to other peers"),
  411. 1,
  412. GNUNET_NO);
  413. plan (pp,
  414. rp);
  415. }
  416. /**
  417. * Closure for merge_pr().
  418. */
  419. struct MergeContext
  420. {
  421. /**
  422. * Request we are trying to merge.
  423. */
  424. struct GSF_PendingRequest *pr;
  425. /**
  426. * Set to #GNUNET_YES if we succeeded to merge.
  427. */
  428. int merged;
  429. };
  430. /**
  431. * Iterator that checks if an equivalent request is already
  432. * present for this peer.
  433. *
  434. * @param cls closure
  435. * @param query the query
  436. * @param element request plan stored at the node
  437. * @return #GNUNET_YES if we should continue to iterate,
  438. * #GNUNET_NO if not (merge success)
  439. */
  440. static int
  441. merge_pr (void *cls,
  442. const struct GNUNET_HashCode *query,
  443. void *element)
  444. {
  445. struct MergeContext *mpr = cls;
  446. struct GSF_RequestPlan *rp = element;
  447. struct GSF_PendingRequestData *prd;
  448. struct GSF_PendingRequestPlanBijection *bi;
  449. struct GSF_PendingRequest *latest;
  450. GNUNET_break (GNUNET_YES ==
  451. GSF_pending_request_test_active_ (mpr->pr));
  452. if (GNUNET_OK !=
  453. GSF_pending_request_is_compatible_ (mpr->pr,
  454. rp->pe_head->pr))
  455. return GNUNET_YES;
  456. /* merge new request with existing request plan */
  457. bi = GNUNET_new (struct GSF_PendingRequestPlanBijection);
  458. bi->rp = rp;
  459. bi->pr = mpr->pr;
  460. prd = GSF_pending_request_get_data_ (mpr->pr);
  461. GNUNET_CONTAINER_MDLL_insert (PR,
  462. prd->pr_head,
  463. prd->pr_tail,
  464. bi);
  465. GNUNET_CONTAINER_MDLL_insert (PE,
  466. rp->pe_head,
  467. rp->pe_tail,
  468. bi);
  469. mpr->merged = GNUNET_YES;
  470. #if INSANE_STATISTICS
  471. GNUNET_STATISTICS_update (GSF_stats,
  472. gettext_noop ("# requests merged"),
  473. 1,
  474. GNUNET_NO);
  475. #endif
  476. latest = get_latest (rp);
  477. if (GSF_pending_request_get_data_ (latest)->ttl.abs_value_us <
  478. prd->ttl.abs_value_us)
  479. {
  480. #if INSANE_STATISTICS
  481. GNUNET_STATISTICS_update (GSF_stats,
  482. gettext_noop ("# requests refreshed"),
  483. 1,
  484. GNUNET_NO);
  485. #endif
  486. rp->transmission_counter = 0; /* reset */
  487. }
  488. return GNUNET_NO;
  489. }
  490. /**
  491. * Create a new query plan entry.
  492. *
  493. * @param cp peer with the entry
  494. * @param pr request with the entry
  495. */
  496. void
  497. GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
  498. struct GSF_PendingRequest *pr)
  499. {
  500. const struct GNUNET_PeerIdentity *id;
  501. struct PeerPlan *pp;
  502. struct GSF_PendingRequestData *prd;
  503. struct GSF_RequestPlan *rp;
  504. struct GSF_PendingRequestPlanBijection *bi;
  505. struct MergeContext mpc;
  506. GNUNET_assert (GNUNET_YES ==
  507. GSF_pending_request_test_active_ (pr));
  508. GNUNET_assert (NULL != cp);
  509. id = GSF_connected_peer_get_identity2_ (cp);
  510. pp = GNUNET_CONTAINER_multipeermap_get (plans, id);
  511. if (NULL == pp)
  512. {
  513. pp = GNUNET_new (struct PeerPlan);
  514. pp->plan_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
  515. pp->priority_heap =
  516. GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
  517. pp->delay_heap =
  518. GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
  519. pp->cp = cp;
  520. GNUNET_assert (GNUNET_OK ==
  521. GNUNET_CONTAINER_multipeermap_put (plans,
  522. id,
  523. pp,
  524. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  525. pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
  526. pp);
  527. }
  528. mpc.merged = GNUNET_NO;
  529. mpc.pr = pr;
  530. prd = GSF_pending_request_get_data_ (pr);
  531. GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
  532. &prd->query,
  533. &merge_pr,
  534. &mpc);
  535. if (GNUNET_NO != mpc.merged)
  536. return;
  537. plan_count++;
  538. GNUNET_STATISTICS_update (GSF_stats,
  539. gettext_noop ("# query plan entries"),
  540. 1,
  541. GNUNET_NO);
  542. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  543. "Planning transmission of query `%s' to peer `%s'\n",
  544. GNUNET_h2s (&prd->query),
  545. GNUNET_i2s (id));
  546. rp = GNUNET_new (struct GSF_RequestPlan);
  547. bi = GNUNET_new (struct GSF_PendingRequestPlanBijection);
  548. bi->rp = rp;
  549. bi->pr = pr;
  550. GNUNET_CONTAINER_MDLL_insert (PR,
  551. prd->pr_head,
  552. prd->pr_tail,
  553. bi);
  554. GNUNET_CONTAINER_MDLL_insert (PE,
  555. rp->pe_head,
  556. rp->pe_tail,
  557. bi);
  558. rp->pp = pp;
  559. GNUNET_assert (GNUNET_YES ==
  560. GNUNET_CONTAINER_multihashmap_put (pp->plan_map,
  561. get_rp_key (rp),
  562. rp,
  563. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
  564. plan (pp,
  565. rp);
  566. }
  567. /**
  568. * Notify the plan about a peer being no longer available;
  569. * destroy all entries associated with this peer.
  570. *
  571. * @param cp connected peer
  572. */
  573. void
  574. GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
  575. {
  576. const struct GNUNET_PeerIdentity *id;
  577. struct PeerPlan *pp;
  578. struct GSF_RequestPlan *rp;
  579. struct GSF_PendingRequestData *prd;
  580. struct GSF_PendingRequestPlanBijection *bi;
  581. id = GSF_connected_peer_get_identity2_ (cp);
  582. pp = GNUNET_CONTAINER_multipeermap_get (plans, id);
  583. if (NULL == pp)
  584. return; /* nothing was ever planned for this peer */
  585. GNUNET_assert (GNUNET_YES ==
  586. GNUNET_CONTAINER_multipeermap_remove (plans, id,
  587. pp));
  588. if (NULL != pp->task)
  589. {
  590. GNUNET_SCHEDULER_cancel (pp->task);
  591. pp->task = NULL;
  592. }
  593. while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
  594. {
  595. GNUNET_break (GNUNET_YES ==
  596. GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
  597. get_rp_key (rp),
  598. rp));
  599. while (NULL != (bi = rp->pe_head))
  600. {
  601. GNUNET_CONTAINER_MDLL_remove (PE,
  602. rp->pe_head,
  603. rp->pe_tail,
  604. bi);
  605. prd = GSF_pending_request_get_data_ (bi->pr);
  606. GNUNET_CONTAINER_MDLL_remove (PR,
  607. prd->pr_head,
  608. prd->pr_tail,
  609. bi);
  610. GNUNET_free (bi);
  611. }
  612. plan_count--;
  613. GNUNET_free (rp);
  614. }
  615. GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
  616. while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
  617. {
  618. GNUNET_break (GNUNET_YES ==
  619. GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
  620. get_rp_key (rp),
  621. rp));
  622. while (NULL != (bi = rp->pe_head))
  623. {
  624. prd = GSF_pending_request_get_data_ (bi->pr);
  625. GNUNET_CONTAINER_MDLL_remove (PE,
  626. rp->pe_head,
  627. rp->pe_tail,
  628. bi);
  629. GNUNET_CONTAINER_MDLL_remove (PR,
  630. prd->pr_head,
  631. prd->pr_tail,
  632. bi);
  633. GNUNET_free (bi);
  634. }
  635. plan_count--;
  636. GNUNET_free (rp);
  637. }
  638. GNUNET_STATISTICS_set (GSF_stats,
  639. gettext_noop ("# query plan entries"),
  640. plan_count,
  641. GNUNET_NO);
  642. GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
  643. GNUNET_CONTAINER_multihashmap_destroy (pp->plan_map);
  644. GNUNET_free (pp);
  645. }
  646. /**
  647. * Get the last transmission attempt time for the request plan list
  648. * referenced by @a pr_head, that was sent to @a sender
  649. *
  650. * @param pr_head request plan reference list to check.
  651. * @param sender the peer that we've sent the request to.
  652. * @param result the timestamp to fill, set to #GNUNET_TIME_UNIT_FOREVER_ABS if never transmitted
  653. * @return #GNUNET_YES if @a result was changed, #GNUNET_NO otherwise.
  654. */
  655. int
  656. GSF_request_plan_reference_get_last_transmission_ (struct
  657. GSF_PendingRequestPlanBijection
  658. *pr_head,
  659. struct GSF_ConnectedPeer *
  660. sender,
  661. struct GNUNET_TIME_Absolute *
  662. result)
  663. {
  664. struct GSF_PendingRequestPlanBijection *bi;
  665. for (bi = pr_head; NULL != bi; bi = bi->next_PR)
  666. {
  667. if (bi->rp->pp->cp == sender)
  668. {
  669. if (0 == bi->rp->last_transmission.abs_value_us)
  670. *result = GNUNET_TIME_UNIT_FOREVER_ABS;
  671. else
  672. *result = bi->rp->last_transmission;
  673. return GNUNET_YES;
  674. }
  675. }
  676. return GNUNET_NO;
  677. }
  678. /**
  679. * Notify the plan about a request being done; destroy all entries
  680. * associated with this request.
  681. *
  682. * @param pr request that is done
  683. */
  684. void
  685. GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
  686. {
  687. struct GSF_RequestPlan *rp;
  688. struct GSF_PendingRequestData *prd;
  689. struct GSF_PendingRequestPlanBijection *bi;
  690. prd = GSF_pending_request_get_data_ (pr);
  691. while (NULL != (bi = prd->pr_head))
  692. {
  693. rp = bi->rp;
  694. GNUNET_CONTAINER_MDLL_remove (PR,
  695. prd->pr_head,
  696. prd->pr_tail,
  697. bi);
  698. GNUNET_CONTAINER_MDLL_remove (PE,
  699. rp->pe_head,
  700. rp->pe_tail,
  701. bi);
  702. GNUNET_assert (bi->pr == pr);
  703. if (NULL == rp->pe_head)
  704. {
  705. GNUNET_CONTAINER_heap_remove_node (rp->hn);
  706. plan_count--;
  707. GNUNET_break (GNUNET_YES ==
  708. GNUNET_CONTAINER_multihashmap_remove (rp->pp->plan_map,
  709. &prd->query,
  710. rp));
  711. GNUNET_free (rp);
  712. }
  713. GNUNET_free (bi);
  714. }
  715. GNUNET_STATISTICS_set (GSF_stats,
  716. gettext_noop ("# query plan entries"),
  717. plan_count,
  718. GNUNET_NO);
  719. }
  720. /**
  721. * Initialize plan subsystem.
  722. */
  723. void
  724. GSF_plan_init ()
  725. {
  726. plans = GNUNET_CONTAINER_multipeermap_create (256,
  727. GNUNET_YES);
  728. }
  729. /**
  730. * Shutdown plan subsystem.
  731. */
  732. void
  733. GSF_plan_done ()
  734. {
  735. GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (plans));
  736. GNUNET_CONTAINER_multipeermap_destroy (plans);
  737. }
  738. /* end of gnunet-service-fs_pe.h */