gnunet-service-fs_push.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2011, 2016 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file fs/gnunet-service-fs_push.c
  18. * @brief API to push content from our datastore to other peers
  19. * ('anonymous'-content P2P migration)
  20. * @author Christian Grothoff
  21. */
  22. #include "platform.h"
  23. #include "gnunet-service-fs.h"
  24. #include "gnunet-service-fs_cp.h"
  25. #include "gnunet-service-fs_indexing.h"
  26. #include "gnunet-service-fs_push.h"
  /**
   * Maximum number of blocks we keep in memory for migration.
   * While the in-memory queue holds this many blocks, no further
   * datastore request is scheduled or issued.
   */
  #define MAX_MIGRATION_QUEUE 8
  /**
   * Number of slots in each block's list of peers it was already sent to.
   * Blocks are at most migrated to this number of peers plus one, each
   * time they are fetched from the database: the transmission that finds
   * no free slot left is the one that removes the block from the queue.
   */
  #define MIGRATION_LIST_SIZE 2
  /**
   * How long must content remain valid for us to consider it for migration?
   * If content will expire too soon, there is clearly no point in pushing
   * it to other peers. This value gives the threshold for migration. Note
   * that if this value is increased, the migration testcase may need to be
   * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
   */
  #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply ( \
  GNUNET_TIME_UNIT_MINUTES, 30)
  /**
   * Block that is ready for migration to other peers.
   *
   * The block's payload is not a member of this struct: it is stored
   * directly behind it in the same allocation (the struct is allocated
   * with @e size extra bytes and the data is accessed via `&block[1]`).
   */
  struct MigrationReadyBlock
  {
    /**
     * Next entry in the doubly-linked list of queued blocks.
     */
    struct MigrationReadyBlock *next;
    /**
     * Previous entry in the doubly-linked list of queued blocks.
     */
    struct MigrationReadyBlock *prev;
    /**
     * Query for the block. Also used to score how well the block
     * matches a candidate peer.
     */
    struct GNUNET_HashCode query;
    /**
     * When does this block expire?
     */
    struct GNUNET_TIME_Absolute expiration;
    /**
     * Peers we already forwarded this block to. Zero for empty
     * entries; slots are filled front to back.
     */
    GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
    /**
     * Size of the block's payload in bytes (the data stored behind
     * this struct).
     */
    size_t size;
    /**
     * Number of targets already used.
     * NOTE(review): no code visible in this file reads or updates
     * this field; the number of used targets is derived from
     * @e target_list instead -- confirm before relying on it.
     */
    unsigned int used_targets;
    /**
     * Type of the block.
     */
    enum GNUNET_BLOCK_Type type;
  };
  /**
   * Information about a peer waiting for migratable data.
   */
  struct MigrationReadyPeer
  {
    /**
     * Next entry in the doubly-linked list of peers.
     */
    struct MigrationReadyPeer *next;
    /**
     * Previous entry in the doubly-linked list of peers.
     */
    struct MigrationReadyPeer *prev;
    /**
     * Handle to peer.
     */
    struct GSF_ConnectedPeer *peer;
    /**
     * Envelope of the currently pushed message. NULL while no push
     * to this peer is pending; a new block is only handed to the peer
     * when this is NULL.
     */
    struct GNUNET_MQ_Envelope *env;
  };
  /**
   * Head of linked list of blocks that can be migrated.
   */
  static struct MigrationReadyBlock *mig_head;
  /**
   * Tail of linked list of blocks that can be migrated.
   * Newly fetched blocks are appended here.
   */
  static struct MigrationReadyBlock *mig_tail;
  /**
   * Head of linked list of peers.
   */
  static struct MigrationReadyPeer *peer_head;
  /**
   * Tail of linked list of peers.
   */
  static struct MigrationReadyPeer *peer_tail;
  /**
   * Request to datastore for migration (or NULL).
   * Non-NULL only between issuing a request and the start of its
   * result callback; at most one request is outstanding.
   */
  static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
  /**
   * ID of task that collects blocks for migration (NULL if the task
   * is not currently scheduled).
   */
  static struct GNUNET_SCHEDULER_Task *mig_task;
  /**
   * What is the maximum frequency at which we are allowed to
   * poll the datastore for migration content? Read from the
   * "MIN_MIGRATION_DELAY" configuration option.
   */
  static struct GNUNET_TIME_Relative min_migration_delay;
  /**
   * Size of the doubly-linked list of migration blocks.
   */
  static unsigned int mig_size;
  /**
   * Is this module enabled? Holds the result of reading the
   * "CONTENT_PUSHING" configuration option.
   */
  static int enabled;
  /**
   * Did we find anything in the datastore? Reset to #GNUNET_NO right
   * before each datastore request and set to #GNUNET_YES once the
   * result callback is given a block.
   */
  static int value_found;
  147. /**
  148. * Delete the given migration block.
  149. *
  150. * @param mb block to delete
  151. */
  152. static void
  153. delete_migration_block (struct MigrationReadyBlock *mb)
  154. {
  155. GNUNET_CONTAINER_DLL_remove (mig_head,
  156. mig_tail,
  157. mb);
  158. GNUNET_PEER_decrement_rcs (mb->target_list,
  159. MIGRATION_LIST_SIZE);
  160. mig_size--;
  161. GNUNET_free (mb);
  162. }
  /**
   * Find content for migration to this peer.
   *
   * Forward declaration: the function is registered as the "message
   * sent" notification by transmit_content() below, before its
   * definition appears in this file.
   *
   * @param cls a `struct MigrationReadyPeer *`
   */
  static void
  find_content (void *cls);
  170. /**
  171. * Send the given block to the given peer.
  172. *
  173. * @param peer target peer
  174. * @param block the block
  175. * @return #GNUNET_YES if the block was deleted (!)
  176. */
  177. static int
  178. transmit_content (struct MigrationReadyPeer *mrp,
  179. struct MigrationReadyBlock *block)
  180. {
  181. struct PutMessage *msg;
  182. unsigned int i;
  183. struct GSF_PeerPerformanceData *ppd;
  184. int ret;
  185. ppd = GSF_get_peer_performance_data_ (mrp->peer);
  186. GNUNET_assert (NULL == mrp->env);
  187. mrp->env = GNUNET_MQ_msg_extra (msg,
  188. block->size,
  189. GNUNET_MESSAGE_TYPE_FS_PUT);
  190. msg->type = htonl (block->type);
  191. msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
  192. GNUNET_memcpy (&msg[1],
  193. &block[1],
  194. block->size);
  195. for (i = 0; i < MIGRATION_LIST_SIZE; i++)
  196. {
  197. if (block->target_list[i] == 0)
  198. {
  199. block->target_list[i] = ppd->pid;
  200. GNUNET_PEER_change_rc (block->target_list[i],
  201. 1);
  202. break;
  203. }
  204. }
  205. if (MIGRATION_LIST_SIZE == i)
  206. {
  207. delete_migration_block (block);
  208. ret = GNUNET_YES;
  209. }
  210. else
  211. {
  212. ret = GNUNET_NO;
  213. }
  214. GNUNET_MQ_notify_sent (mrp->env,
  215. &find_content,
  216. mrp);
  217. GSF_peer_transmit_ (mrp->peer,
  218. GNUNET_NO,
  219. 0 /* priority */,
  220. mrp->env);
  221. return ret;
  222. }
  223. /**
  224. * Count the number of peers this block has
  225. * already been forwarded to.
  226. *
  227. * @param block the block
  228. * @return number of times block was forwarded
  229. */
  230. static unsigned int
  231. count_targets (struct MigrationReadyBlock *block)
  232. {
  233. unsigned int i;
  234. for (i = 0; i < MIGRATION_LIST_SIZE; i++)
  235. if (block->target_list[i] == 0)
  236. return i;
  237. return i;
  238. }
  239. /**
  240. * Check if sending this block to this peer would
  241. * be a good idea.
  242. *
  243. * @param mrp target peer
  244. * @param block the block
  245. * @return score (>= 0: feasible, negative: infeasible)
  246. */
  247. static long
  248. score_content (struct MigrationReadyPeer *mrp,
  249. struct MigrationReadyBlock *block)
  250. {
  251. unsigned int i;
  252. struct GSF_PeerPerformanceData *ppd;
  253. struct GNUNET_PeerIdentity id;
  254. struct GNUNET_HashCode hc;
  255. uint32_t dist;
  256. ppd = GSF_get_peer_performance_data_ (mrp->peer);
  257. for (i = 0; i < MIGRATION_LIST_SIZE; i++)
  258. if (block->target_list[i] == ppd->pid)
  259. return -1;
  260. GNUNET_assert (0 != ppd->pid);
  261. GNUNET_PEER_resolve (ppd->pid,
  262. &id);
  263. GNUNET_CRYPTO_hash (&id,
  264. sizeof(struct GNUNET_PeerIdentity),
  265. &hc);
  266. dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
  267. &hc);
  268. /* closer distance, higher score: */
  269. return UINT32_MAX - dist;
  270. }
  /**
   * If the migration task is not currently running, consider
   * (re)scheduling it with the appropriate delay.
   *
   * Forward declaration: find_content() below calls this function
   * before its definition appears in this file.
   */
  static void
  consider_gathering (void);
  277. /**
  278. * Find content for migration to this peer.
  279. *
  280. * @param cls peer to find content for
  281. */
  282. static void
  283. find_content (void *cls)
  284. {
  285. struct MigrationReadyPeer *mrp = cls;
  286. struct MigrationReadyBlock *pos;
  287. long score;
  288. long best_score;
  289. struct MigrationReadyBlock *best;
  290. mrp->env = NULL;
  291. best = NULL;
  292. best_score = -1;
  293. pos = mig_head;
  294. while (NULL != pos)
  295. {
  296. score = score_content (mrp, pos);
  297. if (score > best_score)
  298. {
  299. best_score = score;
  300. best = pos;
  301. }
  302. pos = pos->next;
  303. }
  304. if (NULL == best)
  305. {
  306. if (mig_size < MAX_MIGRATION_QUEUE)
  307. {
  308. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  309. "No content found for pushing, waiting for queue to fill\n");
  310. return; /* will fill up eventually... */
  311. }
  312. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  313. "No suitable content found, purging content from full queue\n");
  314. /* failed to find migration target AND
  315. * queue is full, purge most-forwarded
  316. * block from queue to make room for more */
  317. pos = mig_head;
  318. while (NULL != pos)
  319. {
  320. score = count_targets (pos);
  321. if (score >= best_score)
  322. {
  323. best_score = score;
  324. best = pos;
  325. }
  326. pos = pos->next;
  327. }
  328. GNUNET_assert (NULL != best);
  329. delete_migration_block (best);
  330. consider_gathering ();
  331. return;
  332. }
  333. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  334. "Preparing to push best content to peer\n");
  335. transmit_content (mrp,
  336. best);
  337. }
  /**
   * Task that is run periodically to obtain blocks for content
   * migration.
   *
   * Forward declaration: consider_gathering() below schedules this
   * task before its definition appears in this file.
   *
   * @param cls unused
   */
  static void
  gather_migration_blocks (void *cls);
  346. /**
  347. * If the migration task is not currently running, consider
  348. * (re)scheduling it with the appropriate delay.
  349. */
  350. static void
  351. consider_gathering ()
  352. {
  353. struct GNUNET_TIME_Relative delay;
  354. if (NULL == GSF_dsh)
  355. return;
  356. if (NULL != mig_qe)
  357. return;
  358. if (NULL != mig_task)
  359. return;
  360. if (mig_size >= MAX_MIGRATION_QUEUE)
  361. return;
  362. delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  363. mig_size);
  364. delay = GNUNET_TIME_relative_divide (delay,
  365. MAX_MIGRATION_QUEUE);
  366. delay = GNUNET_TIME_relative_max (delay,
  367. min_migration_delay);
  368. if (GNUNET_NO == value_found)
  369. {
  370. /* wait at least 5s if the datastore is empty */
  371. delay = GNUNET_TIME_relative_max (delay,
  372. GNUNET_TIME_relative_multiply (
  373. GNUNET_TIME_UNIT_SECONDS,
  374. 5));
  375. }
  376. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  377. "Scheduling gathering task (queue size: %u)\n",
  378. mig_size);
  379. mig_task = GNUNET_SCHEDULER_add_delayed (delay,
  380. &gather_migration_blocks,
  381. NULL);
  382. }
  383. /**
  384. * Process content offered for migration.
  385. *
  386. * @param cls closure
  387. * @param key key for the content
  388. * @param size number of bytes in data
  389. * @param data content stored
  390. * @param type type of the content
  391. * @param priority priority of the content
  392. * @param anonymity anonymity-level for the content
  393. * @param replication replication-level for the content
  394. * @param expiration expiration time for the content
  395. * @param uid unique identifier for the datum;
  396. * maybe 0 if no unique identifier is available
  397. */
  398. static void
  399. process_migration_content (void *cls,
  400. const struct GNUNET_HashCode *key,
  401. size_t size,
  402. const void *data,
  403. enum GNUNET_BLOCK_Type type,
  404. uint32_t priority,
  405. uint32_t anonymity,
  406. uint32_t replication,
  407. struct GNUNET_TIME_Absolute expiration,
  408. uint64_t uid)
  409. {
  410. struct MigrationReadyBlock *mb;
  411. struct MigrationReadyPeer *pos;
  412. mig_qe = NULL;
  413. if (NULL == key)
  414. {
  415. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  416. "No content found for migration...\n");
  417. consider_gathering ();
  418. return;
  419. }
  420. value_found = GNUNET_YES;
  421. if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
  422. MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
  423. {
  424. /* content will expire soon, don't bother */
  425. consider_gathering ();
  426. return;
  427. }
  428. if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
  429. {
  430. if (GNUNET_OK !=
  431. GNUNET_FS_handle_on_demand_block (key,
  432. size,
  433. data,
  434. type,
  435. priority,
  436. anonymity,
  437. replication,
  438. expiration,
  439. uid,
  440. &process_migration_content,
  441. NULL))
  442. consider_gathering ();
  443. return;
  444. }
  445. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  446. "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
  447. GNUNET_h2s (key),
  448. type, mig_size + 1,
  449. MAX_MIGRATION_QUEUE);
  450. mb = GNUNET_malloc (sizeof(struct MigrationReadyBlock) + size);
  451. mb->query = *key;
  452. mb->expiration = expiration;
  453. mb->size = size;
  454. mb->type = type;
  455. GNUNET_memcpy (&mb[1], data, size);
  456. GNUNET_CONTAINER_DLL_insert_after (mig_head,
  457. mig_tail,
  458. mig_tail,
  459. mb);
  460. mig_size++;
  461. for (pos = peer_head; NULL != pos; pos = pos->next)
  462. {
  463. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  464. "Preparing to push best content to peer %s\n",
  465. GNUNET_i2s (GSF_connected_peer_get_identity2_ (pos->peer)));
  466. if ((NULL == pos->env) &&
  467. (GNUNET_YES == transmit_content (pos,
  468. mb)))
  469. {
  470. break; /* 'mb' was freed! */
  471. }
  472. }
  473. consider_gathering ();
  474. }
  475. /**
  476. * Task that is run periodically to obtain blocks for content
  477. * migration
  478. *
  479. * @param cls unused
  480. */
  481. static void
  482. gather_migration_blocks (void *cls)
  483. {
  484. mig_task = NULL;
  485. if (mig_size >= MAX_MIGRATION_QUEUE)
  486. return;
  487. if (NULL == GSF_dsh)
  488. return;
  489. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  490. "Asking datastore for content for replication (queue size: %u)\n",
  491. mig_size);
  492. value_found = GNUNET_NO;
  493. mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
  494. 0,
  495. UINT_MAX,
  496. &process_migration_content,
  497. NULL);
  498. if (NULL == mig_qe)
  499. consider_gathering ();
  500. }
  501. /**
  502. * A peer connected to us. Start pushing content
  503. * to this peer.
  504. *
  505. * @param peer handle for the peer that connected
  506. */
  507. void
  508. GSF_push_start_ (struct GSF_ConnectedPeer *peer)
  509. {
  510. struct MigrationReadyPeer *mrp;
  511. if (GNUNET_YES != enabled)
  512. return;
  513. for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
  514. if (mrp->peer == peer)
  515. break;
  516. if (NULL != mrp)
  517. {
  518. /* same peer added twice, must not happen */
  519. GNUNET_break (0);
  520. return;
  521. }
  522. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  523. "Adding peer %s to list for pushing\n",
  524. GNUNET_i2s (GSF_connected_peer_get_identity2_ (peer)));
  525. mrp = GNUNET_new (struct MigrationReadyPeer);
  526. mrp->peer = peer;
  527. find_content (mrp);
  528. GNUNET_CONTAINER_DLL_insert (peer_head,
  529. peer_tail,
  530. mrp);
  531. }
  532. /**
  533. * A peer disconnected from us. Stop pushing content
  534. * to this peer.
  535. *
  536. * @param peer handle for the peer that disconnected
  537. */
  538. void
  539. GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
  540. {
  541. struct MigrationReadyPeer *pos;
  542. for (pos = peer_head; NULL != pos; pos = pos->next)
  543. if (pos->peer == peer)
  544. break;
  545. if (NULL == pos)
  546. return;
  547. if (NULL != pos->env)
  548. GNUNET_MQ_send_cancel (pos->env);
  549. GNUNET_CONTAINER_DLL_remove (peer_head,
  550. peer_tail,
  551. pos);
  552. GNUNET_free (pos);
  553. }
  554. /**
  555. * Setup the module.
  556. */
  557. void
  558. GSF_push_init_ ()
  559. {
  560. enabled =
  561. GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
  562. "FS",
  563. "CONTENT_PUSHING");
  564. if (GNUNET_YES != enabled)
  565. return;
  566. if (GNUNET_OK !=
  567. GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
  568. "fs",
  569. "MIN_MIGRATION_DELAY",
  570. &min_migration_delay))
  571. {
  572. GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
  573. "fs",
  574. "MIN_MIGRATION_DELAY",
  575. _ ("time required, content pushing disabled"));
  576. return;
  577. }
  578. consider_gathering ();
  579. }
  580. /**
  581. * Shutdown the module.
  582. */
  583. void
  584. GSF_push_done_ ()
  585. {
  586. if (NULL != mig_task)
  587. {
  588. GNUNET_SCHEDULER_cancel (mig_task);
  589. mig_task = NULL;
  590. }
  591. if (NULL != mig_qe)
  592. {
  593. GNUNET_DATASTORE_cancel (mig_qe);
  594. mig_qe = NULL;
  595. }
  596. while (NULL != mig_head)
  597. delete_migration_block (mig_head);
  598. GNUNET_assert (0 == mig_size);
  599. }
  600. /* end of gnunet-service-fs_push.c */