gnunet-service-fs_push.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2011, 2016 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file fs/gnunet-service-fs_push.c
  18. * @brief API to push content from our datastore to other peers
  19. * ('anonymous'-content P2P migration)
  20. * @author Christian Grothoff
  21. */
  22. #include "platform.h"
  23. #include "gnunet-service-fs.h"
  24. #include "gnunet-service-fs_cp.h"
  25. #include "gnunet-service-fs_indexing.h"
  26. #include "gnunet-service-fs_push.h"
  27. /**
  28. * Maximum number of blocks we keep in memory for migration.
  29. */
  30. #define MAX_MIGRATION_QUEUE 8
  31. /**
  32. * Blocks are at most migrated to this number of peers
  33. * plus one, each time they are fetched from the database.
  34. */
  35. #define MIGRATION_LIST_SIZE 2
  36. /**
  37. * How long must content remain valid for us to consider it for migration?
  38. * If content will expire too soon, there is clearly no point in pushing
  39. * it to other peers. This value gives the threshold for migration. Note
  40. * that if this value is increased, the migration testcase may need to be
  41. * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
  42. */
  43. #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30)
  44. /**
  45. * Block that is ready for migration to other peers. Actual data is at the end of the block.
  46. */
  47. struct MigrationReadyBlock
  48. {
  49. /**
  50. * This is a doubly-linked list.
  51. */
  52. struct MigrationReadyBlock *next;
  53. /**
  54. * This is a doubly-linked list.
  55. */
  56. struct MigrationReadyBlock *prev;
  57. /**
  58. * Query for the block.
  59. */
  60. struct GNUNET_HashCode query;
  61. /**
  62. * When does this block expire?
  63. */
  64. struct GNUNET_TIME_Absolute expiration;
  65. /**
  66. * Peers we already forwarded this
  67. * block to. Zero for empty entries.
  68. */
  69. GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
  70. /**
  71. * Size of the block.
  72. */
  73. size_t size;
  74. /**
  75. * Number of targets already used.
  76. */
  77. unsigned int used_targets;
  78. /**
  79. * Type of the block.
  80. */
  81. enum GNUNET_BLOCK_Type type;
  82. };
  83. /**
  84. * Information about a peer waiting for migratable data.
  85. */
  86. struct MigrationReadyPeer
  87. {
  88. /**
  89. * This is a doubly-linked list.
  90. */
  91. struct MigrationReadyPeer *next;
  92. /**
  93. * This is a doubly-linked list.
  94. */
  95. struct MigrationReadyPeer *prev;
  96. /**
  97. * Handle to peer.
  98. */
  99. struct GSF_ConnectedPeer *peer;
  100. /**
  101. * Envelope of the currently pushed message.
  102. */
  103. struct GNUNET_MQ_Envelope *env;
  104. };
  105. /**
  106. * Head of linked list of blocks that can be migrated.
  107. */
  108. static struct MigrationReadyBlock *mig_head;
  109. /**
  110. * Tail of linked list of blocks that can be migrated.
  111. */
  112. static struct MigrationReadyBlock *mig_tail;
  113. /**
  114. * Head of linked list of peers.
  115. */
  116. static struct MigrationReadyPeer *peer_head;
  117. /**
  118. * Tail of linked list of peers.
  119. */
  120. static struct MigrationReadyPeer *peer_tail;
  121. /**
  122. * Request to datastore for migration (or NULL).
  123. */
  124. static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
  125. /**
  126. * ID of task that collects blocks for migration.
  127. */
  128. static struct GNUNET_SCHEDULER_Task *mig_task;
  129. /**
  130. * What is the maximum frequency at which we are allowed to
  131. * poll the datastore for migration content?
  132. */
  133. static struct GNUNET_TIME_Relative min_migration_delay;
  134. /**
  135. * Size of the doubly-linked list of migration blocks.
  136. */
  137. static unsigned int mig_size;
  138. /**
  139. * Is this module enabled?
  140. */
  141. static int enabled;
  142. /**
  143. * Did we find anything in the datastore?
  144. */
  145. static int value_found;
  146. /**
  147. * Delete the given migration block.
  148. *
  149. * @param mb block to delete
  150. */
  151. static void
  152. delete_migration_block (struct MigrationReadyBlock *mb)
  153. {
  154. GNUNET_CONTAINER_DLL_remove (mig_head,
  155. mig_tail,
  156. mb);
  157. GNUNET_PEER_decrement_rcs (mb->target_list,
  158. MIGRATION_LIST_SIZE);
  159. mig_size--;
  160. GNUNET_free (mb);
  161. }
  162. /**
  163. * Find content for migration to this peer.
  164. *
  165. * @param cls a `struct MigrationReadyPeer *`
  166. */
  167. static void
  168. find_content (void *cls);
  169. /**
  170. * Send the given block to the given peer.
  171. *
  172. * @param peer target peer
  173. * @param block the block
  174. * @return #GNUNET_YES if the block was deleted (!)
  175. */
  176. static int
  177. transmit_content (struct MigrationReadyPeer *mrp,
  178. struct MigrationReadyBlock *block)
  179. {
  180. struct PutMessage *msg;
  181. unsigned int i;
  182. struct GSF_PeerPerformanceData *ppd;
  183. int ret;
  184. ppd = GSF_get_peer_performance_data_ (mrp->peer);
  185. GNUNET_assert (NULL == mrp->env);
  186. mrp->env = GNUNET_MQ_msg_extra (msg,
  187. block->size,
  188. GNUNET_MESSAGE_TYPE_FS_PUT);
  189. msg->type = htonl (block->type);
  190. msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
  191. GNUNET_memcpy (&msg[1],
  192. &block[1],
  193. block->size);
  194. for (i = 0; i < MIGRATION_LIST_SIZE; i++)
  195. {
  196. if (block->target_list[i] == 0)
  197. {
  198. block->target_list[i] = ppd->pid;
  199. GNUNET_PEER_change_rc (block->target_list[i],
  200. 1);
  201. break;
  202. }
  203. }
  204. if (MIGRATION_LIST_SIZE == i)
  205. {
  206. delete_migration_block (block);
  207. ret = GNUNET_YES;
  208. }
  209. else
  210. {
  211. ret = GNUNET_NO;
  212. }
  213. GNUNET_MQ_notify_sent (mrp->env,
  214. &find_content,
  215. mrp);
  216. GSF_peer_transmit_ (mrp->peer,
  217. GNUNET_NO,
  218. 0 /* priority */ ,
  219. mrp->env);
  220. return ret;
  221. }
  222. /**
  223. * Count the number of peers this block has
  224. * already been forwarded to.
  225. *
  226. * @param block the block
  227. * @return number of times block was forwarded
  228. */
  229. static unsigned int
  230. count_targets (struct MigrationReadyBlock *block)
  231. {
  232. unsigned int i;
  233. for (i = 0; i < MIGRATION_LIST_SIZE; i++)
  234. if (block->target_list[i] == 0)
  235. return i;
  236. return i;
  237. }
  238. /**
  239. * Check if sending this block to this peer would
  240. * be a good idea.
  241. *
  242. * @param mrp target peer
  243. * @param block the block
  244. * @return score (>= 0: feasible, negative: infeasible)
  245. */
  246. static long
  247. score_content (struct MigrationReadyPeer *mrp,
  248. struct MigrationReadyBlock *block)
  249. {
  250. unsigned int i;
  251. struct GSF_PeerPerformanceData *ppd;
  252. struct GNUNET_PeerIdentity id;
  253. struct GNUNET_HashCode hc;
  254. uint32_t dist;
  255. ppd = GSF_get_peer_performance_data_ (mrp->peer);
  256. for (i = 0; i < MIGRATION_LIST_SIZE; i++)
  257. if (block->target_list[i] == ppd->pid)
  258. return -1;
  259. GNUNET_assert (0 != ppd->pid);
  260. GNUNET_PEER_resolve (ppd->pid,
  261. &id);
  262. GNUNET_CRYPTO_hash (&id,
  263. sizeof (struct GNUNET_PeerIdentity),
  264. &hc);
  265. dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
  266. &hc);
  267. /* closer distance, higher score: */
  268. return UINT32_MAX - dist;
  269. }
  270. /**
  271. * If the migration task is not currently running, consider
  272. * (re)scheduling it with the appropriate delay.
  273. */
  274. static void
  275. consider_gathering (void);
  276. /**
  277. * Find content for migration to this peer.
  278. *
  279. * @param cls peer to find content for
  280. */
  281. static void
  282. find_content (void *cls)
  283. {
  284. struct MigrationReadyPeer *mrp = cls;
  285. struct MigrationReadyBlock *pos;
  286. long score;
  287. long best_score;
  288. struct MigrationReadyBlock *best;
  289. mrp->env = NULL;
  290. best = NULL;
  291. best_score = -1;
  292. pos = mig_head;
  293. while (NULL != pos)
  294. {
  295. score = score_content (mrp, pos);
  296. if (score > best_score)
  297. {
  298. best_score = score;
  299. best = pos;
  300. }
  301. pos = pos->next;
  302. }
  303. if (NULL == best)
  304. {
  305. if (mig_size < MAX_MIGRATION_QUEUE)
  306. {
  307. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  308. "No content found for pushing, waiting for queue to fill\n");
  309. return; /* will fill up eventually... */
  310. }
  311. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  312. "No suitable content found, purging content from full queue\n");
  313. /* failed to find migration target AND
  314. * queue is full, purge most-forwarded
  315. * block from queue to make room for more */
  316. pos = mig_head;
  317. while (NULL != pos)
  318. {
  319. score = count_targets (pos);
  320. if (score >= best_score)
  321. {
  322. best_score = score;
  323. best = pos;
  324. }
  325. pos = pos->next;
  326. }
  327. GNUNET_assert (NULL != best);
  328. delete_migration_block (best);
  329. consider_gathering ();
  330. return;
  331. }
  332. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  333. "Preparing to push best content to peer\n");
  334. transmit_content (mrp,
  335. best);
  336. }
  337. /**
  338. * Task that is run periodically to obtain blocks for content
  339. * migration
  340. *
  341. * @param cls unused
  342. */
  343. static void
  344. gather_migration_blocks (void *cls);
  345. /**
  346. * If the migration task is not currently running, consider
  347. * (re)scheduling it with the appropriate delay.
  348. */
  349. static void
  350. consider_gathering ()
  351. {
  352. struct GNUNET_TIME_Relative delay;
  353. if (NULL == GSF_dsh)
  354. return;
  355. if (NULL != mig_qe)
  356. return;
  357. if (NULL != mig_task)
  358. return;
  359. if (mig_size >= MAX_MIGRATION_QUEUE)
  360. return;
  361. delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  362. mig_size);
  363. delay = GNUNET_TIME_relative_divide (delay,
  364. MAX_MIGRATION_QUEUE);
  365. delay = GNUNET_TIME_relative_max (delay,
  366. min_migration_delay);
  367. if (GNUNET_NO == value_found)
  368. {
  369. /* wait at least 5s if the datastore is empty */
  370. delay = GNUNET_TIME_relative_max (delay,
  371. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  372. 5));
  373. }
  374. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  375. "Scheduling gathering task (queue size: %u)\n",
  376. mig_size);
  377. mig_task = GNUNET_SCHEDULER_add_delayed (delay,
  378. &gather_migration_blocks,
  379. NULL);
  380. }
  381. /**
  382. * Process content offered for migration.
  383. *
  384. * @param cls closure
  385. * @param key key for the content
  386. * @param size number of bytes in data
  387. * @param data content stored
  388. * @param type type of the content
  389. * @param priority priority of the content
  390. * @param anonymity anonymity-level for the content
  391. * @param replication replication-level for the content
  392. * @param expiration expiration time for the content
  393. * @param uid unique identifier for the datum;
  394. * maybe 0 if no unique identifier is available
  395. */
  396. static void
  397. process_migration_content (void *cls,
  398. const struct GNUNET_HashCode *key,
  399. size_t size,
  400. const void *data,
  401. enum GNUNET_BLOCK_Type type,
  402. uint32_t priority,
  403. uint32_t anonymity,
  404. uint32_t replication,
  405. struct GNUNET_TIME_Absolute expiration,
  406. uint64_t uid)
  407. {
  408. struct MigrationReadyBlock *mb;
  409. struct MigrationReadyPeer *pos;
  410. mig_qe = NULL;
  411. if (NULL == key)
  412. {
  413. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  414. "No content found for migration...\n");
  415. consider_gathering ();
  416. return;
  417. }
  418. value_found = GNUNET_YES;
  419. if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
  420. MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
  421. {
  422. /* content will expire soon, don't bother */
  423. consider_gathering ();
  424. return;
  425. }
  426. if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
  427. {
  428. if (GNUNET_OK !=
  429. GNUNET_FS_handle_on_demand_block (key,
  430. size,
  431. data,
  432. type,
  433. priority,
  434. anonymity,
  435. replication,
  436. expiration,
  437. uid,
  438. &process_migration_content,
  439. NULL))
  440. consider_gathering ();
  441. return;
  442. }
  443. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  444. "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
  445. GNUNET_h2s (key),
  446. type, mig_size + 1,
  447. MAX_MIGRATION_QUEUE);
  448. mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
  449. mb->query = *key;
  450. mb->expiration = expiration;
  451. mb->size = size;
  452. mb->type = type;
  453. GNUNET_memcpy (&mb[1], data, size);
  454. GNUNET_CONTAINER_DLL_insert_after (mig_head,
  455. mig_tail,
  456. mig_tail,
  457. mb);
  458. mig_size++;
  459. for (pos = peer_head; NULL != pos; pos = pos->next)
  460. {
  461. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  462. "Preparing to push best content to peer %s\n",
  463. GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
  464. if ( (NULL == pos->env) &&
  465. (GNUNET_YES == transmit_content (pos,
  466. mb)) ) {
  467. break; /* 'mb' was freed! */
  468. }
  469. }
  470. consider_gathering ();
  471. }
  472. /**
  473. * Task that is run periodically to obtain blocks for content
  474. * migration
  475. *
  476. * @param cls unused
  477. */
  478. static void
  479. gather_migration_blocks (void *cls)
  480. {
  481. mig_task = NULL;
  482. if (mig_size >= MAX_MIGRATION_QUEUE)
  483. return;
  484. if (NULL == GSF_dsh)
  485. return;
  486. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  487. "Asking datastore for content for replication (queue size: %u)\n",
  488. mig_size);
  489. value_found = GNUNET_NO;
  490. mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
  491. 0,
  492. UINT_MAX,
  493. &process_migration_content,
  494. NULL);
  495. if (NULL == mig_qe)
  496. consider_gathering ();
  497. }
  498. /**
  499. * A peer connected to us. Start pushing content
  500. * to this peer.
  501. *
  502. * @param peer handle for the peer that connected
  503. */
  504. void
  505. GSF_push_start_ (struct GSF_ConnectedPeer *peer)
  506. {
  507. struct MigrationReadyPeer *mrp;
  508. if (GNUNET_YES != enabled)
  509. return;
  510. for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
  511. if (mrp->peer == peer)
  512. break;
  513. if (NULL != mrp)
  514. {
  515. /* same peer added twice, must not happen */
  516. GNUNET_break (0);
  517. return;
  518. }
  519. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  520. "Adding peer %s to list for pushing\n",
  521. GNUNET_i2s (GSF_connected_peer_get_identity2_(peer)));
  522. mrp = GNUNET_new (struct MigrationReadyPeer);
  523. mrp->peer = peer;
  524. find_content (mrp);
  525. GNUNET_CONTAINER_DLL_insert (peer_head,
  526. peer_tail,
  527. mrp);
  528. }
  529. /**
  530. * A peer disconnected from us. Stop pushing content
  531. * to this peer.
  532. *
  533. * @param peer handle for the peer that disconnected
  534. */
  535. void
  536. GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
  537. {
  538. struct MigrationReadyPeer *pos;
  539. for (pos = peer_head; NULL != pos; pos = pos->next)
  540. if (pos->peer == peer)
  541. break;
  542. if (NULL == pos)
  543. return;
  544. if (NULL != pos->env)
  545. GNUNET_MQ_send_cancel (pos->env);
  546. GNUNET_CONTAINER_DLL_remove (peer_head,
  547. peer_tail,
  548. pos);
  549. GNUNET_free (pos);
  550. }
  551. /**
  552. * Setup the module.
  553. */
  554. void
  555. GSF_push_init_ ()
  556. {
  557. enabled =
  558. GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
  559. "FS",
  560. "CONTENT_PUSHING");
  561. if (GNUNET_YES != enabled)
  562. return;
  563. if (GNUNET_OK !=
  564. GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
  565. "fs",
  566. "MIN_MIGRATION_DELAY",
  567. &min_migration_delay))
  568. {
  569. GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
  570. "fs",
  571. "MIN_MIGRATION_DELAY",
  572. _("time required, content pushing disabled"));
  573. return;
  574. }
  575. consider_gathering ();
  576. }
  577. /**
  578. * Shutdown the module.
  579. */
  580. void
  581. GSF_push_done_ ()
  582. {
  583. if (NULL != mig_task)
  584. {
  585. GNUNET_SCHEDULER_cancel (mig_task);
  586. mig_task = NULL;
  587. }
  588. if (NULL != mig_qe)
  589. {
  590. GNUNET_DATASTORE_cancel (mig_qe);
  591. mig_qe = NULL;
  592. }
  593. while (NULL != mig_head)
  594. delete_migration_block (mig_head);
  595. GNUNET_assert (0 == mig_size);
  596. }
  597. /* end of gnunet-service-fs_push.c */