gnunet-service-fs_cp.c 54 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881
  1. /*
  2. This file is part of GNUnet.
  3. (C) 2011 Christian Grothoff (and other contributing authors)
  4. GNUnet is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published
  6. by the Free Software Foundation; either version 3, or (at your
  7. option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with GNUnet; see the file COPYING. If not, write to the
  14. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  15. Boston, MA 02111-1307, USA.
  16. */
  17. /**
  18. * @file fs/gnunet-service-fs_cp.c
  19. * @brief API to handle 'connected peers'
  20. * @author Christian Grothoff
  21. */
  22. #include "platform.h"
  23. #include "gnunet_util_lib.h"
  24. #include "gnunet_load_lib.h"
  25. #include "gnunet-service-fs.h"
  26. #include "gnunet-service-fs_cp.h"
  27. #include "gnunet-service-fs_pe.h"
  28. #include "gnunet-service-fs_pr.h"
  29. #include "gnunet-service-fs_push.h"
  30. #include "gnunet_peerstore_service.h"
  31. /**
  32. * Ratio for moving average delay calculation. The previous
  33. * average goes in with a factor of (n-1) into the calculation.
  34. * Must be > 0.
  35. */
  36. #define RUNAVG_DELAY_N 16
  37. /**
  38. * How often do we flush respect values to disk?
  39. */
  40. #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
  41. /**
  42. * After how long do we discard a reply?
  43. */
  44. #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
  45. /**
  46. * Collect an instane number of statistics? May cause excessive IPC.
  47. */
  48. #define INSANE_STATISTICS GNUNET_NO
  49. /**
  50. * Handle to cancel a transmission request.
  51. */
  52. struct GSF_PeerTransmitHandle
  53. {
  54. /**
  55. * Kept in a doubly-linked list.
  56. */
  57. struct GSF_PeerTransmitHandle *next;
  58. /**
  59. * Kept in a doubly-linked list.
  60. */
  61. struct GSF_PeerTransmitHandle *prev;
  62. /**
  63. * Time when this transmission request was issued.
  64. */
  65. struct GNUNET_TIME_Absolute transmission_request_start_time;
  66. /**
  67. * Timeout for this request.
  68. */
  69. struct GNUNET_TIME_Absolute timeout;
  70. /**
  71. * Task called on timeout, or 0 for none.
  72. */
  73. GNUNET_SCHEDULER_TaskIdentifier timeout_task;
  74. /**
  75. * Function to call to get the actual message.
  76. */
  77. GSF_GetMessageCallback gmc;
  78. /**
  79. * Peer this request targets.
  80. */
  81. struct GSF_ConnectedPeer *cp;
  82. /**
  83. * Closure for @e gmc.
  84. */
  85. void *gmc_cls;
  86. /**
  87. * Size of the message to be transmitted.
  88. */
  89. size_t size;
  90. /**
  91. * #GNUNET_YES if this is a query, #GNUNET_NO for content.
  92. */
  93. int is_query;
  94. /**
  95. * Did we get a reservation already?
  96. */
  97. int was_reserved;
  98. /**
  99. * Priority of this request.
  100. */
  101. uint32_t priority;
  102. };
  103. /**
  104. * Handle for an entry in our delay list.
  105. */
  106. struct GSF_DelayedHandle
  107. {
  108. /**
  109. * Kept in a doubly-linked list.
  110. */
  111. struct GSF_DelayedHandle *next;
  112. /**
  113. * Kept in a doubly-linked list.
  114. */
  115. struct GSF_DelayedHandle *prev;
  116. /**
  117. * Peer this transmission belongs to.
  118. */
  119. struct GSF_ConnectedPeer *cp;
  120. /**
  121. * The PUT that was delayed.
  122. */
  123. struct PutMessage *pm;
  124. /**
  125. * Task for the delay.
  126. */
  127. GNUNET_SCHEDULER_TaskIdentifier delay_task;
  128. /**
  129. * Size of the message.
  130. */
  131. size_t msize;
  132. };
  133. /**
  134. * Information per peer and request.
  135. */
  136. struct PeerRequest
  137. {
  138. /**
  139. * Handle to generic request.
  140. */
  141. struct GSF_PendingRequest *pr;
  142. /**
  143. * Handle to specific peer.
  144. */
  145. struct GSF_ConnectedPeer *cp;
  146. /**
  147. * Task for asynchronous stopping of this request.
  148. */
  149. GNUNET_SCHEDULER_TaskIdentifier kill_task;
  150. };
  151. /**
  152. * A connected peer.
  153. */
  154. struct GSF_ConnectedPeer
  155. {
  156. /**
  157. * Performance data for this peer.
  158. */
  159. struct GSF_PeerPerformanceData ppd;
  160. /**
  161. * Time until when we blocked this peer from migrating
  162. * data to us.
  163. */
  164. struct GNUNET_TIME_Absolute last_migration_block;
  165. /**
  166. * Task scheduled to revive migration to this peer.
  167. */
  168. GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
  169. /**
  170. * Messages (replies, queries, content migration) we would like to
  171. * send to this peer in the near future. Sorted by priority, head.
  172. */
  173. struct GSF_PeerTransmitHandle *pth_head;
  174. /**
  175. * Messages (replies, queries, content migration) we would like to
  176. * send to this peer in the near future. Sorted by priority, tail.
  177. */
  178. struct GSF_PeerTransmitHandle *pth_tail;
  179. /**
  180. * Messages (replies, queries, content migration) we would like to
  181. * send to this peer in the near future. Sorted by priority, head.
  182. */
  183. struct GSF_DelayedHandle *delayed_head;
  184. /**
  185. * Messages (replies, queries, content migration) we would like to
  186. * send to this peer in the near future. Sorted by priority, tail.
  187. */
  188. struct GSF_DelayedHandle *delayed_tail;
  189. /**
  190. * Migration stop message in our queue, or NULL if we have none pending.
  191. */
  192. struct GSF_PeerTransmitHandle *migration_pth;
  193. /**
  194. * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
  195. */
  196. struct GNUNET_ATS_ReservationContext *rc;
  197. /**
  198. * Task scheduled if we need to retry bandwidth reservation later.
  199. */
  200. GNUNET_SCHEDULER_TaskIdentifier rc_delay_task;
  201. /**
  202. * Active requests from this neighbour, map of query to 'struct PeerRequest'.
  203. */
  204. struct GNUNET_CONTAINER_MultiHashMap *request_map;
  205. /**
  206. * Handle for an active request for transmission to this
  207. * peer, or NULL (if core queue was full).
  208. */
  209. struct GNUNET_CORE_TransmitHandle *cth;
  210. /**
  211. * Increase in traffic preference still to be submitted
  212. * to the core service for this peer.
  213. */
  214. uint64_t inc_preference;
  215. /**
  216. * Set to 1 if we're currently in the process of calling
  217. * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
  218. * NULL, we should not call notify_transmit_ready for this
  219. * handle right now).
  220. */
  221. unsigned int cth_in_progress;
  222. /**
  223. * Respect rating for this peer on disk.
  224. */
  225. uint32_t disk_respect;
  226. /**
  227. * Which offset in "last_p2p_replies" will be updated next?
  228. * (we go round-robin).
  229. */
  230. unsigned int last_p2p_replies_woff;
  231. /**
  232. * Which offset in "last_client_replies" will be updated next?
  233. * (we go round-robin).
  234. */
  235. unsigned int last_client_replies_woff;
  236. /**
  237. * Current offset into 'last_request_times' ring buffer.
  238. */
  239. unsigned int last_request_times_off;
  240. /**
  241. * GNUNET_YES if we did successfully reserve 32k bandwidth,
  242. * GNUNET_NO if not.
  243. */
  244. int did_reserve;
  245. /**
  246. * Function called when the creation of this record is complete.
  247. */
  248. GSF_ConnectedPeerCreationCallback creation_cb;
  249. /**
  250. * Closure for @e creation_cb
  251. */
  252. void *creation_cb_cls;
  253. /**
  254. * Handle to the PEERSTORE iterate request for peer respect value
  255. */
  256. struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
  257. };
  258. /**
  259. * Map from peer identities to 'struct GSF_ConnectPeer' entries.
  260. */
  261. static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
  262. /**
  263. * Handle to peerstore service.
  264. */
  265. static struct GNUNET_PEERSTORE_Handle *peerstore;
  266. /**
  267. * Update the latency information kept for the given peer.
  268. *
  269. * @param id peer record to update
  270. * @param latency current latency value
  271. */
  272. void
  273. GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
  274. struct GNUNET_TIME_Relative latency)
  275. {
  276. struct GSF_ConnectedPeer *cp;
  277. cp = GSF_peer_get_ (id);
  278. if (NULL == cp)
  279. return; /* we're not yet connected at the core level, ignore */
  280. GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay, latency);
  281. /* LATER: merge atsi into cp's performance data (if we ever care...) */
  282. }
  283. /**
  284. * Return the performance data record for the given peer
  285. *
  286. * @param cp peer to query
  287. * @return performance data record for the peer
  288. */
  289. struct GSF_PeerPerformanceData *
  290. GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
  291. {
  292. return &cp->ppd;
  293. }
  294. /**
  295. * Core is ready to transmit to a peer, get the message.
  296. *
  297. * @param cls the 'struct GSF_PeerTransmitHandle' of the message
  298. * @param size number of bytes core is willing to take
  299. * @param buf where to copy the message
  300. * @return number of bytes copied to buf
  301. */
  302. static size_t
  303. peer_transmit_ready_cb (void *cls, size_t size, void *buf);
  304. /**
  305. * Function called by core upon success or failure of our bandwidth reservation request.
  306. *
  307. * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
  308. * @param peer identifies the peer
  309. * @param amount set to the amount that was actually reserved or unreserved;
  310. * either the full requested amount or zero (no partial reservations)
  311. * @param res_delay if the reservation could not be satisfied (amount was 0), how
  312. * long should the client wait until re-trying?
  313. */
  314. static void
  315. ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
  316. int32_t amount, struct GNUNET_TIME_Relative res_delay);
  317. /**
  318. * If ready (bandwidth reserved), try to schedule transmission via
  319. * core for the given handle.
  320. *
  321. * @param pth transmission handle to schedule
  322. */
  323. static void
  324. schedule_transmission (struct GSF_PeerTransmitHandle *pth)
  325. {
  326. struct GSF_ConnectedPeer *cp;
  327. struct GNUNET_PeerIdentity target;
  328. cp = pth->cp;
  329. if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
  330. return; /* already done */
  331. GNUNET_assert (0 != cp->ppd.pid);
  332. GNUNET_PEER_resolve (cp->ppd.pid, &target);
  333. if (0 != cp->inc_preference)
  334. {
  335. GNUNET_ATS_performance_change_preference (GSF_ats, &target, GNUNET_ATS_PREFERENCE_BANDWIDTH,
  336. (double) cp->inc_preference,
  337. GNUNET_ATS_PREFERENCE_END);
  338. cp->inc_preference = 0;
  339. }
  340. if ((GNUNET_YES == pth->is_query) && (GNUNET_YES != pth->was_reserved))
  341. {
  342. /* query, need reservation */
  343. if (GNUNET_YES != cp->did_reserve)
  344. return; /* not ready */
  345. cp->did_reserve = GNUNET_NO;
  346. /* reservation already done! */
  347. pth->was_reserved = GNUNET_YES;
  348. cp->rc =
  349. GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
  350. &ats_reserve_callback, cp);
  351. return;
  352. }
  353. GNUNET_assert (NULL == cp->cth);
  354. cp->cth_in_progress++;
  355. cp->cth =
  356. GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES,
  357. GNUNET_CORE_PRIO_BACKGROUND,
  358. GNUNET_TIME_absolute_get_remaining
  359. (pth->timeout), &target, pth->size,
  360. &peer_transmit_ready_cb, cp);
  361. GNUNET_assert (NULL != cp->cth);
  362. GNUNET_assert (0 < cp->cth_in_progress--);
  363. }
  364. /**
  365. * Core is ready to transmit to a peer, get the message.
  366. *
  367. * @param cls the 'struct GSF_PeerTransmitHandle' of the message
  368. * @param size number of bytes core is willing to take
  369. * @param buf where to copy the message
  370. * @return number of bytes copied to buf
  371. */
  372. static size_t
  373. peer_transmit_ready_cb (void *cls, size_t size, void *buf)
  374. {
  375. struct GSF_ConnectedPeer *cp = cls;
  376. struct GSF_PeerTransmitHandle *pth = cp->pth_head;
  377. struct GSF_PeerTransmitHandle *pos;
  378. size_t ret;
  379. cp->cth = NULL;
  380. if (NULL == pth)
  381. return 0;
  382. if (pth->size > size)
  383. {
  384. schedule_transmission (pth);
  385. return 0;
  386. }
  387. if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
  388. {
  389. GNUNET_SCHEDULER_cancel (pth->timeout_task);
  390. pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
  391. }
  392. GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
  393. if (GNUNET_YES == pth->is_query)
  394. {
  395. cp->ppd.last_request_times[(cp->last_request_times_off++) %
  396. MAX_QUEUE_PER_PEER] =
  397. GNUNET_TIME_absolute_get ();
  398. GNUNET_assert (0 < cp->ppd.pending_queries--);
  399. }
  400. else if (GNUNET_NO == pth->is_query)
  401. {
  402. GNUNET_assert (0 < cp->ppd.pending_replies--);
  403. }
  404. GNUNET_LOAD_update (cp->ppd.transmission_delay,
  405. GNUNET_TIME_absolute_get_duration
  406. (pth->transmission_request_start_time).rel_value_us);
  407. ret = pth->gmc (pth->gmc_cls, size, buf);
  408. if (NULL != (pos = cp->pth_head))
  409. {
  410. GNUNET_assert (pos != pth);
  411. schedule_transmission (pos);
  412. }
  413. GNUNET_free (pth);
  414. return ret;
  415. }
  416. /**
  417. * (re)try to reserve bandwidth from the given peer.
  418. *
  419. * @param cls the 'struct GSF_ConnectedPeer' to reserve from
  420. * @param tc scheduler context
  421. */
  422. static void
  423. retry_reservation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  424. {
  425. struct GSF_ConnectedPeer *cp = cls;
  426. struct GNUNET_PeerIdentity target;
  427. GNUNET_PEER_resolve (cp->ppd.pid, &target);
  428. cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
  429. cp->rc =
  430. GNUNET_ATS_reserve_bandwidth (GSF_ats, &target, DBLOCK_SIZE,
  431. &ats_reserve_callback, cp);
  432. }
  433. /**
  434. * Function called by core upon success or failure of our bandwidth reservation request.
  435. *
  436. * @param cls the 'struct GSF_ConnectedPeer' of the peer for which we made the request
  437. * @param peer identifies the peer
  438. * @param amount set to the amount that was actually reserved or unreserved;
  439. * either the full requested amount or zero (no partial reservations)
  440. * @param res_delay if the reservation could not be satisfied (amount was 0), how
  441. * long should the client wait until re-trying?
  442. */
  443. static void
  444. ats_reserve_callback (void *cls, const struct GNUNET_PeerIdentity *peer,
  445. int32_t amount, struct GNUNET_TIME_Relative res_delay)
  446. {
  447. struct GSF_ConnectedPeer *cp = cls;
  448. struct GSF_PeerTransmitHandle *pth;
  449. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  450. "Reserved %d bytes / need to wait %s for reservation\n",
  451. (int) amount,
  452. GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
  453. cp->rc = NULL;
  454. if (0 == amount)
  455. {
  456. cp->rc_delay_task =
  457. GNUNET_SCHEDULER_add_delayed (res_delay, &retry_reservation, cp);
  458. return;
  459. }
  460. cp->did_reserve = GNUNET_YES;
  461. pth = cp->pth_head;
  462. if ((NULL != pth) && (NULL == cp->cth) && (0 == cp->cth_in_progress))
  463. {
  464. /* reservation success, try transmission now! */
  465. cp->cth_in_progress++;
  466. cp->cth =
  467. GNUNET_CORE_notify_transmit_ready (GSF_core, GNUNET_YES,
  468. GNUNET_CORE_PRIO_BACKGROUND,
  469. GNUNET_TIME_absolute_get_remaining
  470. (pth->timeout), peer, pth->size,
  471. &peer_transmit_ready_cb, cp);
  472. GNUNET_assert (NULL != cp->cth);
  473. GNUNET_assert (0 < cp->cth_in_progress--);
  474. }
  475. }
  476. /**
  477. * Function called by PEERSTORE with peer respect record
  478. *
  479. * @param cls handle to connected peer entry
  480. * @param record peerstore record information
  481. * @param emsg error message, or NULL if no errors
  482. * @return #GNUNET_NO to stop iterating since we only expect 0 or 1 records
  483. */
  484. static int
  485. peer_respect_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
  486. {
  487. struct GSF_ConnectedPeer *cp = cls;
  488. cp->respect_iterate_req = NULL;
  489. if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size))
  490. cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value);
  491. GSF_push_start_ (cp);
  492. if (NULL != cp->creation_cb)
  493. cp->creation_cb (cp->creation_cb_cls, cp);
  494. return GNUNET_NO;
  495. }
  496. /**
  497. * A peer connected to us. Setup the connected peer
  498. * records.
  499. *
  500. * @param peer identity of peer that connected
  501. * @param creation_cb callback function when the record is created.
  502. * @param creation_cb_cls closure for @creation_cb
  503. */
  504. void
  505. GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
  506. GSF_ConnectedPeerCreationCallback creation_cb,
  507. void *creation_cb_cls)
  508. {
  509. struct GSF_ConnectedPeer *cp;
  510. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to peer %s\n",
  511. GNUNET_i2s (peer));
  512. cp = GNUNET_new (struct GSF_ConnectedPeer);
  513. cp->ppd.pid = GNUNET_PEER_intern (peer);
  514. cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
  515. cp->rc =
  516. GNUNET_ATS_reserve_bandwidth (GSF_ats, peer, DBLOCK_SIZE,
  517. &ats_reserve_callback, cp);
  518. cp->request_map = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO);
  519. GNUNET_break (GNUNET_OK ==
  520. GNUNET_CONTAINER_multipeermap_put (cp_map,
  521. GSF_connected_peer_get_identity2_ (cp),
  522. cp,
  523. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  524. GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
  525. GNUNET_CONTAINER_multipeermap_size (cp_map),
  526. GNUNET_NO);
  527. cp->creation_cb = creation_cb;
  528. cp->creation_cb_cls = creation_cb_cls;
  529. cp->respect_iterate_req =
  530. GNUNET_PEERSTORE_iterate (peerstore, "fs", peer, "respect",
  531. GNUNET_TIME_UNIT_FOREVER_REL, &peer_respect_cb,
  532. cp);
  533. }
  534. /**
  535. * It may be time to re-start migrating content to this
  536. * peer. Check, and if so, restart migration.
  537. *
  538. * @param cls the 'struct GSF_ConnectedPeer'
  539. * @param tc scheduler context
  540. */
  541. static void
  542. revive_migration (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  543. {
  544. struct GSF_ConnectedPeer *cp = cls;
  545. struct GNUNET_TIME_Relative bt;
  546. cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
  547. bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
  548. if (0 != bt.rel_value_us)
  549. {
  550. /* still time left... */
  551. cp->mig_revive_task =
  552. GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
  553. return;
  554. }
  555. GSF_push_start_ (cp);
  556. }
  557. /**
  558. * Get a handle for a connected peer.
  559. *
  560. * @param peer peer's identity
  561. * @return NULL if the peer is not currently connected
  562. */
  563. struct GSF_ConnectedPeer *
  564. GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
  565. {
  566. if (NULL == cp_map)
  567. return NULL;
  568. return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
  569. }
  570. /**
  571. * Handle P2P "MIGRATION_STOP" message.
  572. *
  573. * @param cls closure, always NULL
  574. * @param other the other peer involved (sender or receiver, NULL
  575. * for loopback messages where we are both sender and receiver)
  576. * @param message the actual message
  577. * @return GNUNET_OK to keep the connection open,
  578. * GNUNET_SYSERR to close it (signal serious error)
  579. */
  580. int
  581. GSF_handle_p2p_migration_stop_ (void *cls,
  582. const struct GNUNET_PeerIdentity *other,
  583. const struct GNUNET_MessageHeader *message)
  584. {
  585. struct GSF_ConnectedPeer *cp;
  586. const struct MigrationStopMessage *msm;
  587. struct GNUNET_TIME_Relative bt;
  588. msm = (const struct MigrationStopMessage *) message;
  589. cp = GSF_peer_get_ (other);
  590. if (NULL == cp)
  591. {
  592. GNUNET_break (0);
  593. return GNUNET_OK;
  594. }
  595. GNUNET_STATISTICS_update (GSF_stats,
  596. gettext_noop ("# migration stop messages received"),
  597. 1, GNUNET_NO);
  598. bt = GNUNET_TIME_relative_ntoh (msm->duration);
  599. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  600. _("Migration of content to peer `%s' blocked for %s\n"),
  601. GNUNET_i2s (other),
  602. GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
  603. cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
  604. if (GNUNET_SCHEDULER_NO_TASK == cp->mig_revive_task)
  605. {
  606. GSF_push_stop_ (cp);
  607. cp->mig_revive_task =
  608. GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
  609. }
  610. return GNUNET_OK;
  611. }
  612. /**
  613. * Copy reply and free put message.
  614. *
  615. * @param cls the 'struct PutMessage'
  616. * @param buf_size number of bytes available in buf
  617. * @param buf where to copy the message, NULL on error (peer disconnect)
  618. * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
  619. */
  620. static size_t
  621. copy_reply (void *cls, size_t buf_size, void *buf)
  622. {
  623. struct PutMessage *pm = cls;
  624. size_t size;
  625. if (NULL != buf)
  626. {
  627. GNUNET_assert (buf_size >= ntohs (pm->header.size));
  628. size = ntohs (pm->header.size);
  629. memcpy (buf, pm, size);
  630. GNUNET_STATISTICS_update (GSF_stats,
  631. gettext_noop
  632. ("# replies transmitted to other peers"), 1,
  633. GNUNET_NO);
  634. }
  635. else
  636. {
  637. size = 0;
  638. GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies dropped"), 1,
  639. GNUNET_NO);
  640. }
  641. GNUNET_free (pm);
  642. return size;
  643. }
  644. /**
  645. * Free resources associated with the given peer request.
  646. *
  647. * @param peerreq request to free
  648. * @param query associated key for the request
  649. */
  650. static void
  651. free_pending_request (struct PeerRequest *peerreq,
  652. const struct GNUNET_HashCode *query)
  653. {
  654. struct GSF_ConnectedPeer *cp = peerreq->cp;
  655. if (GNUNET_SCHEDULER_NO_TASK != peerreq->kill_task)
  656. {
  657. GNUNET_SCHEDULER_cancel (peerreq->kill_task);
  658. peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
  659. }
  660. GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
  661. -1, GNUNET_NO);
  662. GNUNET_break (GNUNET_YES ==
  663. GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
  664. query, peerreq));
  665. GNUNET_free (peerreq);
  666. }
  667. /**
  668. * Cancel all requests associated with the peer.
  669. *
  670. * @param cls unused
  671. * @param query hash code of the request
  672. * @param value the 'struct GSF_PendingRequest'
  673. * @return GNUNET_YES (continue to iterate)
  674. */
  675. static int
  676. cancel_pending_request (void *cls, const struct GNUNET_HashCode * query, void *value)
  677. {
  678. struct PeerRequest *peerreq = value;
  679. struct GSF_PendingRequest *pr = peerreq->pr;
  680. struct GSF_PendingRequestData *prd;
  681. prd = GSF_pending_request_get_data_ (pr);
  682. GSF_pending_request_cancel_ (pr, GNUNET_NO);
  683. free_pending_request (peerreq, &prd->query);
  684. return GNUNET_OK;
  685. }
  686. /**
  687. * Free the given request.
  688. *
  689. * @param cls the request to free
  690. * @param tc task context
  691. */
  692. static void
  693. peer_request_destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  694. {
  695. struct PeerRequest *peerreq = cls;
  696. struct GSF_PendingRequest *pr = peerreq->pr;
  697. struct GSF_PendingRequestData *prd;
  698. peerreq->kill_task = GNUNET_SCHEDULER_NO_TASK;
  699. prd = GSF_pending_request_get_data_ (pr);
  700. cancel_pending_request (NULL, &prd->query, peerreq);
  701. }
  702. /**
  703. * The artificial delay is over, transmit the message now.
  704. *
  705. * @param cls the 'struct GSF_DelayedHandle' with the message
  706. * @param tc scheduler context
  707. */
  708. static void
  709. transmit_delayed_now (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  710. {
  711. struct GSF_DelayedHandle *dh = cls;
  712. struct GSF_ConnectedPeer *cp = dh->cp;
  713. GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
  714. if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
  715. {
  716. GNUNET_free (dh->pm);
  717. GNUNET_free (dh);
  718. return;
  719. }
  720. (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT,
  721. dh->msize, &copy_reply, dh->pm);
  722. GNUNET_free (dh);
  723. }
  724. /**
  725. * Get the randomized delay a response should be subjected to.
  726. *
  727. * @return desired delay
  728. */
  729. static struct GNUNET_TIME_Relative
  730. get_randomized_delay ()
  731. {
  732. struct GNUNET_TIME_Relative ret;
  733. ret =
  734. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
  735. GNUNET_CRYPTO_random_u32
  736. (GNUNET_CRYPTO_QUALITY_WEAK,
  737. 2 * GSF_avg_latency.rel_value_us + 1));
  738. #if INSANE_STATISTICS
  739. GNUNET_STATISTICS_update (GSF_stats,
  740. gettext_noop
  741. ("# artificial delays introduced (ms)"),
  742. ret.rel_value_us / 1000LL, GNUNET_NO);
  743. #endif
  744. return ret;
  745. }
  746. /**
  747. * Handle a reply to a pending request. Also called if a request
  748. * expires (then with data == NULL). The handler may be called
  749. * many times (depending on the request type), but will not be
  750. * called during or after a call to GSF_pending_request_cancel
  751. * and will also not be called anymore after a call signalling
  752. * expiration.
  753. *
  754. * @param cls 'struct PeerRequest' this is an answer for
  755. * @param eval evaluation of the result
  756. * @param pr handle to the original pending request
  757. * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
  758. * @param expiration when does 'data' expire?
  759. * @param last_transmission when did we last transmit a request for this block
  760. * @param type type of the block
  761. * @param data response data, NULL on request expiration
  762. * @param data_len number of bytes in data
  763. */
  764. static void
  765. handle_p2p_reply (void *cls, enum GNUNET_BLOCK_EvaluationResult eval,
  766. struct GSF_PendingRequest *pr, uint32_t reply_anonymity_level,
  767. struct GNUNET_TIME_Absolute expiration,
  768. struct GNUNET_TIME_Absolute last_transmission,
  769. enum GNUNET_BLOCK_Type type, const void *data,
  770. size_t data_len)
  771. {
  772. struct PeerRequest *peerreq = cls;
  773. struct GSF_ConnectedPeer *cp = peerreq->cp;
  774. struct GSF_PendingRequestData *prd;
  775. struct PutMessage *pm;
  776. size_t msize;
  777. GNUNET_assert (data_len + sizeof (struct PutMessage) <
  778. GNUNET_SERVER_MAX_MESSAGE_SIZE);
  779. GNUNET_assert (peerreq->pr == pr);
  780. prd = GSF_pending_request_get_data_ (pr);
  781. if (NULL == data)
  782. {
  783. free_pending_request (peerreq, &prd->query);
  784. return;
  785. }
  786. GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
  787. if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
  788. {
  789. GNUNET_STATISTICS_update (GSF_stats,
  790. gettext_noop
  791. ("# replies dropped due to type mismatch"),
  792. 1, GNUNET_NO);
  793. return;
  794. }
  795. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  796. "Transmitting result for query `%s' to peer\n",
  797. GNUNET_h2s (&prd->query));
  798. GNUNET_STATISTICS_update (GSF_stats,
  799. gettext_noop ("# replies received for other peers"),
  800. 1, GNUNET_NO);
  801. msize = sizeof (struct PutMessage) + data_len;
  802. if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
  803. {
  804. GNUNET_break (0);
  805. return;
  806. }
  807. if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
  808. {
  809. if (reply_anonymity_level - 1 > GSF_cover_content_count)
  810. {
  811. GNUNET_STATISTICS_update (GSF_stats,
  812. gettext_noop
  813. ("# replies dropped due to insufficient cover traffic"),
  814. 1, GNUNET_NO);
  815. return;
  816. }
  817. GSF_cover_content_count -= (reply_anonymity_level - 1);
  818. }
  819. pm = GNUNET_malloc (msize);
  820. pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
  821. pm->header.size = htons (msize);
  822. pm->type = htonl (type);
  823. pm->expiration = GNUNET_TIME_absolute_hton (expiration);
  824. memcpy (&pm[1], data, data_len);
  825. if ((UINT32_MAX != reply_anonymity_level) && (0 != reply_anonymity_level) &&
  826. (GNUNET_YES == GSF_enable_randomized_delays))
  827. {
  828. struct GSF_DelayedHandle *dh;
  829. dh = GNUNET_new (struct GSF_DelayedHandle);
  830. dh->cp = cp;
  831. dh->pm = pm;
  832. dh->msize = msize;
  833. GNUNET_CONTAINER_DLL_insert (cp->delayed_head, cp->delayed_tail, dh);
  834. dh->delay_task =
  835. GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
  836. &transmit_delayed_now, dh);
  837. }
  838. else
  839. {
  840. (void) GSF_peer_transmit_ (cp, GNUNET_NO, UINT32_MAX, REPLY_TIMEOUT, msize,
  841. &copy_reply, pm);
  842. }
  843. if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
  844. return;
  845. if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task)
  846. {
  847. GNUNET_STATISTICS_update (GSF_stats,
  848. gettext_noop
  849. ("# P2P searches destroyed due to ultimate reply"),
  850. 1, GNUNET_NO);
  851. peerreq->kill_task =
  852. GNUNET_SCHEDULER_add_now (&peer_request_destroy, peerreq);
  853. }
  854. }
  855. /**
  856. * Increase the peer's respect by a value.
  857. *
  858. * @param cp which peer to change the respect value on
  859. * @param value is the int value by which the
  860. * peer's credit is to be increased or decreased
  861. * @returns the actual change in respect (positive or negative)
  862. */
  863. static int
  864. change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
  865. {
  866. if (0 == value)
  867. return 0;
  868. GNUNET_assert (NULL != cp);
  869. if (value > 0)
  870. {
  871. if (cp->ppd.respect + value < cp->ppd.respect)
  872. {
  873. value = UINT32_MAX - cp->ppd.respect;
  874. cp->ppd.respect = UINT32_MAX;
  875. }
  876. else
  877. cp->ppd.respect += value;
  878. }
  879. else
  880. {
  881. if (cp->ppd.respect < -value)
  882. {
  883. value = -cp->ppd.respect;
  884. cp->ppd.respect = 0;
  885. }
  886. else
  887. cp->ppd.respect += value;
  888. }
  889. return value;
  890. }
  891. /**
  892. * We've received a request with the specified priority. Bound it
  893. * according to how much we respect the given peer.
  894. *
  895. * @param prio_in requested priority
  896. * @param cp the peer making the request
  897. * @return effective priority
  898. */
  899. static int32_t
  900. bound_priority (uint32_t prio_in, struct GSF_ConnectedPeer *cp)
  901. {
  902. #define N ((double)128.0)
  903. uint32_t ret;
  904. double rret;
  905. int ld;
  906. ld = GSF_test_get_load_too_high_ (0);
  907. if (GNUNET_SYSERR == ld)
  908. {
  909. #if INSANE_STATISTICS
  910. GNUNET_STATISTICS_update (GSF_stats,
  911. gettext_noop
  912. ("# requests done for free (low load)"), 1,
  913. GNUNET_NO);
  914. #endif
  915. return 0; /* excess resources */
  916. }
  917. if (prio_in > INT32_MAX)
  918. prio_in = INT32_MAX;
  919. ret = -change_peer_respect (cp, -(int) prio_in);
  920. if (ret > 0)
  921. {
  922. if (ret > GSF_current_priorities + N)
  923. rret = GSF_current_priorities + N;
  924. else
  925. rret = ret;
  926. GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
  927. }
  928. if ((GNUNET_YES == ld) && (ret > 0))
  929. {
  930. /* try with charging */
  931. ld = GSF_test_get_load_too_high_ (ret);
  932. }
  933. if (GNUNET_YES == ld)
  934. {
  935. GNUNET_STATISTICS_update (GSF_stats,
  936. gettext_noop
  937. ("# request dropped, priority insufficient"), 1,
  938. GNUNET_NO);
  939. /* undo charge */
  940. change_peer_respect (cp, (int) ret);
  941. return -1; /* not enough resources */
  942. }
  943. else
  944. {
  945. GNUNET_STATISTICS_update (GSF_stats,
  946. gettext_noop
  947. ("# requests done for a price (normal load)"), 1,
  948. GNUNET_NO);
  949. }
  950. #undef N
  951. return ret;
  952. }
  953. /**
  954. * The priority level imposes a bound on the maximum
  955. * value for the ttl that can be requested.
  956. *
  957. * @param ttl_in requested ttl
  958. * @param prio given priority
  959. * @return ttl_in if ttl_in is below the limit,
  960. * otherwise the ttl-limit for the given priority
  961. */
  962. static int32_t
  963. bound_ttl (int32_t ttl_in, uint32_t prio)
  964. {
  965. unsigned long long allowed;
  966. if (ttl_in <= 0)
  967. return ttl_in;
  968. allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
  969. if (ttl_in > allowed)
  970. {
  971. if (allowed >= (1 << 30))
  972. return 1 << 30;
  973. return allowed;
  974. }
  975. return ttl_in;
  976. }
  977. /**
  978. * Handle P2P "QUERY" message. Creates the pending request entry
  979. * and sets up all of the data structures to that we will
  980. * process replies properly. Does not initiate forwarding or
  981. * local database lookups.
  982. *
  983. * @param other the other peer involved (sender or receiver, NULL
  984. * for loopback messages where we are both sender and receiver)
  985. * @param message the actual message
  986. * @return pending request handle, NULL on error
  987. */
  988. struct GSF_PendingRequest *
  989. GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
  990. const struct GNUNET_MessageHeader *message)
  991. {
  992. struct PeerRequest *peerreq;
  993. struct GSF_PendingRequest *pr;
  994. struct GSF_PendingRequestData *prd;
  995. struct GSF_ConnectedPeer *cp;
  996. struct GSF_ConnectedPeer *cps;
  997. const struct GNUNET_PeerIdentity *target;
  998. enum GSF_PendingRequestOptions options;
  999. uint16_t msize;
  1000. const struct GetMessage *gm;
  1001. unsigned int bits;
  1002. const struct GNUNET_PeerIdentity *opt;
  1003. uint32_t bm;
  1004. size_t bfsize;
  1005. uint32_t ttl_decrement;
  1006. int32_t priority;
  1007. int32_t ttl;
  1008. enum GNUNET_BLOCK_Type type;
  1009. GNUNET_PEER_Id spid;
  1010. GNUNET_assert (other != NULL);
  1011. msize = ntohs (message->size);
  1012. if (msize < sizeof (struct GetMessage))
  1013. {
  1014. GNUNET_break_op (0);
  1015. return NULL;
  1016. }
  1017. GNUNET_STATISTICS_update (GSF_stats,
  1018. gettext_noop
  1019. ("# GET requests received (from other peers)"), 1,
  1020. GNUNET_NO);
  1021. gm = (const struct GetMessage *) message;
  1022. type = ntohl (gm->type);
  1023. bm = ntohl (gm->hash_bitmap);
  1024. bits = 0;
  1025. while (bm > 0)
  1026. {
  1027. if (1 == (bm & 1))
  1028. bits++;
  1029. bm >>= 1;
  1030. }
  1031. if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
  1032. {
  1033. GNUNET_break_op (0);
  1034. return NULL;
  1035. }
  1036. opt = (const struct GNUNET_PeerIdentity *) &gm[1];
  1037. bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
  1038. /* bfsize must be power of 2, check! */
  1039. if (0 != ((bfsize - 1) & bfsize))
  1040. {
  1041. GNUNET_break_op (0);
  1042. return NULL;
  1043. }
  1044. GSF_cover_query_count++;
  1045. bm = ntohl (gm->hash_bitmap);
  1046. bits = 0;
  1047. cps = GSF_peer_get_ (other);
  1048. if (NULL == cps)
  1049. {
  1050. /* peer must have just disconnected */
  1051. GNUNET_STATISTICS_update (GSF_stats,
  1052. gettext_noop
  1053. ("# requests dropped due to initiator not being connected"),
  1054. 1, GNUNET_NO);
  1055. return NULL;
  1056. }
  1057. if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
  1058. cp = GSF_peer_get_ (&opt[bits++]);
  1059. else
  1060. cp = cps;
  1061. if (NULL == cp)
  1062. {
  1063. if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
  1064. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1065. "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
  1066. GNUNET_i2s (&opt[bits - 1]));
  1067. else
  1068. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1069. "Failed to find peer `%4s' in connection set. Dropping query.\n",
  1070. GNUNET_i2s (other));
  1071. #if INSANE_STATISTICS
  1072. GNUNET_STATISTICS_update (GSF_stats,
  1073. gettext_noop
  1074. ("# requests dropped due to missing reverse route"),
  1075. 1, GNUNET_NO);
  1076. #endif
  1077. return NULL;
  1078. }
  1079. /* note that we can really only check load here since otherwise
  1080. * peers could find out that we are overloaded by not being
  1081. * disconnected after sending us a malformed query... */
  1082. priority = bound_priority (ntohl (gm->priority), cps);
  1083. if (priority < 0)
  1084. {
  1085. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1086. "Dropping query from `%s', this peer is too busy.\n",
  1087. GNUNET_i2s (other));
  1088. return NULL;
  1089. }
  1090. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1091. "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
  1092. GNUNET_h2s (&gm->query),
  1093. (unsigned int) type,
  1094. GNUNET_i2s (other),
  1095. (unsigned int) bm);
  1096. target =
  1097. (0 !=
  1098. (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
  1099. options = GSF_PRO_DEFAULTS;
  1100. spid = 0;
  1101. if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority))
  1102. || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
  1103. GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 +
  1104. GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
  1105. {
  1106. /* don't have BW to send to peer, or would likely take longer than we have for it,
  1107. * so at best indirect the query */
  1108. priority = 0;
  1109. options |= GSF_PRO_FORWARD_ONLY;
  1110. spid = GNUNET_PEER_intern (other);
  1111. GNUNET_assert (0 != spid);
  1112. }
  1113. ttl = bound_ttl (ntohl (gm->ttl), priority);
  1114. /* decrement ttl (always) */
  1115. ttl_decrement =
  1116. 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
  1117. TTL_DECREMENT);
  1118. if ((ttl < 0) && (((int32_t) (ttl - ttl_decrement)) > 0))
  1119. {
  1120. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1121. "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
  1122. GNUNET_i2s (other), ttl, ttl_decrement);
  1123. GNUNET_STATISTICS_update (GSF_stats,
  1124. gettext_noop
  1125. ("# requests dropped due TTL underflow"), 1,
  1126. GNUNET_NO);
  1127. /* integer underflow => drop (should be very rare)! */
  1128. return NULL;
  1129. }
  1130. ttl -= ttl_decrement;
  1131. /* test if the request already exists */
  1132. peerreq = GNUNET_CONTAINER_multihashmap_get (cp->request_map, &gm->query);
  1133. if (peerreq != NULL)
  1134. {
  1135. pr = peerreq->pr;
  1136. prd = GSF_pending_request_get_data_ (pr);
  1137. if (prd->type == type)
  1138. {
  1139. if (prd->ttl.abs_value_us >= GNUNET_TIME_absolute_get ().abs_value_us + ttl * 1000LL)
  1140. {
  1141. /* existing request has higher TTL, drop new one! */
  1142. prd->priority += priority;
  1143. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1144. "Have existing request with higher TTL, dropping new request.\n",
  1145. GNUNET_i2s (other));
  1146. GNUNET_STATISTICS_update (GSF_stats,
  1147. gettext_noop
  1148. ("# requests dropped due to higher-TTL request"),
  1149. 1, GNUNET_NO);
  1150. return NULL;
  1151. }
  1152. /* existing request has lower TTL, drop old one! */
  1153. priority += prd->priority;
  1154. GSF_pending_request_cancel_ (pr, GNUNET_YES);
  1155. free_pending_request (peerreq, &gm->query);
  1156. }
  1157. }
  1158. peerreq = GNUNET_new (struct PeerRequest);
  1159. peerreq->cp = cp;
  1160. pr = GSF_pending_request_create_ (options, type, &gm->query,
  1161. target,
  1162. (bfsize >
  1163. 0) ? (const char *) &opt[bits] : NULL,
  1164. bfsize, ntohl (gm->filter_mutator),
  1165. 1 /* anonymity */ ,
  1166. (uint32_t) priority, ttl, spid, GNUNET_PEER_intern (other), NULL, 0, /* replies_seen */
  1167. &handle_p2p_reply, peerreq);
  1168. GNUNET_assert (NULL != pr);
  1169. peerreq->pr = pr;
  1170. GNUNET_break (GNUNET_OK ==
  1171. GNUNET_CONTAINER_multihashmap_put (cp->request_map, &gm->query,
  1172. peerreq,
  1173. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
  1174. GNUNET_STATISTICS_update (GSF_stats,
  1175. gettext_noop
  1176. ("# P2P query messages received and processed"), 1,
  1177. GNUNET_NO);
  1178. GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# P2P searches active"),
  1179. 1, GNUNET_NO);
  1180. return pr;
  1181. }
  1182. /**
  1183. * Function called if there has been a timeout trying to satisfy
  1184. * a transmission request.
  1185. *
  1186. * @param cls the 'struct GSF_PeerTransmitHandle' of the request
  1187. * @param tc scheduler context
  1188. */
  1189. static void
  1190. peer_transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  1191. {
  1192. struct GSF_PeerTransmitHandle *pth = cls;
  1193. struct GSF_ConnectedPeer *cp;
  1194. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1195. "Timeout trying to transmit to other peer\n");
  1196. pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
  1197. cp = pth->cp;
  1198. GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
  1199. if (GNUNET_YES == pth->is_query)
  1200. GNUNET_assert (0 < cp->ppd.pending_queries--);
  1201. else if (GNUNET_NO == pth->is_query)
  1202. GNUNET_assert (0 < cp->ppd.pending_replies--);
  1203. GNUNET_LOAD_update (cp->ppd.transmission_delay, UINT64_MAX);
  1204. if (NULL != cp->cth)
  1205. {
  1206. GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
  1207. cp->cth = NULL;
  1208. }
  1209. pth->gmc (pth->gmc_cls, 0, NULL);
  1210. GNUNET_assert (0 == cp->cth_in_progress);
  1211. GNUNET_free (pth);
  1212. }
  1213. /**
  1214. * Transmit a message to the given peer as soon as possible.
  1215. * If the peer disconnects before the transmission can happen,
  1216. * the callback is invoked with a `NULL` @a buffer.
  1217. *
  1218. * @param cp target peer
  1219. * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
  1220. * @param priority how important is this request?
  1221. * @param timeout when does this request timeout (call gmc with error)
  1222. * @param size number of bytes we would like to send to the peer
  1223. * @param gmc function to call to get the message
  1224. * @param gmc_cls closure for @a gmc
  1225. * @return handle to cancel request
  1226. */
  1227. struct GSF_PeerTransmitHandle *
  1228. GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, int is_query,
  1229. uint32_t priority, struct GNUNET_TIME_Relative timeout,
  1230. size_t size, GSF_GetMessageCallback gmc, void *gmc_cls)
  1231. {
  1232. struct GSF_PeerTransmitHandle *pth;
  1233. struct GSF_PeerTransmitHandle *pos;
  1234. struct GSF_PeerTransmitHandle *prev;
  1235. pth = GNUNET_new (struct GSF_PeerTransmitHandle);
  1236. pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
  1237. pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
  1238. pth->gmc = gmc;
  1239. pth->gmc_cls = gmc_cls;
  1240. pth->size = size;
  1241. pth->is_query = is_query;
  1242. pth->priority = priority;
  1243. pth->cp = cp;
  1244. /* insertion sort (by priority, descending) */
  1245. prev = NULL;
  1246. pos = cp->pth_head;
  1247. while ((NULL != pos) && (pos->priority > priority))
  1248. {
  1249. prev = pos;
  1250. pos = pos->next;
  1251. }
  1252. GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, cp->pth_tail, prev, pth);
  1253. if (GNUNET_YES == is_query)
  1254. cp->ppd.pending_queries++;
  1255. else if (GNUNET_NO == is_query)
  1256. cp->ppd.pending_replies++;
  1257. pth->timeout_task =
  1258. GNUNET_SCHEDULER_add_delayed (timeout, &peer_transmit_timeout, pth);
  1259. schedule_transmission (pth);
  1260. return pth;
  1261. }
  1262. /**
  1263. * Cancel an earlier request for transmission.
  1264. *
  1265. * @param pth request to cancel
  1266. */
  1267. void
  1268. GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
  1269. {
  1270. struct GSF_ConnectedPeer *cp;
  1271. if (GNUNET_SCHEDULER_NO_TASK != pth->timeout_task)
  1272. {
  1273. GNUNET_SCHEDULER_cancel (pth->timeout_task);
  1274. pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
  1275. }
  1276. cp = pth->cp;
  1277. GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
  1278. if (GNUNET_YES == pth->is_query)
  1279. GNUNET_assert (0 < cp->ppd.pending_queries--);
  1280. else if (GNUNET_NO == pth->is_query)
  1281. GNUNET_assert (0 < cp->ppd.pending_replies--);
  1282. GNUNET_free (pth);
  1283. }
  1284. /**
  1285. * Report on receiving a reply; update the performance record of the given peer.
  1286. *
  1287. * @param cp responding peer (will be updated)
  1288. * @param request_time time at which the original query was transmitted
  1289. * @param request_priority priority of the original request
  1290. */
  1291. void
  1292. GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
  1293. struct GNUNET_TIME_Absolute request_time,
  1294. uint32_t request_priority)
  1295. {
  1296. struct GNUNET_TIME_Relative delay;
  1297. delay = GNUNET_TIME_absolute_get_duration (request_time);
  1298. cp->ppd.avg_reply_delay.rel_value_us =
  1299. (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) +
  1300. delay.rel_value_us) / RUNAVG_DELAY_N;
  1301. cp->ppd.avg_priority =
  1302. (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
  1303. request_priority) / RUNAVG_DELAY_N;
  1304. }
  1305. /**
  1306. * Report on receiving a reply in response to an initiating client.
  1307. * Remember that this peer is good for this client.
  1308. *
  1309. * @param cp responding peer (will be updated)
  1310. * @param initiator_client local client on responsible for query
  1311. */
  1312. void
  1313. GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
  1314. struct GSF_LocalClient *initiator_client)
  1315. {
  1316. cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
  1317. CS2P_SUCCESS_LIST_SIZE] = initiator_client;
  1318. }
  1319. /**
  1320. * Report on receiving a reply in response to an initiating peer.
  1321. * Remember that this peer is good for this initiating peer.
  1322. *
  1323. * @param cp responding peer (will be updated)
  1324. * @param initiator_peer other peer responsible for query
  1325. */
  1326. void
  1327. GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
  1328. const struct GSF_ConnectedPeer *initiator_peer)
  1329. {
  1330. unsigned int woff;
  1331. woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
  1332. GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
  1333. cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
  1334. GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
  1335. cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
  1336. }
  1337. /**
  1338. * A peer disconnected from us. Tear down the connected peer
  1339. * record.
  1340. *
  1341. * @param cls unused
  1342. * @param peer identity of peer that connected
  1343. */
  1344. void
  1345. GSF_peer_disconnect_handler_ (void *cls, const struct GNUNET_PeerIdentity *peer)
  1346. {
  1347. struct GSF_ConnectedPeer *cp;
  1348. struct GSF_PeerTransmitHandle *pth;
  1349. struct GSF_DelayedHandle *dh;
  1350. cp = GSF_peer_get_ (peer);
  1351. if (NULL == cp)
  1352. return; /* must have been disconnect from core with
  1353. * 'peer' == my_id, ignore */
  1354. GNUNET_assert (GNUNET_YES ==
  1355. GNUNET_CONTAINER_multipeermap_remove (cp_map,
  1356. peer,
  1357. cp));
  1358. GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"),
  1359. GNUNET_CONTAINER_multipeermap_size (cp_map),
  1360. GNUNET_NO);
  1361. if (NULL != cp->respect_iterate_req)
  1362. {
  1363. GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
  1364. cp->respect_iterate_req = NULL;
  1365. }
  1366. if (NULL != cp->migration_pth)
  1367. {
  1368. GSF_peer_transmit_cancel_ (cp->migration_pth);
  1369. cp->migration_pth = NULL;
  1370. }
  1371. if (NULL != cp->rc)
  1372. {
  1373. GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
  1374. cp->rc = NULL;
  1375. }
  1376. if (GNUNET_SCHEDULER_NO_TASK != cp->rc_delay_task)
  1377. {
  1378. GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
  1379. cp->rc_delay_task = GNUNET_SCHEDULER_NO_TASK;
  1380. }
  1381. GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
  1382. &cancel_pending_request, cp);
  1383. GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
  1384. cp->request_map = NULL;
  1385. GSF_plan_notify_peer_disconnect_ (cp);
  1386. GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
  1387. GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
  1388. memset (cp->ppd.last_p2p_replies, 0, sizeof (cp->ppd.last_p2p_replies));
  1389. GSF_push_stop_ (cp);
  1390. if (NULL != cp->cth)
  1391. {
  1392. GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
  1393. cp->cth = NULL;
  1394. }
  1395. GNUNET_assert (0 == cp->cth_in_progress);
  1396. while (NULL != (pth = cp->pth_head))
  1397. {
  1398. if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
  1399. {
  1400. GNUNET_SCHEDULER_cancel (pth->timeout_task);
  1401. pth->timeout_task = GNUNET_SCHEDULER_NO_TASK;
  1402. }
  1403. GNUNET_CONTAINER_DLL_remove (cp->pth_head, cp->pth_tail, pth);
  1404. pth->gmc (pth->gmc_cls, 0, NULL);
  1405. GNUNET_free (pth);
  1406. }
  1407. while (NULL != (dh = cp->delayed_head))
  1408. {
  1409. GNUNET_CONTAINER_DLL_remove (cp->delayed_head, cp->delayed_tail, dh);
  1410. GNUNET_SCHEDULER_cancel (dh->delay_task);
  1411. GNUNET_free (dh->pm);
  1412. GNUNET_free (dh);
  1413. }
  1414. GNUNET_PEER_change_rc (cp->ppd.pid, -1);
  1415. if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
  1416. {
  1417. GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
  1418. cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK;
  1419. }
  1420. GNUNET_free (cp);
  1421. }
  1422. /**
  1423. * Closure for 'call_iterator'.
  1424. */
  1425. struct IterationContext
  1426. {
  1427. /**
  1428. * Function to call on each entry.
  1429. */
  1430. GSF_ConnectedPeerIterator it;
  1431. /**
  1432. * Closure for 'it'.
  1433. */
  1434. void *it_cls;
  1435. };
  1436. /**
  1437. * Function that calls the callback for each peer.
  1438. *
  1439. * @param cls the `struct IterationContext *`
  1440. * @param key identity of the peer
  1441. * @param value the `struct GSF_ConnectedPeer *`
  1442. * @return #GNUNET_YES to continue iteration
  1443. */
  1444. static int
  1445. call_iterator (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
  1446. {
  1447. struct IterationContext *ic = cls;
  1448. struct GSF_ConnectedPeer *cp = value;
  1449. ic->it (ic->it_cls, (const struct GNUNET_PeerIdentity *) key, cp, &cp->ppd);
  1450. return GNUNET_YES;
  1451. }
  1452. /**
  1453. * Iterate over all connected peers.
  1454. *
  1455. * @param it function to call for each peer
  1456. * @param it_cls closure for @a it
  1457. */
  1458. void
  1459. GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it, void *it_cls)
  1460. {
  1461. struct IterationContext ic;
  1462. ic.it = it;
  1463. ic.it_cls = it_cls;
  1464. GNUNET_CONTAINER_multipeermap_iterate (cp_map, &call_iterator, &ic);
  1465. }
  1466. /**
  1467. * Obtain the identity of a connected peer.
  1468. *
  1469. * @param cp peer to get identity of
  1470. * @param id identity to set (written to)
  1471. */
  1472. void
  1473. GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
  1474. struct GNUNET_PeerIdentity *id)
  1475. {
  1476. GNUNET_assert (0 != cp->ppd.pid);
  1477. GNUNET_PEER_resolve (cp->ppd.pid, id);
  1478. }
  1479. /**
  1480. * Obtain the identity of a connected peer.
  1481. *
  1482. * @param cp peer to get identity of
  1483. * @return reference to peer identity, valid until peer disconnects (!)
  1484. */
  1485. const struct GNUNET_PeerIdentity *
  1486. GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
  1487. {
  1488. GNUNET_assert (0 != cp->ppd.pid);
  1489. return GNUNET_PEER_resolve2 (cp->ppd.pid);
  1490. }
  1491. /**
  1492. * Assemble a migration stop message for transmission.
  1493. *
  1494. * @param cls the 'struct GSF_ConnectedPeer' to use
  1495. * @param size number of bytes we're allowed to write to buf
  1496. * @param buf where to copy the message
  1497. * @return number of bytes copied to buf
  1498. */
  1499. static size_t
  1500. create_migration_stop_message (void *cls, size_t size, void *buf)
  1501. {
  1502. struct GSF_ConnectedPeer *cp = cls;
  1503. struct MigrationStopMessage msm;
  1504. cp->migration_pth = NULL;
  1505. if (NULL == buf)
  1506. return 0;
  1507. GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
  1508. msm.header.size = htons (sizeof (struct MigrationStopMessage));
  1509. msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
  1510. msm.reserved = htonl (0);
  1511. msm.duration =
  1512. GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
  1513. (cp->last_migration_block));
  1514. memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
  1515. GNUNET_STATISTICS_update (GSF_stats,
  1516. gettext_noop ("# migration stop messages sent"),
  1517. 1, GNUNET_NO);
  1518. return sizeof (struct MigrationStopMessage);
  1519. }
  1520. /**
  1521. * Ask a peer to stop migrating data to us until the given point
  1522. * in time.
  1523. *
  1524. * @param cp peer to ask
  1525. * @param block_time until when to block
  1526. */
  1527. void
  1528. GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
  1529. struct GNUNET_TIME_Absolute block_time)
  1530. {
  1531. if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
  1532. {
  1533. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1534. "Migration already blocked for another %s\n",
  1535. GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining
  1536. (cp->last_migration_block), GNUNET_YES));
  1537. return; /* already blocked */
  1538. }
  1539. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
  1540. GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
  1541. GNUNET_YES));
  1542. cp->last_migration_block = block_time;
  1543. if (NULL != cp->migration_pth)
  1544. GSF_peer_transmit_cancel_ (cp->migration_pth);
  1545. cp->migration_pth =
  1546. GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX,
  1547. GNUNET_TIME_UNIT_FOREVER_REL,
  1548. sizeof (struct MigrationStopMessage),
  1549. &create_migration_stop_message, cp);
  1550. }
  1551. /**
  1552. * Write peer-respect information to a file - flush the buffer entry!
  1553. *
  1554. * @param cls unused
  1555. * @param key peer identity
  1556. * @param value the 'struct GSF_ConnectedPeer' to flush
  1557. * @return GNUNET_OK to continue iteration
  1558. */
  1559. static int
  1560. flush_respect (void *cls, const struct GNUNET_PeerIdentity * key, void *value)
  1561. {
  1562. struct GSF_ConnectedPeer *cp = value;
  1563. struct GNUNET_PeerIdentity pid;
  1564. if (cp->ppd.respect == cp->disk_respect)
  1565. return GNUNET_OK; /* unchanged */
  1566. GNUNET_assert (0 != cp->ppd.pid);
  1567. GNUNET_PEER_resolve (cp->ppd.pid, &pid);
  1568. GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
  1569. sizeof (cp->ppd.respect),
  1570. GNUNET_TIME_UNIT_FOREVER_ABS,
  1571. GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL);
  1572. return GNUNET_OK;
  1573. }
  1574. /**
  1575. * Notify core about a preference we have for the given peer
  1576. * (to allocate more resources towards it). The change will
  1577. * be communicated the next time we reserve bandwidth with
  1578. * core (not instantly).
  1579. *
  1580. * @param cp peer to reserve bandwidth from
  1581. * @param pref preference change
  1582. */
  1583. void
  1584. GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
  1585. uint64_t pref)
  1586. {
  1587. cp->inc_preference += pref;
  1588. }
  1589. /**
  1590. * Call this method periodically to flush respect information to disk.
  1591. *
  1592. * @param cls closure, not used
  1593. * @param tc task context, not used
  1594. */
  1595. static void
  1596. cron_flush_respect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  1597. {
  1598. if (NULL == cp_map)
  1599. return;
  1600. GNUNET_CONTAINER_multipeermap_iterate (cp_map, &flush_respect, NULL);
  1601. if (NULL == tc)
  1602. return;
  1603. if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
  1604. return;
  1605. GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
  1606. GNUNET_SCHEDULER_PRIORITY_HIGH,
  1607. &cron_flush_respect, NULL);
  1608. }
  1609. /**
  1610. * Initialize peer management subsystem.
  1611. */
  1612. void
  1613. GSF_connected_peer_init_ ()
  1614. {
  1615. cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
  1616. peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
  1617. GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
  1618. &cron_flush_respect, NULL);
  1619. }
  1620. /**
  1621. * Iterator to free peer entries.
  1622. *
  1623. * @param cls closure, unused
  1624. * @param key current key code
  1625. * @param value value in the hash map (peer entry)
  1626. * @return #GNUNET_YES (we should continue to iterate)
  1627. */
  1628. static int
  1629. clean_peer (void *cls,
  1630. const struct GNUNET_PeerIdentity *key,
  1631. void *value)
  1632. {
  1633. GSF_peer_disconnect_handler_ (NULL, key);
  1634. return GNUNET_YES;
  1635. }
  1636. /**
  1637. * Shutdown peer management subsystem.
  1638. */
  1639. void
  1640. GSF_connected_peer_done_ ()
  1641. {
  1642. cron_flush_respect (NULL, NULL);
  1643. GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_peer, NULL);
  1644. GNUNET_CONTAINER_multipeermap_destroy (cp_map);
  1645. cp_map = NULL;
  1646. GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
  1647. }
  1648. /**
  1649. * Iterator to remove references to LC entry.
  1650. *
  1651. * @param cls the 'struct GSF_LocalClient*' to look for
  1652. * @param key current key code
  1653. * @param value value in the hash map (peer entry)
  1654. * @return #GNUNET_YES (we should continue to iterate)
  1655. */
  1656. static int
  1657. clean_local_client (void *cls,
  1658. const struct GNUNET_PeerIdentity *key,
  1659. void *value)
  1660. {
  1661. const struct GSF_LocalClient *lc = cls;
  1662. struct GSF_ConnectedPeer *cp = value;
  1663. unsigned int i;
  1664. for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
  1665. if (cp->ppd.last_client_replies[i] == lc)
  1666. cp->ppd.last_client_replies[i] = NULL;
  1667. return GNUNET_YES;
  1668. }
  1669. /**
  1670. * Notification that a local client disconnected. Clean up all of our
  1671. * references to the given handle.
  1672. *
  1673. * @param lc handle to the local client (henceforth invalid)
  1674. */
  1675. void
  1676. GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
  1677. {
  1678. if (NULL == cp_map)
  1679. return; /* already cleaned up */
  1680. GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client,
  1681. (void *) lc);
  1682. }
  1683. /* end of gnunet-service-fs_cp.c */