gnunet-service-fs_cp.c 52 KB


  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2011, 2016 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file fs/gnunet-service-fs_cp.c
  18. * @brief API to handle 'connected peers'
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet_util_lib.h"
  23. #include "gnunet_load_lib.h"
  24. #include "gnunet-service-fs.h"
  25. #include "gnunet-service-fs_cp.h"
  26. #include "gnunet-service-fs_pe.h"
  27. #include "gnunet-service-fs_pr.h"
  28. #include "gnunet-service-fs_push.h"
  29. #include "gnunet_peerstore_service.h"
  30. /**
  31. * Ratio for moving average delay calculation. The previous
  32. * average goes in with a factor of (n-1) into the calculation.
  33. * Must be > 0.
  34. */
  35. #define RUNAVG_DELAY_N 16
  36. /**
  37. * How often do we flush respect values to disk?
  38. */
  39. #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply ( \
  40. GNUNET_TIME_UNIT_MINUTES, 5)
  41. /**
  42. * After how long do we discard a reply?
  43. */
  44. #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \
  45. 2)
  46. /**
  47. * Collect an instane number of statistics? May cause excessive IPC.
  48. */
  49. #define INSANE_STATISTICS GNUNET_NO
  50. /**
  51. * Handle to cancel a transmission request.
  52. */
  53. struct GSF_PeerTransmitHandle
  54. {
  55. /**
  56. * Kept in a doubly-linked list.
  57. */
  58. struct GSF_PeerTransmitHandle *next;
  59. /**
  60. * Kept in a doubly-linked list.
  61. */
  62. struct GSF_PeerTransmitHandle *prev;
  63. /**
  64. * Time when this transmission request was issued.
  65. */
  66. struct GNUNET_TIME_Absolute transmission_request_start_time;
  67. /**
  68. * Envelope with the actual message.
  69. */
  70. struct GNUNET_MQ_Envelope *env;
  71. /**
  72. * Peer this request targets.
  73. */
  74. struct GSF_ConnectedPeer *cp;
  75. /**
  76. * #GNUNET_YES if this is a query, #GNUNET_NO for content.
  77. */
  78. int is_query;
  79. /**
  80. * Did we get a reservation already?
  81. */
  82. int was_reserved;
  83. /**
  84. * Priority of this request.
  85. */
  86. uint32_t priority;
  87. };
  88. /**
  89. * Handle for an entry in our delay list.
  90. */
  91. struct GSF_DelayedHandle
  92. {
  93. /**
  94. * Kept in a doubly-linked list.
  95. */
  96. struct GSF_DelayedHandle *next;
  97. /**
  98. * Kept in a doubly-linked list.
  99. */
  100. struct GSF_DelayedHandle *prev;
  101. /**
  102. * Peer this transmission belongs to.
  103. */
  104. struct GSF_ConnectedPeer *cp;
  105. /**
  106. * Envelope of the message that was delayed.
  107. */
  108. struct GNUNET_MQ_Envelope *env;
  109. /**
  110. * Task for the delay.
  111. */
  112. struct GNUNET_SCHEDULER_Task *delay_task;
  113. /**
  114. * Size of the message.
  115. */
  116. size_t msize;
  117. };
  118. /**
  119. * Information per peer and request.
  120. */
  121. struct PeerRequest
  122. {
  123. /**
  124. * Handle to generic request (generic: from peer or local client).
  125. */
  126. struct GSF_PendingRequest *pr;
  127. /**
  128. * Which specific peer issued this request?
  129. */
  130. struct GSF_ConnectedPeer *cp;
  131. /**
  132. * Task for asynchronous stopping of this request.
  133. */
  134. struct GNUNET_SCHEDULER_Task *kill_task;
  135. };
  136. /**
  137. * A connected peer.
  138. */
  139. struct GSF_ConnectedPeer
  140. {
  141. /**
  142. * Performance data for this peer.
  143. */
  144. struct GSF_PeerPerformanceData ppd;
  145. /**
  146. * Time until when we blocked this peer from migrating
  147. * data to us.
  148. */
  149. struct GNUNET_TIME_Absolute last_migration_block;
  150. /**
  151. * Task scheduled to revive migration to this peer.
  152. */
  153. struct GNUNET_SCHEDULER_Task *mig_revive_task;
  154. /**
  155. * Messages (replies, queries, content migration) we would like to
  156. * send to this peer in the near future. Sorted by priority, head.
  157. */
  158. struct GSF_PeerTransmitHandle *pth_head;
  159. /**
  160. * Messages (replies, queries, content migration) we would like to
  161. * send to this peer in the near future. Sorted by priority, tail.
  162. */
  163. struct GSF_PeerTransmitHandle *pth_tail;
  164. /**
  165. * Messages (replies, queries, content migration) we would like to
  166. * send to this peer in the near future. Sorted by priority, head.
  167. */
  168. struct GSF_DelayedHandle *delayed_head;
  169. /**
  170. * Messages (replies, queries, content migration) we would like to
  171. * send to this peer in the near future. Sorted by priority, tail.
  172. */
  173. struct GSF_DelayedHandle *delayed_tail;
  174. /**
  175. * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
  176. */
  177. struct GNUNET_ATS_ReservationContext *rc;
  178. /**
  179. * Task scheduled if we need to retry bandwidth reservation later.
  180. */
  181. struct GNUNET_SCHEDULER_Task *rc_delay_task;
  182. /**
  183. * Active requests from this neighbour, map of query to `struct PeerRequest`.
  184. */
  185. struct GNUNET_CONTAINER_MultiHashMap *request_map;
  186. /**
  187. * Handle for an active request for transmission to this
  188. * peer.
  189. */
  190. struct GNUNET_MQ_Handle *mq;
  191. /**
  192. * Increase in traffic preference still to be submitted
  193. * to the core service for this peer.
  194. */
  195. uint64_t inc_preference;
  196. /**
  197. * Number of entries in @e delayed_head DLL.
  198. */
  199. unsigned int delay_queue_size;
  200. /**
  201. * Respect rating for this peer on disk.
  202. */
  203. uint32_t disk_respect;
  204. /**
  205. * Which offset in @e last_p2p_replies will be updated next?
  206. * (we go round-robin).
  207. */
  208. unsigned int last_p2p_replies_woff;
  209. /**
  210. * Which offset in @e last_client_replies will be updated next?
  211. * (we go round-robin).
  212. */
  213. unsigned int last_client_replies_woff;
  214. /**
  215. * Current offset into @e last_request_times ring buffer.
  216. */
  217. unsigned int last_request_times_off;
  218. /**
  219. * #GNUNET_YES if we did successfully reserve 32k bandwidth,
  220. * #GNUNET_NO if not.
  221. */
  222. int did_reserve;
  223. /**
  224. * Handle to the PEERSTORE iterate request for peer respect value
  225. */
  226. struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
  227. };
  228. /**
  229. * Map from peer identities to `struct GSF_ConnectPeer` entries.
  230. */
  231. static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
  232. /**
  233. * Handle to peerstore service.
  234. */
  235. static struct GNUNET_PEERSTORE_Handle *peerstore;
  236. /**
  237. * Task used to flush respect values to disk.
  238. */
  239. static struct GNUNET_SCHEDULER_Task *fr_task;
  240. /**
  241. * Update the latency information kept for the given peer.
  242. *
  243. * @param id peer record to update
  244. * @param latency current latency value
  245. */
  246. void
  247. GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
  248. struct GNUNET_TIME_Relative latency)
  249. {
  250. struct GSF_ConnectedPeer *cp;
  251. cp = GSF_peer_get_ (id);
  252. if (NULL == cp)
  253. return; /* we're not yet connected at the core level, ignore */
  254. GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
  255. latency);
  256. }
  257. /**
  258. * Return the performance data record for the given peer
  259. *
  260. * @param cp peer to query
  261. * @return performance data record for the peer
  262. */
  263. struct GSF_PeerPerformanceData *
  264. GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
  265. {
  266. return &cp->ppd;
  267. }
  268. /**
  269. * Core is ready to transmit to a peer, get the message.
  270. *
  271. * @param cp which peer to send a message to
  272. */
  273. static void
  274. peer_transmit (struct GSF_ConnectedPeer *cp);
  275. /**
  276. * Function called by core upon success or failure of our bandwidth reservation request.
  277. *
  278. * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
  279. * @param peer identifies the peer
  280. * @param amount set to the amount that was actually reserved or unreserved;
  281. * either the full requested amount or zero (no partial reservations)
  282. * @param res_delay if the reservation could not be satisfied (amount was 0), how
  283. * long should the client wait until re-trying?
  284. */
  285. static void
  286. ats_reserve_callback (void *cls,
  287. const struct GNUNET_PeerIdentity *peer,
  288. int32_t amount,
  289. struct GNUNET_TIME_Relative res_delay);
  290. /**
  291. * If ready (bandwidth reserved), try to schedule transmission via
  292. * core for the given handle.
  293. *
  294. * @param pth transmission handle to schedule
  295. */
  296. static void
  297. schedule_transmission (struct GSF_PeerTransmitHandle *pth)
  298. {
  299. struct GSF_ConnectedPeer *cp;
  300. struct GNUNET_PeerIdentity target;
  301. cp = pth->cp;
  302. GNUNET_assert (0 != cp->ppd.pid);
  303. GNUNET_PEER_resolve (cp->ppd.pid, &target);
  304. if (0 != cp->inc_preference)
  305. {
  306. GNUNET_ATS_performance_change_preference (GSF_ats,
  307. &target,
  308. GNUNET_ATS_PREFERENCE_BANDWIDTH,
  309. (double) cp->inc_preference,
  310. GNUNET_ATS_PREFERENCE_END);
  311. cp->inc_preference = 0;
  312. }
  313. if ((GNUNET_YES == pth->is_query) &&
  314. (GNUNET_YES != pth->was_reserved))
  315. {
  316. /* query, need reservation */
  317. if (GNUNET_YES != cp->did_reserve)
  318. return; /* not ready */
  319. cp->did_reserve = GNUNET_NO;
  320. /* reservation already done! */
  321. pth->was_reserved = GNUNET_YES;
  322. cp->rc = GNUNET_ATS_reserve_bandwidth (GSF_ats,
  323. &target,
  324. DBLOCK_SIZE,
  325. &ats_reserve_callback,
  326. cp);
  327. return;
  328. }
  329. peer_transmit (cp);
  330. }
  331. /**
  332. * Core is ready to transmit to a peer, get the message.
  333. *
  334. * @param cp which peer to send a message to
  335. */
  336. static void
  337. peer_transmit (struct GSF_ConnectedPeer *cp)
  338. {
  339. struct GSF_PeerTransmitHandle *pth = cp->pth_head;
  340. struct GSF_PeerTransmitHandle *pos;
  341. if (NULL == pth)
  342. return;
  343. GNUNET_CONTAINER_DLL_remove (cp->pth_head,
  344. cp->pth_tail,
  345. pth);
  346. if (GNUNET_YES == pth->is_query)
  347. {
  348. cp->ppd.last_request_times[(cp->last_request_times_off++)
  349. % MAX_QUEUE_PER_PEER] =
  350. GNUNET_TIME_absolute_get ();
  351. GNUNET_assert (0 < cp->ppd.pending_queries--);
  352. }
  353. else if (GNUNET_NO == pth->is_query)
  354. {
  355. GNUNET_assert (0 < cp->ppd.pending_replies--);
  356. }
  357. GNUNET_LOAD_update (cp->ppd.transmission_delay,
  358. GNUNET_TIME_absolute_get_duration
  359. (pth->transmission_request_start_time).rel_value_us);
  360. GNUNET_MQ_send (cp->mq,
  361. pth->env);
  362. GNUNET_free (pth);
  363. if (NULL != (pos = cp->pth_head))
  364. {
  365. GNUNET_assert (pos != pth);
  366. schedule_transmission (pos);
  367. }
  368. }
  369. /**
  370. * (re)try to reserve bandwidth from the given peer.
  371. *
  372. * @param cls the `struct GSF_ConnectedPeer` to reserve from
  373. */
  374. static void
  375. retry_reservation (void *cls)
  376. {
  377. struct GSF_ConnectedPeer *cp = cls;
  378. struct GNUNET_PeerIdentity target;
  379. GNUNET_PEER_resolve (cp->ppd.pid, &target);
  380. cp->rc_delay_task = NULL;
  381. cp->rc =
  382. GNUNET_ATS_reserve_bandwidth (GSF_ats,
  383. &target,
  384. DBLOCK_SIZE,
  385. &ats_reserve_callback, cp);
  386. }
  387. /**
  388. * Function called by core upon success or failure of our bandwidth reservation request.
  389. *
  390. * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
  391. * @param peer identifies the peer
  392. * @param amount set to the amount that was actually reserved or unreserved;
  393. * either the full requested amount or zero (no partial reservations)
  394. * @param res_delay if the reservation could not be satisfied (amount was 0), how
  395. * long should the client wait until re-trying?
  396. */
  397. static void
  398. ats_reserve_callback (void *cls,
  399. const struct GNUNET_PeerIdentity *peer,
  400. int32_t amount,
  401. struct GNUNET_TIME_Relative res_delay)
  402. {
  403. struct GSF_ConnectedPeer *cp = cls;
  404. struct GSF_PeerTransmitHandle *pth;
  405. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  406. "Reserved %d bytes / need to wait %s for reservation\n",
  407. (int) amount,
  408. GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
  409. cp->rc = NULL;
  410. if (0 == amount)
  411. {
  412. cp->rc_delay_task =
  413. GNUNET_SCHEDULER_add_delayed (res_delay,
  414. &retry_reservation,
  415. cp);
  416. return;
  417. }
  418. cp->did_reserve = GNUNET_YES;
  419. pth = cp->pth_head;
  420. if (NULL != pth)
  421. {
  422. /* reservation success, try transmission now! */
  423. peer_transmit (cp);
  424. }
  425. }
  426. /**
  427. * Function called by PEERSTORE with peer respect record
  428. *
  429. * @param cls handle to connected peer entry
  430. * @param record peerstore record information
  431. * @param emsg error message, or NULL if no errors
  432. */
  433. static void
  434. peer_respect_cb (void *cls,
  435. const struct GNUNET_PEERSTORE_Record *record,
  436. const char *emsg)
  437. {
  438. struct GSF_ConnectedPeer *cp = cls;
  439. GNUNET_assert (NULL != cp->respect_iterate_req);
  440. if ((NULL != record) &&
  441. (sizeof(cp->disk_respect) == record->value_size))
  442. {
  443. cp->disk_respect = *((uint32_t *) record->value);
  444. cp->ppd.respect += *((uint32_t *) record->value);
  445. }
  446. GSF_push_start_ (cp);
  447. if (NULL != record)
  448. GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
  449. cp->respect_iterate_req = NULL;
  450. }
  451. /**
  452. * Function called for each pending request whenever a new
  453. * peer connects, giving us a chance to decide about submitting
  454. * the existing request to the new peer.
  455. *
  456. * @param cls the `struct GSF_ConnectedPeer` of the new peer
  457. * @param key query for the request
  458. * @param pr handle to the pending request
  459. * @return #GNUNET_YES to continue to iterate
  460. */
  461. static int
  462. consider_peer_for_forwarding (void *cls,
  463. const struct GNUNET_HashCode *key,
  464. struct GSF_PendingRequest *pr)
  465. {
  466. struct GSF_ConnectedPeer *cp = cls;
  467. struct GNUNET_PeerIdentity pid;
  468. if (GNUNET_YES !=
  469. GSF_pending_request_test_active_ (pr))
  470. return GNUNET_YES; /* request is not actually active, skip! */
  471. GSF_connected_peer_get_identity_ (cp, &pid);
  472. if (GNUNET_YES !=
  473. GSF_pending_request_test_target_ (pr, &pid))
  474. {
  475. GNUNET_STATISTICS_update (GSF_stats,
  476. gettext_noop ("# Loopback routes suppressed"),
  477. 1,
  478. GNUNET_NO);
  479. return GNUNET_YES;
  480. }
  481. GSF_plan_add_ (cp, pr);
  482. return GNUNET_YES;
  483. }
  484. /**
  485. * A peer connected to us. Setup the connected peer
  486. * records.
  487. *
  488. * @param cls NULL
  489. * @param peer identity of peer that connected
  490. * @param mq message queue for talking to @a peer
  491. * @return our internal handle for the peer
  492. */
  493. void *
  494. GSF_peer_connect_handler (void *cls,
  495. const struct GNUNET_PeerIdentity *peer,
  496. struct GNUNET_MQ_Handle *mq)
  497. {
  498. struct GSF_ConnectedPeer *cp;
  499. if (0 ==
  500. GNUNET_memcmp (&GSF_my_id,
  501. peer))
  502. return NULL;
  503. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  504. "Connected to peer %s\n",
  505. GNUNET_i2s (peer));
  506. cp = GNUNET_new (struct GSF_ConnectedPeer);
  507. cp->ppd.pid = GNUNET_PEER_intern (peer);
  508. cp->ppd.peer = peer;
  509. cp->mq = mq;
  510. cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
  511. cp->rc =
  512. GNUNET_ATS_reserve_bandwidth (GSF_ats,
  513. peer,
  514. DBLOCK_SIZE,
  515. &ats_reserve_callback, cp);
  516. cp->request_map = GNUNET_CONTAINER_multihashmap_create (128,
  517. GNUNET_YES);
  518. GNUNET_break (GNUNET_OK ==
  519. GNUNET_CONTAINER_multipeermap_put (cp_map,
  520. GSF_connected_peer_get_identity2_ (
  521. cp),
  522. cp,
  523. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  524. GNUNET_STATISTICS_set (GSF_stats,
  525. gettext_noop ("# peers connected"),
  526. GNUNET_CONTAINER_multipeermap_size (cp_map),
  527. GNUNET_NO);
  528. cp->respect_iterate_req
  529. = GNUNET_PEERSTORE_iterate (peerstore,
  530. "fs",
  531. peer,
  532. "respect",
  533. &peer_respect_cb,
  534. cp);
  535. GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
  536. cp);
  537. return cp;
  538. }
  539. /**
  540. * It may be time to re-start migrating content to this
  541. * peer. Check, and if so, restart migration.
  542. *
  543. * @param cls the `struct GSF_ConnectedPeer`
  544. */
  545. static void
  546. revive_migration (void *cls)
  547. {
  548. struct GSF_ConnectedPeer *cp = cls;
  549. struct GNUNET_TIME_Relative bt;
  550. cp->mig_revive_task = NULL;
  551. bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
  552. if (0 != bt.rel_value_us)
  553. {
  554. /* still time left... */
  555. cp->mig_revive_task =
  556. GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
  557. return;
  558. }
  559. GSF_push_start_ (cp);
  560. }
  561. /**
  562. * Get a handle for a connected peer.
  563. *
  564. * @param peer peer's identity
  565. * @return NULL if the peer is not currently connected
  566. */
  567. struct GSF_ConnectedPeer *
  568. GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
  569. {
  570. if (NULL == cp_map)
  571. return NULL;
  572. return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
  573. }
  574. /**
  575. * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
  576. *
  577. * @param cls closure, the `struct GSF_ConnectedPeer`
  578. * @param msm the actual message
  579. */
  580. void
  581. handle_p2p_migration_stop (void *cls,
  582. const struct MigrationStopMessage *msm)
  583. {
  584. struct GSF_ConnectedPeer *cp = cls;
  585. struct GNUNET_TIME_Relative bt;
  586. GNUNET_STATISTICS_update (GSF_stats,
  587. gettext_noop ("# migration stop messages received"),
  588. 1, GNUNET_NO);
  589. bt = GNUNET_TIME_relative_ntoh (msm->duration);
  590. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  591. _ ("Migration of content to peer `%s' blocked for %s\n"),
  592. GNUNET_i2s (cp->ppd.peer),
  593. GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
  594. cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
  595. if ((NULL == cp->mig_revive_task) &&
  596. (NULL == cp->respect_iterate_req))
  597. {
  598. GSF_push_stop_ (cp);
  599. cp->mig_revive_task =
  600. GNUNET_SCHEDULER_add_delayed (bt,
  601. &revive_migration, cp);
  602. }
  603. }
  604. /**
  605. * Free resources associated with the given peer request.
  606. *
  607. * @param peerreq request to free
  608. */
  609. static void
  610. free_pending_request (struct PeerRequest *peerreq)
  611. {
  612. struct GSF_ConnectedPeer *cp = peerreq->cp;
  613. struct GSF_PendingRequestData *prd;
  614. prd = GSF_pending_request_get_data_ (peerreq->pr);
  615. if (NULL != peerreq->kill_task)
  616. {
  617. GNUNET_SCHEDULER_cancel (peerreq->kill_task);
  618. peerreq->kill_task = NULL;
  619. }
  620. GNUNET_STATISTICS_update (GSF_stats,
  621. gettext_noop ("# P2P searches active"),
  622. -1,
  623. GNUNET_NO);
  624. GNUNET_break (GNUNET_YES ==
  625. GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
  626. &prd->query,
  627. peerreq));
  628. GNUNET_free (peerreq);
  629. }
  630. /**
  631. * Cancel all requests associated with the peer.
  632. *
  633. * @param cls unused
  634. * @param query hash code of the request
  635. * @param value the `struct GSF_PendingRequest`
  636. * @return #GNUNET_YES (continue to iterate)
  637. */
  638. static int
  639. cancel_pending_request (void *cls,
  640. const struct GNUNET_HashCode *query,
  641. void *value)
  642. {
  643. struct PeerRequest *peerreq = value;
  644. struct GSF_PendingRequest *pr = peerreq->pr;
  645. free_pending_request (peerreq);
  646. GSF_pending_request_cancel_ (pr,
  647. GNUNET_NO);
  648. return GNUNET_OK;
  649. }
  650. /**
  651. * Free the given request.
  652. *
  653. * @param cls the request to free
  654. */
  655. static void
  656. peer_request_destroy (void *cls)
  657. {
  658. struct PeerRequest *peerreq = cls;
  659. struct GSF_PendingRequest *pr = peerreq->pr;
  660. struct GSF_PendingRequestData *prd;
  661. peerreq->kill_task = NULL;
  662. prd = GSF_pending_request_get_data_ (pr);
  663. cancel_pending_request (NULL,
  664. &prd->query,
  665. peerreq);
  666. }
  667. /**
  668. * The artificial delay is over, transmit the message now.
  669. *
  670. * @param cls the `struct GSF_DelayedHandle` with the message
  671. */
  672. static void
  673. transmit_delayed_now (void *cls)
  674. {
  675. struct GSF_DelayedHandle *dh = cls;
  676. struct GSF_ConnectedPeer *cp = dh->cp;
  677. GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
  678. cp->delayed_tail,
  679. dh);
  680. cp->delay_queue_size--;
  681. GSF_peer_transmit_ (cp,
  682. GNUNET_NO,
  683. UINT32_MAX,
  684. dh->env);
  685. GNUNET_free (dh);
  686. }
  687. /**
  688. * Get the randomized delay a response should be subjected to.
  689. *
  690. * @return desired delay
  691. */
  692. static struct GNUNET_TIME_Relative
  693. get_randomized_delay ()
  694. {
  695. struct GNUNET_TIME_Relative ret;
  696. ret =
  697. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
  698. GNUNET_CRYPTO_random_u32
  699. (GNUNET_CRYPTO_QUALITY_WEAK,
  700. 2 * GSF_avg_latency.rel_value_us + 1));
  701. #if INSANE_STATISTICS
  702. GNUNET_STATISTICS_update (GSF_stats,
  703. gettext_noop
  704. ("# artificial delays introduced (ms)"),
  705. ret.rel_value_us / 1000LL, GNUNET_NO);
  706. #endif
  707. return ret;
  708. }
  709. /**
  710. * Handle a reply to a pending request. Also called if a request
  711. * expires (then with data == NULL). The handler may be called
  712. * many times (depending on the request type), but will not be
  713. * called during or after a call to GSF_pending_request_cancel
  714. * and will also not be called anymore after a call signalling
  715. * expiration.
  716. *
  717. * @param cls `struct PeerRequest` this is an answer for
  718. * @param eval evaluation of the result
  719. * @param pr handle to the original pending request
  720. * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
  721. * @param expiration when does @a data expire?
  722. * @param last_transmission when did we last transmit a request for this block
  723. * @param type type of the block
  724. * @param data response data, NULL on request expiration
  725. * @param data_len number of bytes in @a data
  726. */
  727. static void
  728. handle_p2p_reply (void *cls,
  729. enum GNUNET_BLOCK_EvaluationResult eval,
  730. struct GSF_PendingRequest *pr,
  731. uint32_t reply_anonymity_level,
  732. struct GNUNET_TIME_Absolute expiration,
  733. struct GNUNET_TIME_Absolute last_transmission,
  734. enum GNUNET_BLOCK_Type type,
  735. const void *data,
  736. size_t data_len)
  737. {
  738. struct PeerRequest *peerreq = cls;
  739. struct GSF_ConnectedPeer *cp = peerreq->cp;
  740. struct GSF_PendingRequestData *prd;
  741. struct GNUNET_MQ_Envelope *env;
  742. struct PutMessage *pm;
  743. size_t msize;
  744. GNUNET_assert (data_len + sizeof(struct PutMessage) <
  745. GNUNET_MAX_MESSAGE_SIZE);
  746. GNUNET_assert (peerreq->pr == pr);
  747. prd = GSF_pending_request_get_data_ (pr);
  748. if (NULL == data)
  749. {
  750. free_pending_request (peerreq);
  751. return;
  752. }
  753. GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
  754. if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
  755. {
  756. GNUNET_STATISTICS_update (GSF_stats,
  757. gettext_noop
  758. ("# replies dropped due to type mismatch"),
  759. 1, GNUNET_NO);
  760. return;
  761. }
  762. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  763. "Transmitting result for query `%s' to peer\n",
  764. GNUNET_h2s (&prd->query));
  765. GNUNET_STATISTICS_update (GSF_stats,
  766. gettext_noop ("# replies received for other peers"),
  767. 1, GNUNET_NO);
  768. msize = sizeof(struct PutMessage) + data_len;
  769. if (msize >= GNUNET_MAX_MESSAGE_SIZE)
  770. {
  771. GNUNET_break (0);
  772. return;
  773. }
  774. if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
  775. {
  776. if (reply_anonymity_level - 1 > GSF_cover_content_count)
  777. {
  778. GNUNET_STATISTICS_update (GSF_stats,
  779. gettext_noop
  780. (
  781. "# replies dropped due to insufficient cover traffic"),
  782. 1, GNUNET_NO);
  783. return;
  784. }
  785. GSF_cover_content_count -= (reply_anonymity_level - 1);
  786. }
  787. env = GNUNET_MQ_msg_extra (pm,
  788. data_len,
  789. GNUNET_MESSAGE_TYPE_FS_PUT);
  790. pm->type = htonl (type);
  791. pm->expiration = GNUNET_TIME_absolute_hton (expiration);
  792. GNUNET_memcpy (&pm[1],
  793. data,
  794. data_len);
  795. if ((UINT32_MAX != reply_anonymity_level) &&
  796. (0 != reply_anonymity_level) &&
  797. (GNUNET_YES == GSF_enable_randomized_delays))
  798. {
  799. struct GSF_DelayedHandle *dh;
  800. dh = GNUNET_new (struct GSF_DelayedHandle);
  801. dh->cp = cp;
  802. dh->env = env;
  803. dh->msize = msize;
  804. GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
  805. cp->delayed_tail,
  806. dh);
  807. cp->delay_queue_size++;
  808. dh->delay_task =
  809. GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
  810. &transmit_delayed_now,
  811. dh);
  812. }
  813. else
  814. {
  815. GSF_peer_transmit_ (cp,
  816. GNUNET_NO,
  817. UINT32_MAX,
  818. env);
  819. }
  820. if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
  821. return;
  822. if (NULL == peerreq->kill_task)
  823. {
  824. GNUNET_STATISTICS_update (GSF_stats,
  825. gettext_noop
  826. (
  827. "# P2P searches destroyed due to ultimate reply"),
  828. 1,
  829. GNUNET_NO);
  830. peerreq->kill_task =
  831. GNUNET_SCHEDULER_add_now (&peer_request_destroy,
  832. peerreq);
  833. }
  834. }
  835. /**
  836. * Increase the peer's respect by a value.
  837. *
  838. * @param cp which peer to change the respect value on
  839. * @param value is the int value by which the
  840. * peer's credit is to be increased or decreased
  841. * @returns the actual change in respect (positive or negative)
  842. */
  843. static int
  844. change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
  845. {
  846. if (0 == value)
  847. return 0;
  848. GNUNET_assert (NULL != cp);
  849. if (value > 0)
  850. {
  851. if (cp->ppd.respect + value < cp->ppd.respect)
  852. {
  853. value = UINT32_MAX - cp->ppd.respect;
  854. cp->ppd.respect = UINT32_MAX;
  855. }
  856. else
  857. cp->ppd.respect += value;
  858. }
  859. else
  860. {
  861. if (cp->ppd.respect < -value)
  862. {
  863. value = -cp->ppd.respect;
  864. cp->ppd.respect = 0;
  865. }
  866. else
  867. cp->ppd.respect += value;
  868. }
  869. return value;
  870. }
  871. /**
  872. * We've received a request with the specified priority. Bound it
  873. * according to how much we respect the given peer.
  874. *
  875. * @param prio_in requested priority
  876. * @param cp the peer making the request
  877. * @return effective priority
  878. */
  879. static int32_t
  880. bound_priority (uint32_t prio_in,
  881. struct GSF_ConnectedPeer *cp)
  882. {
  883. #define N ((double) 128.0)
  884. uint32_t ret;
  885. double rret;
  886. int ld;
  887. ld = GSF_test_get_load_too_high_ (0);
  888. if (GNUNET_SYSERR == ld)
  889. {
  890. #if INSANE_STATISTICS
  891. GNUNET_STATISTICS_update (GSF_stats,
  892. gettext_noop
  893. ("# requests done for free (low load)"), 1,
  894. GNUNET_NO);
  895. #endif
  896. return 0; /* excess resources */
  897. }
  898. if (prio_in > INT32_MAX)
  899. prio_in = INT32_MAX;
  900. ret = -change_peer_respect (cp, -(int) prio_in);
  901. if (ret > 0)
  902. {
  903. if (ret > GSF_current_priorities + N)
  904. rret = GSF_current_priorities + N;
  905. else
  906. rret = ret;
  907. GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
  908. }
  909. if ((GNUNET_YES == ld) && (ret > 0))
  910. {
  911. /* try with charging */
  912. ld = GSF_test_get_load_too_high_ (ret);
  913. }
  914. if (GNUNET_YES == ld)
  915. {
  916. GNUNET_STATISTICS_update (GSF_stats,
  917. gettext_noop
  918. ("# request dropped, priority insufficient"), 1,
  919. GNUNET_NO);
  920. /* undo charge */
  921. change_peer_respect (cp, (int) ret);
  922. return -1; /* not enough resources */
  923. }
  924. else
  925. {
  926. GNUNET_STATISTICS_update (GSF_stats,
  927. gettext_noop
  928. ("# requests done for a price (normal load)"),
  929. 1,
  930. GNUNET_NO);
  931. }
  932. #undef N
  933. return ret;
  934. }
  935. /**
  936. * The priority level imposes a bound on the maximum
  937. * value for the ttl that can be requested.
  938. *
  939. * @param ttl_in requested ttl
  940. * @param prio given priority
  941. * @return @a ttl_in if @a ttl_in is below the limit,
  942. * otherwise the ttl-limit for the given @a prio
  943. */
  944. static int32_t
  945. bound_ttl (int32_t ttl_in,
  946. uint32_t prio)
  947. {
  948. unsigned long long allowed;
  949. if (ttl_in <= 0)
  950. return ttl_in;
  951. allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
  952. if (ttl_in > allowed)
  953. {
  954. if (allowed >= (1 << 30))
  955. return 1 << 30;
  956. return allowed;
  957. }
  958. return ttl_in;
  959. }
  960. /**
  961. * Closure for #test_exist_cb().
  962. */
  963. struct TestExistClosure
  964. {
  965. /**
  966. * Priority of the incoming request.
  967. */
  968. int32_t priority;
  969. /**
  970. * Relative TTL of the incoming request.
  971. */
  972. int32_t ttl;
  973. /**
  974. * Type of the incoming request.
  975. */
  976. enum GNUNET_BLOCK_Type type;
  977. /**
  978. * Set to #GNUNET_YES if we are done handling the query.
  979. */
  980. int finished;
  981. };
  982. /**
  983. * Test if the query already exists. If so, merge it, otherwise
  984. * keep `finished` at #GNUNET_NO.
  985. *
  986. * @param cls our `struct TestExistClosure`
  987. * @param hc the key of the query
  988. * @param value the existing `struct PeerRequest`.
  989. * @return #GNUNET_YES to continue to iterate,
  990. * #GNUNET_NO if we successfully merged
  991. */
  992. static int
  993. test_exist_cb (void *cls,
  994. const struct GNUNET_HashCode *hc,
  995. void *value)
  996. {
  997. struct TestExistClosure *tec = cls;
  998. struct PeerRequest *peerreq = value;
  999. struct GSF_PendingRequest *pr;
  1000. struct GSF_PendingRequestData *prd;
  1001. pr = peerreq->pr;
  1002. prd = GSF_pending_request_get_data_ (pr);
  1003. if (prd->type != tec->type)
  1004. return GNUNET_YES;
  1005. if (prd->ttl.abs_value_us >=
  1006. GNUNET_TIME_absolute_get ().abs_value_us + tec->ttl * 1000LL)
  1007. {
  1008. /* existing request has higher TTL, drop new one! */
  1009. prd->priority += tec->priority;
  1010. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1011. "Have existing request with higher TTL, dropping new request.\n");
  1012. GNUNET_STATISTICS_update (GSF_stats,
  1013. gettext_noop
  1014. ("# requests dropped due to higher-TTL request"),
  1015. 1, GNUNET_NO);
  1016. tec->finished = GNUNET_YES;
  1017. return GNUNET_NO;
  1018. }
  1019. /* existing request has lower TTL, drop old one! */
  1020. tec->priority += prd->priority;
  1021. free_pending_request (peerreq);
  1022. GSF_pending_request_cancel_ (pr,
  1023. GNUNET_YES);
  1024. return GNUNET_NO;
  1025. }
  1026. /**
  1027. * Handle P2P "QUERY" message. Creates the pending request entry
  1028. * and sets up all of the data structures to that we will
  1029. * process replies properly. Does not initiate forwarding or
  1030. * local database lookups.
  1031. *
  1032. * @param cls the other peer involved (sender of the message)
  1033. * @param gm the GET message
  1034. */
  1035. void
  1036. handle_p2p_get (void *cls,
  1037. const struct GetMessage *gm)
  1038. {
  1039. struct GSF_ConnectedPeer *cps = cls;
  1040. struct PeerRequest *peerreq;
  1041. struct GSF_PendingRequest *pr;
  1042. struct GSF_ConnectedPeer *cp;
  1043. const struct GNUNET_PeerIdentity *target;
  1044. enum GSF_PendingRequestOptions options;
  1045. uint16_t msize;
  1046. unsigned int bits;
  1047. const struct GNUNET_PeerIdentity *opt;
  1048. uint32_t bm;
  1049. size_t bfsize;
  1050. uint32_t ttl_decrement;
  1051. struct TestExistClosure tec;
  1052. GNUNET_PEER_Id spid;
  1053. const struct GSF_PendingRequestData *prd;
  1054. msize = ntohs (gm->header.size);
  1055. tec.type = ntohl (gm->type);
  1056. bm = ntohl (gm->hash_bitmap);
  1057. bits = 0;
  1058. while (bm > 0)
  1059. {
  1060. if (1 == (bm & 1))
  1061. bits++;
  1062. bm >>= 1;
  1063. }
  1064. opt = (const struct GNUNET_PeerIdentity *) &gm[1];
  1065. bfsize = msize - sizeof(struct GetMessage) - bits * sizeof(struct
  1066. GNUNET_PeerIdentity);
  1067. GNUNET_STATISTICS_update (GSF_stats,
  1068. gettext_noop
  1069. ("# GET requests received (from other peers)"),
  1070. 1,
  1071. GNUNET_NO);
  1072. GSF_cover_query_count++;
  1073. bm = ntohl (gm->hash_bitmap);
  1074. bits = 0;
  1075. if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
  1076. cp = GSF_peer_get_ (&opt[bits++]);
  1077. else
  1078. cp = cps;
  1079. if (NULL == cp)
  1080. {
  1081. if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
  1082. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1083. "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
  1084. GNUNET_i2s (&opt[bits - 1]));
  1085. else
  1086. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1087. "Failed to find peer `%s' in connection set. Dropping query.\n",
  1088. GNUNET_i2s (cps->ppd.peer));
  1089. GNUNET_STATISTICS_update (GSF_stats,
  1090. gettext_noop
  1091. (
  1092. "# requests dropped due to missing reverse route"),
  1093. 1,
  1094. GNUNET_NO);
  1095. return;
  1096. }
  1097. unsigned int queue_size = GNUNET_MQ_get_length (cp->mq);
  1098. queue_size += cp->ppd.pending_replies + cp->delay_queue_size;
  1099. if (queue_size > MAX_QUEUE_PER_PEER)
  1100. {
  1101. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1102. "Peer `%s' has too many replies queued already. Dropping query.\n",
  1103. GNUNET_i2s (cps->ppd.peer));
  1104. GNUNET_STATISTICS_update (GSF_stats,
  1105. gettext_noop (
  1106. "# requests dropped due to full reply queue"),
  1107. 1,
  1108. GNUNET_NO);
  1109. return;
  1110. }
  1111. /* note that we can really only check load here since otherwise
  1112. * peers could find out that we are overloaded by not being
  1113. * disconnected after sending us a malformed query... */
  1114. tec.priority = bound_priority (ntohl (gm->priority),
  1115. cps);
  1116. if (tec.priority < 0)
  1117. {
  1118. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1119. "Dropping query from `%s', this peer is too busy.\n",
  1120. GNUNET_i2s (cps->ppd.peer));
  1121. return;
  1122. }
  1123. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1124. "Received request for `%s' of type %u from peer `%s' with flags %u\n",
  1125. GNUNET_h2s (&gm->query),
  1126. (unsigned int) tec.type,
  1127. GNUNET_i2s (cps->ppd.peer),
  1128. (unsigned int) bm);
  1129. target =
  1130. (0 !=
  1131. (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
  1132. options = GSF_PRO_DEFAULTS;
  1133. spid = 0;
  1134. if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1
  1135. + tec.priority))
  1136. || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
  1137. GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2
  1138. + GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
  1139. {
  1140. /* don't have BW to send to peer, or would likely take longer than we have for it,
  1141. * so at best indirect the query */
  1142. tec.priority = 0;
  1143. options |= GSF_PRO_FORWARD_ONLY;
  1144. spid = GNUNET_PEER_intern (cps->ppd.peer);
  1145. GNUNET_assert (0 != spid);
  1146. }
  1147. tec.ttl = bound_ttl (ntohl (gm->ttl),
  1148. tec.priority);
  1149. /* decrement ttl (always) */
  1150. ttl_decrement =
  1151. 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
  1152. TTL_DECREMENT);
  1153. if ((tec.ttl < 0) &&
  1154. (((int32_t) (tec.ttl - ttl_decrement)) > 0))
  1155. {
  1156. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1157. "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
  1158. GNUNET_i2s (cps->ppd.peer),
  1159. tec.ttl,
  1160. ttl_decrement);
  1161. GNUNET_STATISTICS_update (GSF_stats,
  1162. gettext_noop
  1163. ("# requests dropped due TTL underflow"), 1,
  1164. GNUNET_NO);
  1165. /* integer underflow => drop (should be very rare)! */
  1166. return;
  1167. }
  1168. tec.ttl -= ttl_decrement;
  1169. /* test if the request already exists */
  1170. tec.finished = GNUNET_NO;
  1171. GNUNET_CONTAINER_multihashmap_get_multiple (cp->request_map,
  1172. &gm->query,
  1173. &test_exist_cb,
  1174. &tec);
  1175. if (GNUNET_YES == tec.finished)
  1176. return; /* merged into existing request, we're done */
  1177. peerreq = GNUNET_new (struct PeerRequest);
  1178. peerreq->cp = cp;
  1179. pr = GSF_pending_request_create_ (options,
  1180. tec.type,
  1181. &gm->query,
  1182. target,
  1183. (bfsize > 0)
  1184. ? (const char *) &opt[bits]
  1185. : NULL,
  1186. bfsize,
  1187. ntohl (gm->filter_mutator),
  1188. 1 /* anonymity */,
  1189. (uint32_t) tec.priority,
  1190. tec.ttl,
  1191. spid,
  1192. GNUNET_PEER_intern (cps->ppd.peer),
  1193. NULL, 0, /* replies_seen */
  1194. &handle_p2p_reply,
  1195. peerreq);
  1196. GNUNET_assert (NULL != pr);
  1197. prd = GSF_pending_request_get_data_ (pr);
  1198. peerreq->pr = pr;
  1199. GNUNET_break (GNUNET_OK ==
  1200. GNUNET_CONTAINER_multihashmap_put (cp->request_map,
  1201. &prd->query,
  1202. peerreq,
  1203. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
  1204. GNUNET_STATISTICS_update (GSF_stats,
  1205. gettext_noop (
  1206. "# P2P query messages received and processed"),
  1207. 1,
  1208. GNUNET_NO);
  1209. GNUNET_STATISTICS_update (GSF_stats,
  1210. gettext_noop ("# P2P searches active"),
  1211. 1,
  1212. GNUNET_NO);
  1213. GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
  1214. GSF_local_lookup_ (pr,
  1215. &GSF_consider_forwarding,
  1216. NULL);
  1217. }
  1218. /**
  1219. * Transmit a message to the given peer as soon as possible.
  1220. * If the peer disconnects before the transmission can happen,
  1221. * the callback is invoked with a `NULL` @a buffer.
  1222. *
  1223. * @param cp target peer
  1224. * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
  1225. * @param priority how important is this request?
  1226. * @param timeout when does this request timeout
  1227. * @param size number of bytes we would like to send to the peer
  1228. * @param env message to send
  1229. */
  1230. void
  1231. GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
  1232. int is_query,
  1233. uint32_t priority,
  1234. struct GNUNET_MQ_Envelope *env)
  1235. {
  1236. struct GSF_PeerTransmitHandle *pth;
  1237. struct GSF_PeerTransmitHandle *pos;
  1238. struct GSF_PeerTransmitHandle *prev;
  1239. pth = GNUNET_new (struct GSF_PeerTransmitHandle);
  1240. pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
  1241. pth->env = env;
  1242. pth->is_query = is_query;
  1243. pth->priority = priority;
  1244. pth->cp = cp;
  1245. /* insertion sort (by priority, descending) */
  1246. prev = NULL;
  1247. pos = cp->pth_head;
  1248. while ((NULL != pos) && (pos->priority > priority))
  1249. {
  1250. prev = pos;
  1251. pos = pos->next;
  1252. }
  1253. GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
  1254. cp->pth_tail,
  1255. prev,
  1256. pth);
  1257. if (GNUNET_YES == is_query)
  1258. cp->ppd.pending_queries++;
  1259. else if (GNUNET_NO == is_query)
  1260. cp->ppd.pending_replies++;
  1261. schedule_transmission (pth);
  1262. }
  1263. /**
  1264. * Report on receiving a reply; update the performance record of the given peer.
  1265. *
  1266. * @param cp responding peer (will be updated)
  1267. * @param request_time time at which the original query was transmitted
  1268. * @param request_priority priority of the original request
  1269. */
  1270. void
  1271. GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
  1272. struct GNUNET_TIME_Absolute request_time,
  1273. uint32_t request_priority)
  1274. {
  1275. struct GNUNET_TIME_Relative delay;
  1276. delay = GNUNET_TIME_absolute_get_duration (request_time);
  1277. cp->ppd.avg_reply_delay.rel_value_us =
  1278. (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1)
  1279. + delay.rel_value_us) / RUNAVG_DELAY_N;
  1280. cp->ppd.avg_priority =
  1281. (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1)
  1282. + request_priority) / RUNAVG_DELAY_N;
  1283. }
  1284. /**
  1285. * Report on receiving a reply in response to an initiating client.
  1286. * Remember that this peer is good for this client.
  1287. *
  1288. * @param cp responding peer (will be updated)
  1289. * @param initiator_client local client on responsible for query
  1290. */
  1291. void
  1292. GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
  1293. struct GSF_LocalClient *initiator_client)
  1294. {
  1295. cp->ppd.last_client_replies[cp->last_client_replies_woff++
  1296. % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
  1297. }
  1298. /**
  1299. * Report on receiving a reply in response to an initiating peer.
  1300. * Remember that this peer is good for this initiating peer.
  1301. *
  1302. * @param cp responding peer (will be updated)
  1303. * @param initiator_peer other peer responsible for query
  1304. */
  1305. void
  1306. GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
  1307. const struct GSF_ConnectedPeer *initiator_peer)
  1308. {
  1309. unsigned int woff;
  1310. woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
  1311. GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
  1312. cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
  1313. GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
  1314. cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
  1315. }
  1316. /**
  1317. * Write peer-respect information to a file - flush the buffer entry!
  1318. *
  1319. * @param cls unused
  1320. * @param key peer identity
  1321. * @param value the `struct GSF_ConnectedPeer` to flush
  1322. * @return #GNUNET_OK to continue iteration
  1323. */
  1324. static int
  1325. flush_respect (void *cls,
  1326. const struct GNUNET_PeerIdentity *key,
  1327. void *value)
  1328. {
  1329. struct GSF_ConnectedPeer *cp = value;
  1330. struct GNUNET_PeerIdentity pid;
  1331. if (cp->ppd.respect == cp->disk_respect)
  1332. return GNUNET_OK; /* unchanged */
  1333. GNUNET_assert (0 != cp->ppd.pid);
  1334. GNUNET_PEER_resolve (cp->ppd.pid, &pid);
  1335. GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
  1336. sizeof(cp->ppd.respect),
  1337. GNUNET_TIME_UNIT_FOREVER_ABS,
  1338. GNUNET_PEERSTORE_STOREOPTION_REPLACE,
  1339. NULL,
  1340. NULL);
  1341. return GNUNET_OK;
  1342. }
  1343. /**
  1344. * A peer disconnected from us. Tear down the connected peer
  1345. * record.
  1346. *
  1347. * @param cls unused
  1348. * @param peer identity of peer that disconnected
  1349. * @param internal_cls the corresponding `struct GSF_ConnectedPeer`
  1350. */
  1351. void
  1352. GSF_peer_disconnect_handler (void *cls,
  1353. const struct GNUNET_PeerIdentity *peer,
  1354. void *internal_cls)
  1355. {
  1356. struct GSF_ConnectedPeer *cp = internal_cls;
  1357. struct GSF_PeerTransmitHandle *pth;
  1358. struct GSF_DelayedHandle *dh;
  1359. if (NULL == cp)
  1360. return; /* must have been disconnect from core with
  1361. * 'peer' == my_id, ignore */
  1362. flush_respect (NULL,
  1363. peer,
  1364. cp);
  1365. GNUNET_assert (GNUNET_YES ==
  1366. GNUNET_CONTAINER_multipeermap_remove (cp_map,
  1367. peer,
  1368. cp));
  1369. GNUNET_STATISTICS_set (GSF_stats,
  1370. gettext_noop ("# peers connected"),
  1371. GNUNET_CONTAINER_multipeermap_size (cp_map),
  1372. GNUNET_NO);
  1373. if (NULL != cp->respect_iterate_req)
  1374. {
  1375. GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
  1376. cp->respect_iterate_req = NULL;
  1377. }
  1378. if (NULL != cp->rc)
  1379. {
  1380. GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
  1381. cp->rc = NULL;
  1382. }
  1383. if (NULL != cp->rc_delay_task)
  1384. {
  1385. GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
  1386. cp->rc_delay_task = NULL;
  1387. }
  1388. GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
  1389. &cancel_pending_request,
  1390. cp);
  1391. GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
  1392. cp->request_map = NULL;
  1393. GSF_plan_notify_peer_disconnect_ (cp);
  1394. GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
  1395. GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies,
  1396. P2P_SUCCESS_LIST_SIZE);
  1397. memset (cp->ppd.last_p2p_replies,
  1398. 0,
  1399. sizeof(cp->ppd.last_p2p_replies));
  1400. GSF_push_stop_ (cp);
  1401. while (NULL != (pth = cp->pth_head))
  1402. {
  1403. GNUNET_CONTAINER_DLL_remove (cp->pth_head,
  1404. cp->pth_tail,
  1405. pth);
  1406. if (GNUNET_YES == pth->is_query)
  1407. GNUNET_assert (0 < cp->ppd.pending_queries--);
  1408. else if (GNUNET_NO == pth->is_query)
  1409. GNUNET_assert (0 < cp->ppd.pending_replies--);
  1410. GNUNET_free (pth);
  1411. }
  1412. while (NULL != (dh = cp->delayed_head))
  1413. {
  1414. GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
  1415. cp->delayed_tail,
  1416. dh);
  1417. GNUNET_MQ_discard (dh->env);
  1418. cp->delay_queue_size--;
  1419. GNUNET_SCHEDULER_cancel (dh->delay_task);
  1420. GNUNET_free (dh);
  1421. }
  1422. GNUNET_PEER_change_rc (cp->ppd.pid, -1);
  1423. if (NULL != cp->mig_revive_task)
  1424. {
  1425. GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
  1426. cp->mig_revive_task = NULL;
  1427. }
  1428. GNUNET_break (0 == cp->ppd.pending_queries);
  1429. GNUNET_break (0 == cp->ppd.pending_replies);
  1430. GNUNET_free (cp);
  1431. }
  1432. /**
  1433. * Closure for #call_iterator().
  1434. */
  1435. struct IterationContext
  1436. {
  1437. /**
  1438. * Function to call on each entry.
  1439. */
  1440. GSF_ConnectedPeerIterator it;
  1441. /**
  1442. * Closure for @e it.
  1443. */
  1444. void *it_cls;
  1445. };
  1446. /**
  1447. * Function that calls the callback for each peer.
  1448. *
  1449. * @param cls the `struct IterationContext *`
  1450. * @param key identity of the peer
  1451. * @param value the `struct GSF_ConnectedPeer *`
  1452. * @return #GNUNET_YES to continue iteration
  1453. */
  1454. static int
  1455. call_iterator (void *cls,
  1456. const struct GNUNET_PeerIdentity *key,
  1457. void *value)
  1458. {
  1459. struct IterationContext *ic = cls;
  1460. struct GSF_ConnectedPeer *cp = value;
  1461. ic->it (ic->it_cls,
  1462. key, cp,
  1463. &cp->ppd);
  1464. return GNUNET_YES;
  1465. }
  1466. /**
  1467. * Iterate over all connected peers.
  1468. *
  1469. * @param it function to call for each peer
  1470. * @param it_cls closure for @a it
  1471. */
  1472. void
  1473. GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
  1474. void *it_cls)
  1475. {
  1476. struct IterationContext ic;
  1477. ic.it = it;
  1478. ic.it_cls = it_cls;
  1479. GNUNET_CONTAINER_multipeermap_iterate (cp_map,
  1480. &call_iterator,
  1481. &ic);
  1482. }
  1483. /**
  1484. * Obtain the identity of a connected peer.
  1485. *
  1486. * @param cp peer to get identity of
  1487. * @param id identity to set (written to)
  1488. */
  1489. void
  1490. GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
  1491. struct GNUNET_PeerIdentity *id)
  1492. {
  1493. GNUNET_assert (0 != cp->ppd.pid);
  1494. GNUNET_PEER_resolve (cp->ppd.pid, id);
  1495. }
  1496. /**
  1497. * Obtain the identity of a connected peer.
  1498. *
  1499. * @param cp peer to get identity of
  1500. * @return reference to peer identity, valid until peer disconnects (!)
  1501. */
  1502. const struct GNUNET_PeerIdentity *
  1503. GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
  1504. {
  1505. GNUNET_assert (0 != cp->ppd.pid);
  1506. return GNUNET_PEER_resolve2 (cp->ppd.pid);
  1507. }
  1508. /**
  1509. * Ask a peer to stop migrating data to us until the given point
  1510. * in time.
  1511. *
  1512. * @param cp peer to ask
  1513. * @param block_time until when to block
  1514. */
  1515. void
  1516. GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
  1517. struct GNUNET_TIME_Absolute block_time)
  1518. {
  1519. struct GNUNET_MQ_Envelope *env;
  1520. struct MigrationStopMessage *msm;
  1521. if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
  1522. {
  1523. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1524. "Migration already blocked for another %s\n",
  1525. GNUNET_STRINGS_relative_time_to_string (
  1526. GNUNET_TIME_absolute_get_remaining
  1527. (cp->
  1528. last_migration_block), GNUNET_YES));
  1529. return; /* already blocked */
  1530. }
  1531. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
  1532. GNUNET_STRINGS_relative_time_to_string (
  1533. GNUNET_TIME_absolute_get_remaining (block_time),
  1534. GNUNET_YES));
  1535. cp->last_migration_block = block_time;
  1536. env = GNUNET_MQ_msg (msm,
  1537. GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
  1538. msm->reserved = htonl (0);
  1539. msm->duration
  1540. = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
  1541. (cp->last_migration_block));
  1542. GNUNET_STATISTICS_update (GSF_stats,
  1543. gettext_noop ("# migration stop messages sent"),
  1544. 1,
  1545. GNUNET_NO);
  1546. GSF_peer_transmit_ (cp,
  1547. GNUNET_SYSERR,
  1548. UINT32_MAX,
  1549. env);
  1550. }
  1551. /**
  1552. * Notify core about a preference we have for the given peer
  1553. * (to allocate more resources towards it). The change will
  1554. * be communicated the next time we reserve bandwidth with
  1555. * core (not instantly).
  1556. *
  1557. * @param cp peer to reserve bandwidth from
  1558. * @param pref preference change
  1559. */
  1560. void
  1561. GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
  1562. uint64_t pref)
  1563. {
  1564. cp->inc_preference += pref;
  1565. }
  1566. /**
  1567. * Call this method periodically to flush respect information to disk.
  1568. *
  1569. * @param cls closure, not used
  1570. */
  1571. static void
  1572. cron_flush_respect (void *cls)
  1573. {
  1574. fr_task = NULL;
  1575. GNUNET_CONTAINER_multipeermap_iterate (cp_map,
  1576. &flush_respect,
  1577. NULL);
  1578. fr_task = GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
  1579. GNUNET_SCHEDULER_PRIORITY_HIGH,
  1580. &cron_flush_respect,
  1581. NULL);
  1582. }
  1583. /**
  1584. * Initialize peer management subsystem.
  1585. */
  1586. void
  1587. GSF_connected_peer_init_ ()
  1588. {
  1589. cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
  1590. peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
  1591. fr_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
  1592. &cron_flush_respect, NULL);
  1593. }
  1594. /**
  1595. * Shutdown peer management subsystem.
  1596. */
  1597. void
  1598. GSF_connected_peer_done_ ()
  1599. {
  1600. GNUNET_CONTAINER_multipeermap_iterate (cp_map,
  1601. &flush_respect,
  1602. NULL);
  1603. GNUNET_SCHEDULER_cancel (fr_task);
  1604. fr_task = NULL;
  1605. GNUNET_CONTAINER_multipeermap_destroy (cp_map);
  1606. cp_map = NULL;
  1607. GNUNET_PEERSTORE_disconnect (peerstore,
  1608. GNUNET_YES);
  1609. }
  1610. /**
  1611. * Iterator to remove references to LC entry.
  1612. *
  1613. * @param cls the `struct GSF_LocalClient *` to look for
  1614. * @param key current key code
  1615. * @param value value in the hash map (peer entry)
  1616. * @return #GNUNET_YES (we should continue to iterate)
  1617. */
  1618. static int
  1619. clean_local_client (void *cls,
  1620. const struct GNUNET_PeerIdentity *key,
  1621. void *value)
  1622. {
  1623. const struct GSF_LocalClient *lc = cls;
  1624. struct GSF_ConnectedPeer *cp = value;
  1625. unsigned int i;
  1626. for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
  1627. if (cp->ppd.last_client_replies[i] == lc)
  1628. cp->ppd.last_client_replies[i] = NULL;
  1629. return GNUNET_YES;
  1630. }
  1631. /**
  1632. * Notification that a local client disconnected. Clean up all of our
  1633. * references to the given handle.
  1634. *
  1635. * @param lc handle to the local client (henceforth invalid)
  1636. */
  1637. void
  1638. GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
  1639. {
  1640. if (NULL == cp_map)
  1641. return; /* already cleaned up */
  1642. GNUNET_CONTAINER_multipeermap_iterate (cp_map,
  1643. &clean_local_client,
  1644. (void *) lc);
  1645. }
  1646. /* end of gnunet-service-fs_cp.c */