gnunet-service-fs_put.c 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2011 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file fs/gnunet-service-fs_put.c
  18. * @brief API to PUT zero-anonymity index data from our datastore into the DHT
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet-service-fs.h"
  23. #include "gnunet-service-fs_put.h"
  /**
   * How often do we at most PUT content into the DHT?  Expressed as a
   * relative time (five seconds) against which schedule_next_put()
   * compares the delay it computed.
   */
  #define MAX_DHT_PUT_FREQ \
    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)

  /**
   * How many replicas do we try to create per PUT?  Handed to
   * GNUNET_DHT_put() for every block we publish.
   */
  #define DEFAULT_PUT_REPLICATION 5
  33. /**
  34. * Context for each zero-anonymity iterator.
  35. */
  36. struct PutOperator
  37. {
  38. /**
  39. * Request to datastore for DHT PUTs (or NULL).
  40. */
  41. struct GNUNET_DATASTORE_QueueEntry *dht_qe;
  42. /**
  43. * Type we request from the datastore.
  44. */
  45. enum GNUNET_BLOCK_Type dht_put_type;
  46. /**
  47. * Handle to PUT operation.
  48. */
  49. struct GNUNET_DHT_PutHandle *dht_put;
  50. /**
  51. * ID of task that collects blocks for DHT PUTs.
  52. */
  53. struct GNUNET_SCHEDULER_Task *dht_task;
  54. /**
  55. * How many entires with zero anonymity of our type do we currently
  56. * estimate to have in the database?
  57. */
  58. uint64_t zero_anonymity_count_estimate;
  59. /**
  60. * Count of results received from the database.
  61. */
  62. uint64_t result_count;
  63. /**
  64. * Next UID to request when iterating the database.
  65. */
  66. uint64_t next_uid;
  67. };
  68. /**
  69. * ANY-terminated list of our operators (one per type
  70. * of block that we're putting into the DHT).
  71. */
  72. static struct PutOperator operators[] = {
  73. { NULL, GNUNET_BLOCK_TYPE_FS_UBLOCK, 0, 0, 0 },
  74. { NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0 }
  75. };
  /**
   * Forward declaration: task that asks the datastore for the next
   * zero-anonymity block to PUT.  Needed here because
   * schedule_next_put() schedules it before its definition.
   *
   * @param cls the `struct PutOperator` to gather blocks for
   */
  static void
  gather_dht_put_blocks (void *cls);
  84. /**
  85. * Calculate when to run the next PUT operation and schedule it.
  86. *
  87. * @param po put operator to schedule
  88. */
  89. static void
  90. schedule_next_put (struct PutOperator *po)
  91. {
  92. struct GNUNET_TIME_Relative delay;
  93. if (po->zero_anonymity_count_estimate > 0)
  94. {
  95. delay =
  96. GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
  97. po->zero_anonymity_count_estimate);
  98. delay = GNUNET_TIME_relative_min (delay, MAX_DHT_PUT_FREQ);
  99. }
  100. else
  101. {
  102. /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
  103. * (hopefully) appear */
  104. delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
  105. }
  106. po->dht_task =
  107. GNUNET_SCHEDULER_add_delayed (delay, &gather_dht_put_blocks, po);
  108. }
  109. /**
  110. * Continuation called after DHT PUT operation has finished.
  111. *
  112. * @param cls type of blocks to gather
  113. */
  114. static void
  115. delay_dht_put_blocks (void *cls)
  116. {
  117. struct PutOperator *po = cls;
  118. po->dht_put = NULL;
  119. schedule_next_put (po);
  120. }
  121. /**
  122. * Task that is run periodically to obtain blocks for DHT PUTs.
  123. *
  124. * @param cls type of blocks to gather
  125. */
  126. static void
  127. delay_dht_put_task (void *cls)
  128. {
  129. struct PutOperator *po = cls;
  130. po->dht_task = NULL;
  131. schedule_next_put (po);
  132. }
  133. /**
  134. * Store content in DHT.
  135. *
  136. * @param cls closure
  137. * @param key key for the content
  138. * @param size number of bytes in data
  139. * @param data content stored
  140. * @param type type of the content
  141. * @param priority priority of the content
  142. * @param anonymity anonymity-level for the content
  143. * @param replication replication-level for the content
  144. * @param expiration expiration time for the content
  145. * @param uid unique identifier for the datum;
  146. * maybe 0 if no unique identifier is available
  147. */
  148. static void
  149. process_dht_put_content (void *cls,
  150. const struct GNUNET_HashCode *key,
  151. size_t size,
  152. const void *data,
  153. enum GNUNET_BLOCK_Type type,
  154. uint32_t priority,
  155. uint32_t anonymity,
  156. uint32_t replication,
  157. struct GNUNET_TIME_Absolute expiration,
  158. uint64_t uid)
  159. {
  160. struct PutOperator *po = cls;
  161. po->dht_qe = NULL;
  162. if (key == NULL)
  163. {
  164. po->zero_anonymity_count_estimate = po->result_count;
  165. po->result_count = 0;
  166. po->next_uid = 0;
  167. po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
  168. return;
  169. }
  170. po->result_count++;
  171. po->next_uid = uid + 1;
  172. po->zero_anonymity_count_estimate =
  173. GNUNET_MAX (po->result_count, po->zero_anonymity_count_estimate);
  174. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  175. "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key),
  176. type);
  177. po->dht_put = GNUNET_DHT_put (GSF_dht,
  178. key,
  179. DEFAULT_PUT_REPLICATION,
  180. GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
  181. type,
  182. size,
  183. data,
  184. expiration,
  185. &delay_dht_put_blocks,
  186. po);
  187. }
  188. /**
  189. * Task that is run periodically to obtain blocks for DHT PUTs.
  190. *
  191. * @param cls type of blocks to gather
  192. */
  193. static void
  194. gather_dht_put_blocks (void *cls)
  195. {
  196. struct PutOperator *po = cls;
  197. po->dht_task = NULL;
  198. po->dht_qe =
  199. GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh,
  200. po->next_uid,
  201. 0,
  202. UINT_MAX,
  203. po->dht_put_type,
  204. &process_dht_put_content,
  205. po);
  206. if (NULL == po->dht_qe)
  207. po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
  208. }
  209. /**
  210. * Setup the module.
  211. */
  212. void
  213. GSF_put_init_ ()
  214. {
  215. unsigned int i;
  216. i = 0;
  217. while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY)
  218. {
  219. operators[i].dht_task =
  220. GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]);
  221. i++;
  222. }
  223. }
  224. /**
  225. * Shutdown the module.
  226. */
  227. void
  228. GSF_put_done_ ()
  229. {
  230. struct PutOperator *po;
  231. unsigned int i;
  232. i = 0;
  233. while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY)
  234. {
  235. if (NULL != po->dht_task)
  236. {
  237. GNUNET_SCHEDULER_cancel (po->dht_task);
  238. po->dht_task = NULL;
  239. }
  240. if (NULL != po->dht_put)
  241. {
  242. GNUNET_DHT_put_cancel (po->dht_put);
  243. po->dht_put = NULL;
  244. }
  245. if (NULL != po->dht_qe)
  246. {
  247. GNUNET_DATASTORE_cancel (po->dht_qe);
  248. po->dht_qe = NULL;
  249. }
  250. i++;
  251. }
  252. }
  253. /* end of gnunet-service-fs_put.c */