rps-sampler_client.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C)
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  /**
   * @file rps/rps-sampler_client.c
   * @brief client sampler implementation
   * @author Julius Bünger
   */
  21. #include "platform.h"
  22. #include "gnunet_util_lib.h"
  23. #include "gnunet_statistics_service.h"
  24. #include "rps.h"
  25. #include "rps-sampler_common.h"
  26. #include "gnunet-service-rps_sampler.h"
  27. #include "gnunet-service-rps_sampler_elem.h"
  28. #include <math.h>
  29. #include <inttypes.h>
  30. #include "rps-test_util.h"
  /* Logging shorthand: forwards to GNUNET_log_from() with the fixed
   * tag "rps-sampler". */
  #define LOG(kind, ...) \
    GNUNET_log_from (kind, "rps-sampler", __VA_ARGS__)
  32. // multiple 'clients'?
  33. // TODO check for overflows
  34. // TODO align message structs
  35. // hist_size_init, hist_size_max
  36. /***********************************************************************
  37. * WARNING: This section needs to be reviewed regarding the use of
  38. * functions providing (pseudo)randomness!
  39. ***********************************************************************/
  40. // TODO care about invalid input of the caller (size 0 or less...)
  /**
   * @brief Signature of the callback that is run each time a new peer was
   *        put into the sampler.
   *
   * @param cls The closure that was (optionally) supplied with the callback
   */
  typedef void (*SamplerNotifyUpdateCB) (void *cls);
  48. /**
  49. * @brief Context for a callback. Contains callback and closure.
  50. *
  51. * Meant to be an entry in an DLL.
  52. */
  53. struct SamplerNotifyUpdateCTX
  54. {
  55. /**
  56. * @brief The Callback to call on updates
  57. */
  58. SamplerNotifyUpdateCB notify_cb;
  59. /**
  60. * @brief The according closure.
  61. */
  62. void *cls;
  63. /**
  64. * @brief Next element in DLL.
  65. */
  66. struct SamplerNotifyUpdateCTX *next;
  67. /**
  68. * @brief Previous element in DLL.
  69. */
  70. struct SamplerNotifyUpdateCTX *prev;
  71. };
  /**
   * Function type used to tell a modified sampler apart from an unmodified
   * one.
   *
   * NOTE(review): presumably the type of the `get_peers` member that
   * RPS_sampler_mod_init() points at sampler_mod_get_rand_peer() -- confirm
   * against the declaration of `struct RPS_Sampler`.
   *
   * @param cls closure
   */
  typedef void (*RPS_get_peers_type) (void *cls);
  /**
   * Forward declaration: get one random peer out of the sampled peers.
   *
   * We might want to reinitialise the sampler element after giving the
   * corresponding peer to the client.
   *
   * @param cls closure
   */
  static void sampler_mod_get_rand_peer (void *cls);
  86. /**
  87. * Closure to _get_n_rand_peers_ready_cb()
  88. */
  89. struct RPS_SamplerRequestHandle
  90. {
  91. /**
  92. * DLL
  93. */
  94. struct RPS_SamplerRequestHandle *next;
  95. struct RPS_SamplerRequestHandle *prev;
  96. /**
  97. * Number of peers we are waiting for.
  98. */
  99. uint32_t num_peers;
  100. /**
  101. * Number of peers we currently have.
  102. */
  103. uint32_t cur_num_peers;
  104. /**
  105. * Pointer to the array holding the ids.
  106. */
  107. struct GNUNET_PeerIdentity *ids;
  108. /**
  109. * Head and tail for the DLL to store the tasks for single requests
  110. */
  111. struct GetPeerCls *gpc_head;
  112. struct GetPeerCls *gpc_tail;
  113. /**
  114. * Sampler.
  115. */
  116. struct RPS_Sampler *sampler;
  117. /**
  118. * Callback to be called when all ids are available.
  119. */
  120. RPS_sampler_n_rand_peers_ready_cb callback;
  121. /**
  122. * Closure given to the callback
  123. */
  124. void *cls;
  125. };
  126. ///**
  127. // * Global sampler variable.
  128. // */
  129. //struct RPS_Sampler *sampler;
  /**
   * Lower bound for the size of the extended sampler elements.
   */
  static size_t min_size;

  /**
   * Upper bound the extended sampler elements should grow to.
   */
  static size_t max_size;

  /* Disabled: the size the extended sampler elements currently have. */
  // static size_t extra_size;

  /**
   * Index into the sampler's element array.
   *
   * sampler_mod_get_rand_peer() moves it one step further (wrapping around
   * at the sampler size) before it looks up the element to return.
   */
  static uint32_t client_get_index;
  146. /**
  147. * Initialise a modified tuple of sampler elements.
  148. *
  149. * @param init_size the size the sampler is initialised with
  150. * @param max_round_interval maximum time a round takes
  151. * @return a handle to a sampler that consists of sampler elements.
  152. */
  153. struct RPS_Sampler *
  154. RPS_sampler_mod_init (size_t init_size,
  155. struct GNUNET_TIME_Relative max_round_interval)
  156. {
  157. struct RPS_Sampler *sampler;
  158. /* Initialise context around extended sampler */
  159. min_size = 10; // TODO make input to _samplers_init()
  160. max_size = 1000; // TODO make input to _samplers_init()
  161. sampler = GNUNET_new (struct RPS_Sampler);
  162. sampler->max_round_interval = max_round_interval;
  163. sampler->get_peers = sampler_mod_get_rand_peer;
  164. //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
  165. //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
  166. client_get_index = 0;
  167. //GNUNET_assert (init_size == sampler->sampler_size);
  168. RPS_sampler_resize (sampler, init_size);
  169. return sampler;
  170. }
  /**
   * @brief Raise @a base to a non-negative integer power.
   *
   * Plain square-and-multiply in floating point; an exponent of 0 yields 1.
   *
   * @param base the value to raise
   * @param exponent the (integral) power
   * @return @a base to the power of @a exponent
   */
  static double
  pow_u32 (double base,
           uint32_t exponent)
  {
    double result = 1.0;

    while (0 != exponent)
    {
      if (0 != (exponent & 1))
        result *= base;
      base *= base;
      exponent >>= 1;
    }
    return result;
  }


  /**
   * @brief Compute the probability that we already observed all peers from a
   * biased stream of peer ids.
   *
   * Deficiency factor:
   * As introduced by Brahms: Factor between the number of unique ids in a
   * truly random stream and number of unique ids in the gossip stream.
   *
   * With n = num_peers_estim * (1 / deficiency_factor) (truncated) and
   * m = num_peers_observed the result is the inclusion-exclusion sum
   *
   *   sum_{k=0}^{n} (-1)^k * C(n,k) * ((n - k) / n)^m
   *
   * which equals the number of surjections from m draws onto n peers divided
   * by n^m.  Everything is evaluated in floating point; each term is derived
   * from its predecessor so that neither the binomial coefficient nor the
   * power has to be representable on its own.
   *
   * Special cases:
   *  - a @a deficiency_factor that is not strictly positive (or NaN) yields 0,
   *  - fewer observations than peers (m < n) yields 0,
   *  - n == 0 yields 1 (there is no peer that could have been missed).
   *
   * For large n with m only slightly above n the alternating sum cancels
   * heavily; the result is clamped to [0, 1] and a non-finite sum is
   * reported as 0.
   *
   * @param num_peers_estim The estimated number of peers in the network
   * @param num_peers_observed The number of peers the given element has observed
   * @param deficiency_factor A factor that catches the 'bias' of a random stream
   *        of peer ids
   *
   * @return The estimated probability, within [0, 1]
   */
  static double
  prob_observed_n_peers (uint32_t num_peers_estim,
                         uint32_t num_peers_observed,
                         double deficiency_factor)
  {
    double scaled_estim;
    uint32_t num_peers;
    double term;
    double sum;

    /* Written this way so that NaN is rejected as well. */
    if (! (deficiency_factor > 0.0))
      return 0.0;
    scaled_estim = num_peers_estim * (1 / deficiency_factor);
    /* m draws cannot cover more than m distinct peers.  This also keeps the
     * conversion below within the range of uint32_t. */
    if (scaled_estim >= (double) num_peers_observed + 1.0)
      return 0.0;
    num_peers = (uint32_t) scaled_estim;
    if (0 == num_peers)
      return 1.0;

    /* k == 0: C(n,0) * (n/n)^m */
    term = 1.0;
    sum = term;
    /* Once a term underflows to zero all following ones are zero, too. */
    for (uint32_t k = 0; (k < num_peers) && (0.0 != term); k++)
    {
      const double remaining = (double) (num_peers - k);

      /* term_{k+1} = -term_k * (n-k)/(k+1) * ((n-k-1)/(n-k))^m */
      term *= -(remaining / (k + 1.0))
              * pow_u32 ((remaining - 1.0) / remaining, num_peers_observed);
      sum += term;
    }

    /* Clamp rounding noise; the first test also catches NaN. */
    if (! (sum > 0.0))
      return 0.0;
    if (sum > 1.0)
      return 1.0;
    return sum;
  }
  202. /**
  203. * Get one random peer out of the sampled peers.
  204. *
  205. * This reinitialises the queried sampler element.
  206. */
  207. static void
  208. sampler_mod_get_rand_peer (void *cls)
  209. {
  210. struct GetPeerCls *gpc = cls;
  211. struct RPS_SamplerElement *s_elem;
  212. struct GNUNET_TIME_Relative last_request_diff;
  213. struct RPS_Sampler *sampler;
  214. double prob_observed_n;
  215. gpc->get_peer_task = NULL;
  216. gpc->notify_ctx = NULL;
  217. sampler = gpc->req_handle->sampler;
  218. LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
  219. /* Cycle the #client_get_index one step further */
  220. client_get_index = (client_get_index + 1) % sampler->sampler_size;
  221. s_elem = sampler->sampler_elements[client_get_index];
  222. *gpc->id = s_elem->peer_id;
  223. GNUNET_assert (NULL != s_elem);
  224. if (EMPTY == s_elem->is_empty)
  225. {
  226. LOG (GNUNET_ERROR_TYPE_DEBUG,
  227. "Sampler_mod element empty, rescheduling.\n");
  228. GNUNET_assert (NULL == gpc->notify_ctx);
  229. gpc->notify_ctx =
  230. sampler_notify_on_update (sampler,
  231. &sampler_mod_get_rand_peer,
  232. gpc);
  233. return;
  234. }
  235. /* Check whether we may use this sampler to give it back to the client */
  236. if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
  237. {
  238. // TODO remove this condition at least for the client sampler
  239. last_request_diff =
  240. GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
  241. GNUNET_TIME_absolute_get ());
  242. /* We're not going to give it back now if it was
  243. * already requested by a client this round */
  244. if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us)
  245. {
  246. LOG (GNUNET_ERROR_TYPE_DEBUG,
  247. "Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
  248. ///* How many time remains untile the next round has started? */
  249. //inv_last_request_diff =
  250. // GNUNET_TIME_absolute_get_difference (last_request_diff,
  251. // sampler->max_round_interval);
  252. // add a little delay
  253. /* Schedule it one round later */
  254. GNUNET_assert (NULL == gpc->notify_ctx);
  255. gpc->notify_ctx =
  256. sampler_notify_on_update (sampler,
  257. &sampler_mod_get_rand_peer,
  258. gpc);
  259. return;
  260. }
  261. }
  262. if (2 > s_elem->num_peers)
  263. {
  264. LOG (GNUNET_ERROR_TYPE_DEBUG,
  265. "This s_elem saw less than two peers -- scheduling for later\n");
  266. GNUNET_assert (NULL == gpc->notify_ctx);
  267. gpc->notify_ctx =
  268. sampler_notify_on_update (sampler,
  269. &sampler_mod_get_rand_peer,
  270. gpc);
  271. return;
  272. }
  273. /* compute probability */
  274. prob_observed_n = prob_observed_n_peers (sampler->num_peers_estim,
  275. s_elem->num_peers,
  276. sampler->deficiency_factor);
  277. /* check if probability is above desired */
  278. if (prob_observed_n >= sampler->desired_probability)
  279. {
  280. LOG (GNUNET_ERROR_TYPE_DEBUG,
  281. "Probability of having observed all peers (%d) too small ( < %d).\n",
  282. prob_observed_n,
  283. sampler->desired_probability);
  284. GNUNET_assert (NULL == gpc->notify_ctx);
  285. gpc->notify_ctx =
  286. sampler_notify_on_update (sampler,
  287. &sampler_mod_get_rand_peer,
  288. gpc);
  289. return;
  290. }
  291. /* More reasons to wait could be added here */
  292. // GNUNET_STATISTICS_set (stats,
  293. // "# client sampler element input",
  294. // s_elem->num_peers,
  295. // GNUNET_NO);
  296. // GNUNET_STATISTICS_set (stats,
  297. // "# client sampler element change",
  298. // s_elem->num_change,
  299. // GNUNET_NO);
  300. RPS_sampler_elem_reinit (s_elem);
  301. s_elem->last_client_request = GNUNET_TIME_absolute_get ();
  302. GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head,
  303. gpc->req_handle->gpc_tail,
  304. gpc);
  305. gpc->cont (gpc->cont_cls, gpc->id);
  306. GNUNET_free (gpc);
  307. }
  /* end of rps-sampler_client.c */