gnunet-service-fs_put.c 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. /*
  2. This file is part of GNUnet.
  3. (C) 2011 Christian Grothoff (and other contributing authors)
  4. GNUnet is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published
  6. by the Free Software Foundation; either version 3, or (at your
  7. option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with GNUnet; see the file COPYING. If not, write to the
  14. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  15. Boston, MA 02111-1307, USA.
  16. */
  17. /**
  18. * @file fs/gnunet-service-fs_put.c
  19. * @brief API to PUT zero-anonymity index data from our datastore into the DHT
  20. * @author Christian Grothoff
  21. */
  22. #include "platform.h"
  23. #include "gnunet-service-fs.h"
  24. #include "gnunet-service-fs_put.h"
  25. /**
  26. * How often do we at most PUT content into the DHT?
  27. */
  28. #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
  29. /**
  30. * How many replicas do we try to create per PUT?
  31. */
  32. #define DEFAULT_PUT_REPLICATION 5
  33. /**
  34. * Context for each zero-anonymity iterator.
  35. */
  36. struct PutOperator
  37. {
  38. /**
  39. * Request to datastore for DHT PUTs (or NULL).
  40. */
  41. struct GNUNET_DATASTORE_QueueEntry *dht_qe;
  42. /**
  43. * Type we request from the datastore.
  44. */
  45. enum GNUNET_BLOCK_Type dht_put_type;
  46. /**
  47. * Handle to PUT operation.
  48. */
  49. struct GNUNET_DHT_PutHandle *dht_put;
  50. /**
  51. * ID of task that collects blocks for DHT PUTs.
  52. */
  53. GNUNET_SCHEDULER_TaskIdentifier dht_task;
  54. /**
  55. * How many entires with zero anonymity of our type do we currently
  56. * estimate to have in the database?
  57. */
  58. uint64_t zero_anonymity_count_estimate;
  59. /**
  60. * Current offset when iterating the database.
  61. */
  62. uint64_t current_offset;
  63. };
  64. /**
  65. * ANY-terminated list of our operators (one per type
  66. * of block that we're putting into the DHT).
  67. */
  68. static struct PutOperator operators[] = {
  69. {NULL, GNUNET_BLOCK_TYPE_FS_UBLOCK, 0, 0, 0},
  70. {NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0}
  71. };
  72. /**
  73. * Task that is run periodically to obtain blocks for DHT PUTs.
  74. *
  75. * @param cls type of blocks to gather
  76. * @param tc scheduler context (unused)
  77. */
  78. static void
  79. gather_dht_put_blocks (void *cls,
  80. const struct GNUNET_SCHEDULER_TaskContext *tc);
  81. /**
  82. * Calculate when to run the next PUT operation and schedule it.
  83. *
  84. * @param po put operator to schedule
  85. */
  86. static void
  87. schedule_next_put (struct PutOperator *po)
  88. {
  89. struct GNUNET_TIME_Relative delay;
  90. if (po->zero_anonymity_count_estimate > 0)
  91. {
  92. delay =
  93. GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
  94. po->zero_anonymity_count_estimate);
  95. delay = GNUNET_TIME_relative_min (delay, MAX_DHT_PUT_FREQ);
  96. }
  97. else
  98. {
  99. /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
  100. * (hopefully) appear */
  101. delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
  102. }
  103. po->dht_task =
  104. GNUNET_SCHEDULER_add_delayed (delay, &gather_dht_put_blocks, po);
  105. }
  106. /**
  107. * Continuation called after DHT PUT operation has finished.
  108. *
  109. * @param cls type of blocks to gather
  110. * @param success GNUNET_OK if the PUT was transmitted,
  111. * GNUNET_NO on timeout,
  112. * GNUNET_SYSERR on disconnect from service
  113. * after the PUT message was transmitted
  114. * (so we don't know if it was received or not)
  115. */
  116. static void
  117. delay_dht_put_blocks (void *cls, int success)
  118. {
  119. struct PutOperator *po = cls;
  120. po->dht_put = NULL;
  121. schedule_next_put (po);
  122. }
  123. /**
  124. * Task that is run periodically to obtain blocks for DHT PUTs.
  125. *
  126. * @param cls type of blocks to gather
  127. * @param tc scheduler context
  128. */
  129. static void
  130. delay_dht_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  131. {
  132. struct PutOperator *po = cls;
  133. po->dht_task = GNUNET_SCHEDULER_NO_TASK;
  134. schedule_next_put (po);
  135. }
  136. /**
  137. * Store content in DHT.
  138. *
  139. * @param cls closure
  140. * @param key key for the content
  141. * @param size number of bytes in data
  142. * @param data content stored
  143. * @param type type of the content
  144. * @param priority priority of the content
  145. * @param anonymity anonymity-level for the content
  146. * @param expiration expiration time for the content
  147. * @param uid unique identifier for the datum;
  148. * maybe 0 if no unique identifier is available
  149. */
  150. static void
  151. process_dht_put_content (void *cls,
  152. const struct GNUNET_HashCode * key,
  153. size_t size,
  154. const void *data,
  155. enum GNUNET_BLOCK_Type type,
  156. uint32_t priority, uint32_t anonymity,
  157. struct GNUNET_TIME_Absolute expiration, uint64_t uid)
  158. {
  159. struct PutOperator *po = cls;
  160. po->dht_qe = NULL;
  161. if (key == NULL)
  162. {
  163. po->zero_anonymity_count_estimate = po->current_offset - 1;
  164. po->current_offset = 0;
  165. po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
  166. return;
  167. }
  168. po->zero_anonymity_count_estimate =
  169. GNUNET_MAX (po->current_offset, po->zero_anonymity_count_estimate);
  170. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  171. "Retrieved block `%s' of type %u for DHT PUT\n", GNUNET_h2s (key),
  172. type);
  173. po->dht_put = GNUNET_DHT_put (GSF_dht, key, DEFAULT_PUT_REPLICATION,
  174. GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, type, size, data,
  175. expiration, GNUNET_TIME_UNIT_FOREVER_REL,
  176. &delay_dht_put_blocks, po);
  177. }
  178. /**
  179. * Task that is run periodically to obtain blocks for DHT PUTs.
  180. *
  181. * @param cls type of blocks to gather
  182. * @param tc scheduler context (unused)
  183. */
  184. static void
  185. gather_dht_put_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  186. {
  187. struct PutOperator *po = cls;
  188. po->dht_task = GNUNET_SCHEDULER_NO_TASK;
  189. if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
  190. return;
  191. po->dht_qe =
  192. GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, po->current_offset++, 0,
  193. UINT_MAX,
  194. GNUNET_TIME_UNIT_FOREVER_REL,
  195. po->dht_put_type,
  196. &process_dht_put_content, po);
  197. if (NULL == po->dht_qe)
  198. po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_task, po);
  199. }
  200. /**
  201. * Setup the module.
  202. */
  203. void
  204. GSF_put_init_ ()
  205. {
  206. unsigned int i;
  207. i = 0;
  208. while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY)
  209. {
  210. operators[i].dht_task =
  211. GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, &operators[i]);
  212. i++;
  213. }
  214. }
  215. /**
  216. * Shutdown the module.
  217. */
  218. void
  219. GSF_put_done_ ()
  220. {
  221. struct PutOperator *po;
  222. unsigned int i;
  223. i = 0;
  224. while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY)
  225. {
  226. if (GNUNET_SCHEDULER_NO_TASK != po->dht_task)
  227. {
  228. GNUNET_SCHEDULER_cancel (po->dht_task);
  229. po->dht_task = GNUNET_SCHEDULER_NO_TASK;
  230. }
  231. if (NULL != po->dht_put)
  232. {
  233. GNUNET_DHT_put_cancel (po->dht_put);
  234. po->dht_put = NULL;
  235. }
  236. if (NULL != po->dht_qe)
  237. {
  238. GNUNET_DATASTORE_cancel (po->dht_qe);
  239. po->dht_qe = NULL;
  240. }
  241. i++;
  242. }
  243. }
  244. /* end of gnunet-service-fs_put.c */