gnunet-service-fs_pe.c 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798
  1. /*
  2. This file is part of GNUnet.
  3. (C) 2011 Christian Grothoff (and other contributing authors)
  4. GNUnet is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published
  6. by the Free Software Foundation; either version 3, or (at your
  7. option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with GNUnet; see the file COPYING. If not, write to the
  14. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  15. Boston, MA 02111-1307, USA.
  16. */
  17. /**
  18. * @file fs/gnunet-service-fs_pe.c
  19. * @brief API to manage query plan
  20. * @author Christian Grothoff
  21. */
  22. #include "platform.h"
  23. #include "gnunet-service-fs.h"
  24. #include "gnunet-service-fs_cp.h"
  25. #include "gnunet-service-fs_pe.h"
  26. #include "gnunet-service-fs_pr.h"
  27. /**
  28. * Collect an instane number of statistics? May cause excessive IPC.
  29. */
  30. #define INSANE_STATISTICS GNUNET_NO
  31. /**
  32. * List of GSF_PendingRequests this request plan
  33. * participates with.
  34. */
  35. struct PendingRequestList;
  36. /**
  37. * Transmission plan for a peer.
  38. */
  39. struct PeerPlan;
  40. /**
  41. * M:N binding of plans to pending requests.
  42. * Each pending request can be in a number of plans,
  43. * and each plan can have a number of pending requests.
  44. * Objects of this type indicate a mapping of a plan to
  45. * a particular pending request.
  46. *
  47. * The corresponding head and tail of the "PE" MDLL
  48. * are stored in a `struct GSF_RequestPlan`. (We need
  49. * to be able to lookup all pending requests corresponding
  50. * to a given plan entry.)
  51. *
  52. * Similarly head and tail of the "PR" MDLL are stored
  53. * with the 'struct GSF_PendingRequest'. (We need
  54. * to be able to lookup all plan entries corresponding
  55. * to a given pending request.)
  56. */
  57. struct GSF_PendingRequestPlanBijection
  58. {
  59. /**
  60. * This is a doubly-linked list.
  61. */
  62. struct GSF_PendingRequestPlanBijection *next_PR;
  63. /**
  64. * This is a doubly-linked list.
  65. */
  66. struct GSF_PendingRequestPlanBijection *prev_PR;
  67. /**
  68. * This is a doubly-linked list.
  69. */
  70. struct GSF_PendingRequestPlanBijection *next_PE;
  71. /**
  72. * This is a doubly-linked list.
  73. */
  74. struct GSF_PendingRequestPlanBijection *prev_PE;
  75. /**
  76. * Associated request plan.
  77. */
  78. struct GSF_RequestPlan *rp;
  79. /**
  80. * Associated pending request.
  81. */
  82. struct GSF_PendingRequest *pr;
  83. };
  84. /**
  85. * Information we keep per request per peer. This is a doubly-linked
  86. * list (with head and tail in the 'struct GSF_PendingRequestData')
  87. * with one entry in each heap of each 'struct PeerPlan'. Each
  88. * entry tracks information relevant for this request and this peer.
  89. */
  90. struct GSF_RequestPlan
  91. {
  92. /**
  93. * This is a doubly-linked list.
  94. */
  95. struct GSF_RequestPlan *next;
  96. /**
  97. * This is a doubly-linked list.
  98. */
  99. struct GSF_RequestPlan *prev;
  100. /**
  101. * Heap node associated with this request and this peer.
  102. */
  103. struct GNUNET_CONTAINER_HeapNode *hn;
  104. /**
  105. * The transmission plan for a peer that this request is associated with.
  106. */
  107. struct PeerPlan *pp;
  108. /**
  109. * Head of list of associated pending requests.
  110. */
  111. struct GSF_PendingRequestPlanBijection *pe_head;
  112. /**
  113. * Tail of list of associated pending requests.
  114. */
  115. struct GSF_PendingRequestPlanBijection *pe_tail;
  116. /**
  117. * Earliest time we'd be happy to (re)transmit this request.
  118. */
  119. struct GNUNET_TIME_Absolute earliest_transmission;
  120. /**
  121. * When was the last time we transmitted this request to this peer? 0 for never.
  122. */
  123. struct GNUNET_TIME_Absolute last_transmission;
  124. /**
  125. * Current priority for this request for this target.
  126. */
  127. uint64_t priority;
  128. /**
  129. * How often did we transmit this request to this peer?
  130. */
  131. unsigned int transmission_counter;
  132. };
  133. /**
  134. * Transmission plan for a peer.
  135. */
  136. struct PeerPlan
  137. {
  138. /**
  139. * Heap with pending queries (struct GSF_RequestPlan), higher weights mean higher priority.
  140. */
  141. struct GNUNET_CONTAINER_Heap *priority_heap;
  142. /**
  143. * Heap with pending queries (struct GSF_RequestPlan), by transmission time, lowest first.
  144. */
  145. struct GNUNET_CONTAINER_Heap *delay_heap;
  146. /**
  147. * Map of queries to plan entries. All entries in the priority_heap or delay_heap
  148. * should be in the plan map. Note that it IS possible for the plan map to have
  149. * multiple entries for the same query.
  150. */
  151. struct GNUNET_CONTAINER_MultiHashMap *plan_map;
  152. /**
  153. * Current transmission request handle.
  154. */
  155. struct GSF_PeerTransmitHandle *pth;
  156. /**
  157. * Peer for which this is the plan.
  158. */
  159. struct GSF_ConnectedPeer *cp;
  160. /**
  161. * Current task for executing the plan.
  162. */
  163. GNUNET_SCHEDULER_TaskIdentifier task;
  164. };
  165. /**
  166. * Hash map from peer identities to PeerPlans.
  167. */
  168. static struct GNUNET_CONTAINER_MultiPeerMap *plans;
  169. /**
  170. * Sum of all transmission counters (equals total delay for all plan entries).
  171. */
  172. static unsigned long long total_delay;
  173. /**
  174. * Number of plan entries.
  175. */
  176. static unsigned long long plan_count;
  177. /**
  178. * Return the query (key in the plan_map) for the given request plan.
  179. * Note that this key may change as there can be multiple pending
  180. * requests for the same key and we just return _one_ of them; this
  181. * particular one might complete while another one might still be
  182. * active, hence the lifetime of the returned hash code is NOT
  183. * necessarily identical to that of the 'struct GSF_RequestPlan'
  184. * given.
  185. *
  186. * @param rp a request plan
  187. * @return the associated query
  188. */
  189. static const struct GNUNET_HashCode *
  190. get_rp_key (struct GSF_RequestPlan *rp)
  191. {
  192. return &GSF_pending_request_get_data_ (rp->pe_head->pr)->query;
  193. }
  194. /**
  195. * Figure out when and how to transmit to the given peer.
  196. *
  197. * @param cls the `struct GSF_ConnectedPeer` for transmission
  198. * @param tc scheduler context
  199. */
  200. static void
  201. schedule_peer_transmission (void *cls,
  202. const struct GNUNET_SCHEDULER_TaskContext *tc);
  203. /**
  204. * Insert the given request plan into the heap with the appropriate weight.
  205. *
  206. * @param pp associated peer's plan
  207. * @param rp request to plan
  208. */
  209. static void
  210. plan (struct PeerPlan *pp, struct GSF_RequestPlan *rp)
  211. {
  212. #define N ((double)128.0)
  213. /**
  214. * Running average delay we currently impose.
  215. */
  216. static double avg_delay;
  217. struct GSF_PendingRequestData *prd;
  218. struct GNUNET_TIME_Relative delay;
  219. GNUNET_assert (rp->pp == pp);
  220. GNUNET_STATISTICS_set (GSF_stats,
  221. gettext_noop ("# average retransmission delay (ms)"),
  222. total_delay * 1000LL / plan_count, GNUNET_NO);
  223. prd = GSF_pending_request_get_data_ (rp->pe_head->pr);
  224. if (rp->transmission_counter < 8)
  225. delay =
  226. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  227. rp->transmission_counter);
  228. else if (rp->transmission_counter < 32)
  229. delay =
  230. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  231. 8 +
  232. (1LL << (rp->transmission_counter - 8)));
  233. else
  234. delay =
  235. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  236. 8 + (1LL << 24));
  237. delay.rel_value_us =
  238. GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
  239. delay.rel_value_us + 1);
  240. /* Add 0.01 to avg_delay to avoid division-by-zero later */
  241. avg_delay = (((avg_delay * (N - 1.0)) + delay.rel_value_us) / N) + 0.01;
  242. /*
  243. * For the priority, we need to consider a few basic rules:
  244. * 1) if we just started requesting (delay is small), we should
  245. * virtually always have a priority of zero.
  246. * 2) for requests with average latency, our priority should match
  247. * the average priority observed on the network
  248. * 3) even the longest-running requests should not be WAY out of
  249. * the observed average (thus we bound by a factor of 2)
  250. * 4) we add +1 to the observed average priority to avoid everyone
  251. * staying put at zero (2 * 0 = 0...).
  252. *
  253. * Using the specific calculation below, we get:
  254. *
  255. * delay = 0 => priority = 0;
  256. * delay = avg delay => priority = running-average-observed-priority;
  257. * delay >> avg_delay => priority = 2 * running-average-observed-priority;
  258. *
  259. * which satisfies all of the rules above.
  260. *
  261. * Note: M_PI_4 = PI/4 = arctan(1)
  262. */
  263. rp->priority =
  264. round ((GSF_current_priorities +
  265. 1.0) * atan (delay.rel_value_us / avg_delay)) / M_PI_4;
  266. /* Note: usage of 'round' and 'atan' requires -lm */
  267. if (rp->transmission_counter != 0)
  268. delay.rel_value_us += TTL_DECREMENT * 1000;
  269. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  270. "Considering (re)transmission number %u in %s\n",
  271. (unsigned int) rp->transmission_counter,
  272. GNUNET_STRINGS_relative_time_to_string (delay,
  273. GNUNET_YES));
  274. rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
  275. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  276. "Earliest (re)transmission for `%s' in %us\n",
  277. GNUNET_h2s (&prd->query), rp->transmission_counter);
  278. GNUNET_assert (rp->hn == NULL);
  279. if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us)
  280. rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
  281. else
  282. rp->hn =
  283. GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp,
  284. rp->earliest_transmission.abs_value_us);
  285. GNUNET_assert (GNUNET_YES ==
  286. GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
  287. get_rp_key (rp),
  288. rp));
  289. if (GNUNET_SCHEDULER_NO_TASK != pp->task)
  290. GNUNET_SCHEDULER_cancel (pp->task);
  291. pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
  292. #undef N
  293. }
  294. /**
  295. * Get the pending request with the highest TTL from the given plan.
  296. *
  297. * @param rp plan to investigate
  298. * @return pending request with highest TTL
  299. */
  300. struct GSF_PendingRequest *
  301. get_latest (const struct GSF_RequestPlan *rp)
  302. {
  303. struct GSF_PendingRequest *ret;
  304. struct GSF_PendingRequestPlanBijection *bi;
  305. bi = rp->pe_head;
  306. if (NULL == bi)
  307. return NULL; /* should never happen */
  308. ret = bi->pr;
  309. bi = bi->next_PE;
  310. while (NULL != bi)
  311. {
  312. if (GSF_pending_request_get_data_ (bi->pr)->ttl.abs_value_us >
  313. GSF_pending_request_get_data_ (ret)->ttl.abs_value_us)
  314. ret = bi->pr;
  315. bi = bi->next_PE;
  316. }
  317. return ret;
  318. }
  319. /**
  320. * Function called to get a message for transmission.
  321. *
  322. * @param cls closure
  323. * @param buf_size number of bytes available in @a buf
  324. * @param buf where to copy the message, NULL on error (peer disconnect)
  325. * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
  326. */
  327. static size_t
  328. transmit_message_callback (void *cls, size_t buf_size, void *buf)
  329. {
  330. struct PeerPlan *pp = cls;
  331. struct GSF_RequestPlan *rp;
  332. size_t msize;
  333. pp->pth = NULL;
  334. if (NULL == buf)
  335. {
  336. /* failed, try again... */
  337. if (GNUNET_SCHEDULER_NO_TASK != pp->task)
  338. GNUNET_SCHEDULER_cancel (pp->task);
  339. pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
  340. GNUNET_STATISTICS_update (GSF_stats,
  341. gettext_noop
  342. ("# transmission failed (core has no bandwidth)"),
  343. 1, GNUNET_NO);
  344. return 0;
  345. }
  346. rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
  347. if (NULL == rp)
  348. {
  349. if (GNUNET_SCHEDULER_NO_TASK != pp->task)
  350. GNUNET_SCHEDULER_cancel (pp->task);
  351. pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
  352. return 0;
  353. }
  354. msize = GSF_pending_request_get_message_ (get_latest (rp), buf_size, buf);
  355. if (msize > buf_size)
  356. {
  357. if (GNUNET_SCHEDULER_NO_TASK != pp->task)
  358. GNUNET_SCHEDULER_cancel (pp->task);
  359. /* buffer to small (message changed), try again */
  360. pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
  361. return 0;
  362. }
  363. /* remove from root, add again elsewhere... */
  364. GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
  365. rp->hn = NULL;
  366. rp->last_transmission = GNUNET_TIME_absolute_get ();
  367. rp->transmission_counter++;
  368. total_delay++;
  369. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  370. "Executing plan %p executed %u times, planning retransmission\n",
  371. rp, rp->transmission_counter);
  372. plan (pp, rp);
  373. GNUNET_STATISTICS_update (GSF_stats,
  374. gettext_noop
  375. ("# query messages sent to other peers"), 1,
  376. GNUNET_NO);
  377. return msize;
  378. }
  379. /**
  380. * Figure out when and how to transmit to the given peer.
  381. *
  382. * @param cls the `struct PeerPlan`
  383. * @param tc scheduler context
  384. */
  385. static void
  386. schedule_peer_transmission (void *cls,
  387. const struct GNUNET_SCHEDULER_TaskContext *tc)
  388. {
  389. struct PeerPlan *pp = cls;
  390. struct GSF_RequestPlan *rp;
  391. size_t msize;
  392. struct GNUNET_TIME_Relative delay;
  393. pp->task = GNUNET_SCHEDULER_NO_TASK;
  394. if (NULL != pp->pth)
  395. {
  396. GSF_peer_transmit_cancel_ (pp->pth);
  397. pp->pth = NULL;
  398. }
  399. /* move ready requests to priority queue */
  400. while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
  401. (0 == GNUNET_TIME_absolute_get_remaining
  402. (rp->earliest_transmission).rel_value_us))
  403. {
  404. GNUNET_assert (rp == GNUNET_CONTAINER_heap_remove_root (pp->delay_heap));
  405. rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority);
  406. }
  407. if (0 == GNUNET_CONTAINER_heap_get_size (pp->priority_heap))
  408. {
  409. /* priority heap (still) empty, check for delay... */
  410. rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap);
  411. if (NULL == rp)
  412. {
  413. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No active requests for plan %p.\n",
  414. pp);
  415. return; /* both queues empty */
  416. }
  417. delay = GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission);
  418. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  419. "Sleeping for %s before retrying requests on plan %p.\n",
  420. GNUNET_STRINGS_relative_time_to_string (delay,
  421. GNUNET_YES),
  422. pp);
  423. GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# delay heap timeout (ms)"),
  424. delay.rel_value_us / 1000LL, GNUNET_NO);
  425. pp->task =
  426. GNUNET_SCHEDULER_add_delayed (delay, &schedule_peer_transmission, pp);
  427. return;
  428. }
  429. #if INSANE_STATISTICS
  430. GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"),
  431. 1, GNUNET_NO);
  432. #endif
  433. /* process from priority heap */
  434. rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
  435. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing query plan %p\n", rp);
  436. GNUNET_assert (NULL != rp);
  437. msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL);
  438. pp->pth =
  439. GSF_peer_transmit_ (pp->cp, GNUNET_YES, rp->priority,
  440. GNUNET_TIME_UNIT_FOREVER_REL, msize,
  441. &transmit_message_callback, pp);
  442. GNUNET_assert (NULL != pp->pth);
  443. }
  444. /**
  445. * Closure for merge_pr().
  446. */
  447. struct MergeContext
  448. {
  449. struct GSF_PendingRequest *pr;
  450. int merged;
  451. };
  452. /**
  453. * Iterator that checks if an equivalent request is already
  454. * present for this peer.
  455. *
  456. * @param cls closure
  457. * @param query the query
  458. * @param element request plan stored at the node
  459. * @return #GNUNET_YES if we should continue to iterate,
  460. * #GNUNET_NO if not (merge success)
  461. */
  462. static int
  463. merge_pr (void *cls, const struct GNUNET_HashCode * query, void *element)
  464. {
  465. struct MergeContext *mpr = cls;
  466. struct GSF_RequestPlan *rp = element;
  467. struct GSF_PendingRequestData *prd;
  468. struct GSF_PendingRequestPlanBijection *bi;
  469. struct GSF_PendingRequest *latest;
  470. if (GNUNET_OK !=
  471. GSF_pending_request_is_compatible_ (mpr->pr, rp->pe_head->pr))
  472. return GNUNET_YES;
  473. /* merge new request with existing request plan */
  474. bi = GNUNET_new (struct GSF_PendingRequestPlanBijection);
  475. bi->rp = rp;
  476. bi->pr = mpr->pr;
  477. prd = GSF_pending_request_get_data_ (mpr->pr);
  478. GNUNET_CONTAINER_MDLL_insert (PR, prd->pr_head, prd->pr_tail, bi);
  479. GNUNET_CONTAINER_MDLL_insert (PE, rp->pe_head, rp->pe_tail, bi);
  480. mpr->merged = GNUNET_YES;
  481. #if INSANE_STATISTICS
  482. GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests merged"), 1,
  483. GNUNET_NO);
  484. #endif
  485. latest = get_latest (rp);
  486. if (GSF_pending_request_get_data_ (latest)->ttl.abs_value_us <
  487. prd->ttl.abs_value_us)
  488. {
  489. #if INSANE_STATISTICS
  490. GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# requests refreshed"),
  491. 1, GNUNET_NO);
  492. #endif
  493. rp->transmission_counter = 0; /* reset */
  494. }
  495. return GNUNET_NO;
  496. }
  497. /**
  498. * Create a new query plan entry.
  499. *
  500. * @param cp peer with the entry
  501. * @param pr request with the entry
  502. */
  503. void
  504. GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
  505. struct GSF_PendingRequest *pr)
  506. {
  507. const struct GNUNET_PeerIdentity *id;
  508. struct PeerPlan *pp;
  509. struct GSF_PendingRequestData *prd;
  510. struct GSF_RequestPlan *rp;
  511. struct GSF_PendingRequestPlanBijection *bi;
  512. struct MergeContext mpc;
  513. GNUNET_assert (NULL != cp);
  514. id = GSF_connected_peer_get_identity2_ (cp);
  515. pp = GNUNET_CONTAINER_multipeermap_get (plans, id);
  516. if (NULL == pp)
  517. {
  518. pp = GNUNET_new (struct PeerPlan);
  519. pp->plan_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
  520. pp->priority_heap =
  521. GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MAX);
  522. pp->delay_heap =
  523. GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
  524. pp->cp = cp;
  525. GNUNET_assert (GNUNET_OK ==
  526. GNUNET_CONTAINER_multipeermap_put (plans,
  527. id, pp,
  528. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  529. }
  530. mpc.merged = GNUNET_NO;
  531. mpc.pr = pr;
  532. GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
  533. &GSF_pending_request_get_data_
  534. (pr)->query, &merge_pr, &mpc);
  535. if (GNUNET_NO != mpc.merged)
  536. return;
  537. GNUNET_CONTAINER_multihashmap_get_multiple (pp->plan_map,
  538. &GSF_pending_request_get_data_
  539. (pr)->query, &merge_pr, &mpc);
  540. if (GNUNET_NO != mpc.merged)
  541. return;
  542. plan_count++;
  543. GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plan entries"), 1,
  544. GNUNET_NO);
  545. prd = GSF_pending_request_get_data_ (pr);
  546. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  547. "Planning transmission of query `%s' to peer `%s'\n",
  548. GNUNET_h2s (&prd->query), GNUNET_i2s (id));
  549. rp = GNUNET_new (struct GSF_RequestPlan);
  550. bi = GNUNET_new (struct GSF_PendingRequestPlanBijection);
  551. bi->rp = rp;
  552. bi->pr = pr;
  553. GNUNET_CONTAINER_MDLL_insert (PR, prd->pr_head, prd->pr_tail, bi);
  554. GNUNET_CONTAINER_MDLL_insert (PE, rp->pe_head, rp->pe_tail, bi);
  555. rp->pp = pp;
  556. GNUNET_assert (GNUNET_YES ==
  557. GNUNET_CONTAINER_multihashmap_put (pp->plan_map,
  558. get_rp_key (rp), rp,
  559. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
  560. plan (pp, rp);
  561. }
  562. /**
  563. * Notify the plan about a peer being no longer available;
  564. * destroy all entries associated with this peer.
  565. *
  566. * @param cp connected peer
  567. */
  568. void
  569. GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
  570. {
  571. const struct GNUNET_PeerIdentity *id;
  572. struct PeerPlan *pp;
  573. struct GSF_RequestPlan *rp;
  574. struct GSF_PendingRequestData *prd;
  575. struct GSF_PendingRequestPlanBijection *bi;
  576. id = GSF_connected_peer_get_identity2_ (cp);
  577. pp = GNUNET_CONTAINER_multipeermap_get (plans, id);
  578. if (NULL == pp)
  579. return; /* nothing was ever planned for this peer */
  580. GNUNET_assert (GNUNET_YES ==
  581. GNUNET_CONTAINER_multipeermap_remove (plans, id,
  582. pp));
  583. if (NULL != pp->pth)
  584. {
  585. GSF_peer_transmit_cancel_ (pp->pth);
  586. pp->pth = NULL;
  587. }
  588. if (GNUNET_SCHEDULER_NO_TASK != pp->task)
  589. {
  590. GNUNET_SCHEDULER_cancel (pp->task);
  591. pp->task = GNUNET_SCHEDULER_NO_TASK;
  592. }
  593. while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
  594. {
  595. GNUNET_break (GNUNET_YES ==
  596. GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
  597. get_rp_key (rp), rp));
  598. while (NULL != (bi = rp->pe_head))
  599. {
  600. GNUNET_CONTAINER_MDLL_remove (PE, rp->pe_head, rp->pe_tail, bi);
  601. prd = GSF_pending_request_get_data_ (bi->pr);
  602. GNUNET_CONTAINER_MDLL_remove (PR, prd->pr_head, prd->pr_tail, bi);
  603. GNUNET_free (bi);
  604. }
  605. plan_count--;
  606. GNUNET_free (rp);
  607. }
  608. GNUNET_CONTAINER_heap_destroy (pp->priority_heap);
  609. while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->delay_heap)))
  610. {
  611. GNUNET_break (GNUNET_YES ==
  612. GNUNET_CONTAINER_multihashmap_remove (pp->plan_map,
  613. get_rp_key (rp), rp));
  614. while (NULL != (bi = rp->pe_head))
  615. {
  616. prd = GSF_pending_request_get_data_ (bi->pr);
  617. GNUNET_CONTAINER_MDLL_remove (PE, rp->pe_head, rp->pe_tail, bi);
  618. GNUNET_CONTAINER_MDLL_remove (PR, prd->pr_head, prd->pr_tail, bi);
  619. GNUNET_free (bi);
  620. }
  621. plan_count--;
  622. GNUNET_free (rp);
  623. }
  624. GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
  625. plan_count, GNUNET_NO);
  626. GNUNET_CONTAINER_heap_destroy (pp->delay_heap);
  627. GNUNET_CONTAINER_multihashmap_destroy (pp->plan_map);
  628. GNUNET_free (pp);
  629. }
  630. /**
  631. * Get the last transmission attempt time for the request plan list
  632. * referenced by @a pr_head, that was sent to @a sender
  633. *
  634. * @param pr_head request plan reference list to check.
  635. * @param sender the peer that we've sent the request to.
  636. * @param result the timestamp to fill, set to #GNUNET_TIME_UNIT_FOREVER_ABS if never transmitted
  637. * @return #GNUNET_YES if @a result was changed, #GNUNET_NO otherwise.
  638. */
  639. int
  640. GSF_request_plan_reference_get_last_transmission_ (struct GSF_PendingRequestPlanBijection *pr_head,
  641. struct GSF_ConnectedPeer *sender,
  642. struct GNUNET_TIME_Absolute *result)
  643. {
  644. struct GSF_PendingRequestPlanBijection *bi;
  645. for (bi = pr_head; NULL != bi; bi = bi->next_PR)
  646. {
  647. if (bi->rp->pp->cp == sender)
  648. {
  649. if (0 == bi->rp->last_transmission.abs_value_us)
  650. *result = GNUNET_TIME_UNIT_FOREVER_ABS;
  651. else
  652. *result = bi->rp->last_transmission;
  653. return GNUNET_YES;
  654. }
  655. }
  656. return GNUNET_NO;
  657. }
  658. /**
  659. * Notify the plan about a request being done; destroy all entries
  660. * associated with this request.
  661. *
  662. * @param pr request that is done
  663. */
  664. void
  665. GSF_plan_notify_request_done_ (struct GSF_PendingRequest *pr)
  666. {
  667. struct GSF_RequestPlan *rp;
  668. struct GSF_PendingRequestData *prd;
  669. struct GSF_PendingRequestPlanBijection *bi;
  670. prd = GSF_pending_request_get_data_ (pr);
  671. while (NULL != (bi = prd->pr_head))
  672. {
  673. rp = bi->rp;
  674. GNUNET_CONTAINER_MDLL_remove (PR, prd->pr_head, prd->pr_tail, bi);
  675. GNUNET_CONTAINER_MDLL_remove (PE, rp->pe_head, rp->pe_tail, bi);
  676. if (NULL == rp->pe_head)
  677. {
  678. GNUNET_CONTAINER_heap_remove_node (rp->hn);
  679. plan_count--;
  680. GNUNET_break (GNUNET_YES ==
  681. GNUNET_CONTAINER_multihashmap_remove (rp->pp->plan_map,
  682. &GSF_pending_request_get_data_
  683. (bi->pr)->query,
  684. rp));
  685. GNUNET_free (rp);
  686. }
  687. GNUNET_free (bi);
  688. }
  689. GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# query plan entries"),
  690. plan_count, GNUNET_NO);
  691. }
  692. /**
  693. * Initialize plan subsystem.
  694. */
  695. void
  696. GSF_plan_init ()
  697. {
  698. plans = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
  699. }
  700. /**
  701. * Shutdown plan subsystem.
  702. */
  703. void
  704. GSF_plan_done ()
  705. {
  706. GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (plans));
  707. GNUNET_CONTAINER_multipeermap_destroy (plans);
  708. }
  709. /* end of gnunet-service-fs_pe.h */