|
@@ -29,6 +29,8 @@
|
|
|
#include "gnunet_rps_service.h"
|
|
|
#include "rps-sampler_client.h"
|
|
|
|
|
|
+#include "gnunet_nse_service.h"
|
|
|
+
|
|
|
#include <inttypes.h>
|
|
|
|
|
|
#define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__)
|
|
@@ -109,6 +111,35 @@ struct GNUNET_RPS_Handle
|
|
|
* @brief Tail of the DLL of stream requests
|
|
|
*/
|
|
|
struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @brief Handle to nse service
|
|
|
+ */
|
|
|
+ struct GNUNET_NSE_Handle *nse;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @brief Pointer to the head element in DLL of request handles
|
|
|
+ */
|
|
|
+ struct GNUNET_RPS_Request_Handle *rh_head;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @brief Pointer to the tail element in DLL of request handles
|
|
|
+ */
|
|
|
+ struct GNUNET_RPS_Request_Handle *rh_tail;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @brief The desired probability with which we want to have observed all
|
|
|
+ * peers.
|
|
|
+ */
|
|
|
+ float desired_probability;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @brief A factor that catches the 'bias' of a random stream of peer ids.
|
|
|
+ *
|
|
|
+ * As introduced by Brahms: Factor between the number of unique ids in a
|
|
|
+ * truly random stream and number of unique ids in the gossip stream.
|
|
|
+ */
|
|
|
+ float deficiency_factor;
|
|
|
};
|
|
|
|
|
|
|
|
@@ -152,6 +183,16 @@ struct GNUNET_RPS_Request_Handle
|
|
|
* The closure for the callback.
|
|
|
*/
|
|
|
void *ready_cb_cls;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @brief Pointer to next element in DLL
|
|
|
+ */
|
|
|
+ struct GNUNET_RPS_Request_Handle *next;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @brief Pointer to previous element in DLL
|
|
|
+ */
|
|
|
+ struct GNUNET_RPS_Request_Handle *prev;
|
|
|
};
|
|
|
|
|
|
|
|
@@ -263,10 +304,7 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
|
|
|
rh->ready_cb (rh->ready_cb_cls,
|
|
|
num_peers,
|
|
|
peers);
|
|
|
- GNUNET_RPS_stream_cancel (rh->srh);
|
|
|
- rh->srh = NULL;
|
|
|
- RPS_sampler_destroy (rh->sampler);
|
|
|
- rh->sampler = NULL;
|
|
|
+ GNUNET_RPS_request_cancel (rh);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -606,6 +644,37 @@ hash_from_share_val (const char *share_val,
|
|
|
}
|
|
|
|
|
|
|
|
|
+/**
|
|
|
+ * @brief Callback for network size estimate - called with new estimates about
|
|
|
+ * the network size, updates all samplers with the new estimate
|
|
|
+ *
|
|
|
+ * Implements #GNUNET_NSE_Callback
|
|
|
+ *
|
|
|
+ * @param cls the rps handle
|
|
|
+ * @param timestamp unused
|
|
|
+ * @param logestimate the estimate
|
|
|
+ * @param std_dev the standard distribution
|
|
|
+ */
|
|
|
+static void
|
|
|
+nse_cb (void *cls,
|
|
|
+ struct GNUNET_TIME_Absolute timestamp,
|
|
|
+ double logestimate,
|
|
|
+ double std_dev)
|
|
|
+{
|
|
|
+ struct GNUNET_RPS_Handle *h = cls;
|
|
|
+ (void) timestamp;
|
|
|
+ (void) std_dev;
|
|
|
+
|
|
|
+ for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
|
|
|
+ NULL != rh_iter && NULL != rh_iter->next;
|
|
|
+ rh_iter = rh_iter->next)
|
|
|
+ {
|
|
|
+ RPS_sampler_update_with_nw_size (rh_iter->sampler,
|
|
|
+ GNUNET_NSE_log_estimate_to_n (logestimate));
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Reconnect to the service
|
|
|
*/
|
|
@@ -631,6 +700,9 @@ reconnect (struct GNUNET_RPS_Handle *h)
|
|
|
mq_handlers,
|
|
|
&mq_error_handler,
|
|
|
h);
|
|
|
+ if (NULL != h->nse)
|
|
|
+ GNUNET_NSE_disconnect (h->nse);
|
|
|
+ h->nse = GNUNET_NSE_connect (h->cfg, &nse_cb, h);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -638,7 +710,7 @@ reconnect (struct GNUNET_RPS_Handle *h)
|
|
|
* Connect to the rps service
|
|
|
*
|
|
|
* @param cfg configuration to use
|
|
|
- * @return a handle to the service
|
|
|
+ * @return a handle to the service, NULL on error
|
|
|
*/
|
|
|
struct GNUNET_RPS_Handle *
|
|
|
GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
|
|
@@ -647,6 +719,44 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
|
|
|
|
|
|
h = GNUNET_new (struct GNUNET_RPS_Handle);
|
|
|
h->cfg = cfg;
|
|
|
+ if (GNUNET_OK !=
|
|
|
+ GNUNET_CONFIGURATION_get_value_float (cfg,
|
|
|
+ "RPS",
|
|
|
+ "DESIRED_PROBABILITY",
|
|
|
+ &h->desired_probability))
|
|
|
+ {
|
|
|
+ GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
|
|
|
+ "RPS", "DESIRED_PROBABILITY");
|
|
|
+ GNUNET_free (h);
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ if (0 > h->desired_probability ||
|
|
|
+ 1 < h->desired_probability)
|
|
|
+ {
|
|
|
+ LOG (GNUNET_ERROR_TYPE_ERROR,
|
|
|
+ "The desired probability must be in the interval [0;1]\n");
|
|
|
+ GNUNET_free (h);
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ if (GNUNET_OK !=
|
|
|
+ GNUNET_CONFIGURATION_get_value_float (cfg,
|
|
|
+ "RPS",
|
|
|
+ "DEFICIENCY_FACTOR",
|
|
|
+ &h->deficiency_factor))
|
|
|
+ {
|
|
|
+ GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
|
|
|
+ "RPS", "DEFICIENCY_FACTOR");
|
|
|
+ GNUNET_free (h);
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ if (0 > h->desired_probability ||
|
|
|
+ 1 < h->desired_probability)
|
|
|
+ {
|
|
|
+ LOG (GNUNET_ERROR_TYPE_ERROR,
|
|
|
+ "The deficiency factor must be in the interval [0;1]\n");
|
|
|
+ GNUNET_free (h);
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
reconnect (h);
|
|
|
if (NULL == h->mq)
|
|
|
{
|
|
@@ -725,6 +835,10 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
|
|
|
rh->num_requests = num_req_peers;
|
|
|
rh->sampler = RPS_sampler_mod_init (num_req_peers,
|
|
|
GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
|
|
|
+ RPS_sampler_set_desired_probability (rh->sampler,
|
|
|
+ rps_handle->desired_probability);
|
|
|
+ RPS_sampler_set_deficiency_factor (rh->sampler,
|
|
|
+ rps_handle->deficiency_factor);
|
|
|
rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
|
|
|
num_req_peers,
|
|
|
peers_ready_cb,
|
|
@@ -734,6 +848,9 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
|
|
|
rh); /* cls */
|
|
|
rh->ready_cb = ready_cb;
|
|
|
rh->ready_cb_cls = cls;
|
|
|
+ GNUNET_CONTAINER_DLL_insert (rps_handle->rh_head,
|
|
|
+ rps_handle->rh_tail,
|
|
|
+ rh);
|
|
|
|
|
|
return rh;
|
|
|
}
|
|
@@ -911,6 +1028,7 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
|
|
|
|
|
|
h = rh->rps_handle;
|
|
|
GNUNET_assert (NULL != rh);
|
|
|
+ GNUNET_assert (NULL != rh->srh);
|
|
|
GNUNET_assert (h == rh->srh->rps_handle);
|
|
|
GNUNET_RPS_stream_cancel (rh->srh);
|
|
|
rh->srh = NULL;
|
|
@@ -920,6 +1038,10 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
|
|
|
RPS_sampler_request_cancel (rh->sampler_rh);
|
|
|
}
|
|
|
RPS_sampler_destroy (rh->sampler);
|
|
|
+ rh->sampler = NULL;
|
|
|
+ GNUNET_CONTAINER_DLL_remove (h->rh_head,
|
|
|
+ h->rh_tail,
|
|
|
+ rh);
|
|
|
GNUNET_free (rh);
|
|
|
}
|
|
|
|
|
@@ -939,13 +1061,24 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
|
|
|
LOG (GNUNET_ERROR_TYPE_WARNING,
|
|
|
"Still waiting for replies\n");
|
|
|
for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head;
|
|
|
- NULL != srh_iter;
|
|
|
- srh_iter = srh_next)
|
|
|
+ NULL != srh_iter;
|
|
|
+ srh_iter = srh_next)
|
|
|
{
|
|
|
srh_next = srh_iter->next;
|
|
|
GNUNET_RPS_stream_cancel (srh_iter);
|
|
|
}
|
|
|
}
|
|
|
+ if (NULL != h->rh_head)
|
|
|
+ {
|
|
|
+ LOG (GNUNET_ERROR_TYPE_WARNING,
|
|
|
+ "Not all requests were cancelled!\n");
|
|
|
+ for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
|
|
|
+ h->rh_head != NULL;
|
|
|
+ rh_iter = h->rh_head)
|
|
|
+ {
|
|
|
+ GNUNET_RPS_request_cancel (rh_iter);
|
|
|
+ }
|
|
|
+ }
|
|
|
if (NULL != srh_callback_peers)
|
|
|
{
|
|
|
GNUNET_free (srh_callback_peers);
|
|
@@ -957,6 +1090,8 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
|
|
|
"Still waiting for view updates\n");
|
|
|
GNUNET_RPS_view_request_cancel (h);
|
|
|
}
|
|
|
+ if (NULL != h->nse)
|
|
|
+ GNUNET_NSE_disconnect (h->nse);
|
|
|
GNUNET_MQ_destroy (h->mq);
|
|
|
GNUNET_free (h);
|
|
|
}
|