12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914 |
- /*
- This file is part of GNUnet.
- Copyright (C) 2013-2016 GNUnet e.V.
- GNUnet is free software: you can redistribute it and/or modify it
- under the terms of the GNU Affero General Public License as published
- by the Free Software Foundation, either version 3 of the License,
- or (at your option) any later version.
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
- /**
- * @file peerstore/peerstore_api.c
- * @brief API for peerstore
- * @author Omar Tarabai
- * @author Christian Grothoff
- */
#include "platform.h"
#include <string.h>
#include "gnunet_util_lib.h"
#include "peerstore.h"
#include "peerstore_common.h"
/**
 * Log a message of severity @a kind under the "peerstore-api" component.
 */
#define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-api", __VA_ARGS__)
- /******************************************************************************/
- /************************ DATA STRUCTURES ****************************/
- /******************************************************************************/
/**
 * Client-side state for one connection to the PEERSTORE service.
 */
struct GNUNET_PEERSTORE_Handle
{
  /**
   * Configuration used whenever the connection is (re)established.
   */
  const struct GNUNET_CONFIGURATION_Handle *cfg;

  /**
   * Message queue to the service, NULL while there is no connection.
   */
  struct GNUNET_MQ_Handle *mq;

  /**
   * First entry of the list of STORE requests that are still pending.
   */
  struct GNUNET_PEERSTORE_StoreContext *store_head;

  /**
   * Last entry of the list of STORE requests that are still pending.
   */
  struct GNUNET_PEERSTORE_StoreContext *store_tail;

  /**
   * First entry of the list of ITERATE requests; incoming iteration
   * replies are always attributed to this entry.
   */
  struct GNUNET_PEERSTORE_IterateContext *iterate_head;

  /**
   * Last entry of the list of ITERATE requests.
   */
  struct GNUNET_PEERSTORE_IterateContext *iterate_tail;

  /**
   * Maps a key hash to its `struct GNUNET_PEERSTORE_WatchContext *`.
   * Created by the first watch request, NULL before that.
   */
  struct GNUNET_CONTAINER_MultiHashMap *watches;

  /**
   * #GNUNET_YES once a disconnect was requested but has to wait until
   * the pending STORE requests have been sent.
   */
  int disconnecting;
};
- /**
- * Context for a store request
- */
- struct GNUNET_PEERSTORE_StoreContext
- {
- /**
- * Kept in a DLL.
- */
- struct GNUNET_PEERSTORE_StoreContext *next;
- /**
- * Kept in a DLL.
- */
- struct GNUNET_PEERSTORE_StoreContext *prev;
- /**
- * Handle to the PEERSTORE service.
- */
- struct GNUNET_PEERSTORE_Handle *h;
- /**
- * Continuation called with service response
- */
- GNUNET_PEERSTORE_Continuation cont;
- /**
- * Closure for @e cont
- */
- void *cont_cls;
- /**
- * Which subsystem does the store?
- */
- char *sub_system;
- /**
- * Key for the store operation.
- */
- char *key;
- /**
- * Contains @e size bytes.
- */
- void *value;
- /**
- * Peer the store is for.
- */
- struct GNUNET_PeerIdentity peer;
- /**
- * Number of bytes in @e value.
- */
- size_t size;
- /**
- * When does the value expire?
- */
- struct GNUNET_TIME_Absolute expiry;
- /**
- * Options for the store operation.
- */
- enum GNUNET_PEERSTORE_StoreOption options;
- };
- /**
- * Context for a iterate request
- */
- struct GNUNET_PEERSTORE_IterateContext
- {
- /**
- * Kept in a DLL.
- */
- struct GNUNET_PEERSTORE_IterateContext *next;
- /**
- * Kept in a DLL.
- */
- struct GNUNET_PEERSTORE_IterateContext *prev;
- /**
- * Handle to the PEERSTORE service.
- */
- struct GNUNET_PEERSTORE_Handle *h;
- /**
- * Which subsystem does the store?
- */
- char *sub_system;
- /**
- * Peer the store is for.
- */
- struct GNUNET_PeerIdentity peer;
- /**
- * Key for the store operation.
- */
- char *key;
- /**
- * Callback with each matching record
- */
- GNUNET_PEERSTORE_Processor callback;
- /**
- * Closure for @e callback
- */
- void *callback_cls;
- /**
- * #GNUNET_YES if we are currently processing records.
- */
- int iterating;
- };
- /**
- * Context for a watch request
- */
- struct GNUNET_PEERSTORE_WatchContext
- {
- /**
- * Kept in a DLL.
- */
- struct GNUNET_PEERSTORE_WatchContext *next;
- /**
- * Kept in a DLL.
- */
- struct GNUNET_PEERSTORE_WatchContext *prev;
- /**
- * Handle to the PEERSTORE service.
- */
- struct GNUNET_PEERSTORE_Handle *h;
- /**
- * Callback with each record received
- */
- GNUNET_PEERSTORE_Processor callback;
- /**
- * Closure for @e callback
- */
- void *callback_cls;
- /**
- * Hash of the combined key
- */
- struct GNUNET_HashCode keyhash;
- };
- /******************************************************************************/
- /******************* DECLARATIONS *********************/
- /******************************************************************************/
/**
 * Drop the current connection to PEERSTORE (if any), connect again and
 * re-send all requests that are still pending.  Declared here because
 * the error and message handlers below need it before its definition.
 *
 * @param h handle to the service
 */
static void
reconnect (struct GNUNET_PEERSTORE_Handle *h);
- /**
- * Callback after MQ envelope is sent
- *
- * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
- */
- static void
- store_request_sent (void *cls)
- {
- struct GNUNET_PEERSTORE_StoreContext *sc = cls;
- GNUNET_PEERSTORE_Continuation cont;
- void *cont_cls;
- cont = sc->cont;
- cont_cls = sc->cont_cls;
- GNUNET_PEERSTORE_store_cancel (sc);
- if (NULL != cont)
- cont (cont_cls, GNUNET_OK);
- }
- /******************************************************************************/
- /******************* CONNECTION FUNCTIONS *********************/
- /******************************************************************************/
- /**
- * Function called when we had trouble talking to the service.
- */
- static void
- handle_client_error (void *cls,
- enum GNUNET_MQ_Error error)
- {
- struct GNUNET_PEERSTORE_Handle *h = cls;
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "Received an error notification from MQ of type: %d\n",
- error);
- reconnect (h);
- }
- /**
- * Iterator over previous watches to resend them
- *
- * @param cls the `struct GNUNET_PEERSTORE_Handle`
- * @param key key for the watch
- * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
- * @return #GNUNET_YES (continue to iterate)
- */
- static int
- rewatch_it (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
- {
- struct GNUNET_PEERSTORE_Handle *h = cls;
- struct GNUNET_PEERSTORE_WatchContext *wc = value;
- struct StoreKeyHashMessage *hm;
- struct GNUNET_MQ_Envelope *ev;
- ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
- hm->keyhash = wc->keyhash;
- GNUNET_MQ_send (h->mq, ev);
- return GNUNET_YES;
- }
- /**
- * Iterator over watch requests to cancel them.
- *
- * @param cls unsused
- * @param key key to the watch request
- * @param value watch context
- * @return #GNUNET_YES to continue iteration
- */
- static int
- destroy_watch (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
- {
- struct GNUNET_PEERSTORE_WatchContext *wc = value;
- GNUNET_PEERSTORE_watch_cancel (wc);
- return GNUNET_YES;
- }
- /**
- * Kill the connection to the service. This can be delayed in case of pending
- * STORE requests and the user explicitly asked to sync first. Otherwise it is
- * performed instantly.
- *
- * @param h Handle to the service.
- */
- static void
- do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
- {
- if (NULL != h->mq)
- {
- GNUNET_MQ_destroy (h->mq);
- h->mq = NULL;
- }
- GNUNET_free (h);
- }
- /**
- * Connect to the PEERSTORE service.
- *
- * @param cfg configuration to use
- * @return NULL on error
- */
- struct GNUNET_PEERSTORE_Handle *
- GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
- {
- struct GNUNET_PEERSTORE_Handle *h;
- h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
- h->cfg = cfg;
- h->disconnecting = GNUNET_NO;
- reconnect (h);
- if (NULL == h->mq)
- {
- GNUNET_free (h);
- return NULL;
- }
- return h;
- }
- /**
- * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
- * will be canceled.
- * Any pending STORE requests will depend on @e snyc_first flag.
- *
- * @param h handle to disconnect
- * @param sync_first send any pending STORE requests before disconnecting
- */
- void
- GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h,
- int sync_first)
- {
- struct GNUNET_PEERSTORE_IterateContext *ic;
- struct GNUNET_PEERSTORE_StoreContext *sc;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
- if (NULL != h->watches)
- {
- GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
- GNUNET_CONTAINER_multihashmap_destroy (h->watches);
- h->watches = NULL;
- }
- while (NULL != (ic = h->iterate_head))
- {
- GNUNET_break (0);
- GNUNET_PEERSTORE_iterate_cancel (ic);
- }
- if (NULL != h->store_head)
- {
- if (GNUNET_YES == sync_first)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Delaying disconnection due to pending store requests.\n");
- h->disconnecting = GNUNET_YES;
- return;
- }
- while (NULL != (sc = h->store_head))
- GNUNET_PEERSTORE_store_cancel (sc);
- }
- do_disconnect (h);
- }
- /******************************************************************************/
- /******************* STORE FUNCTIONS *********************/
- /******************************************************************************/
- /**
- * Cancel a store request
- *
- * @param sc Store request context
- */
- void
- GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
- {
- struct GNUNET_PEERSTORE_Handle *h = sc->h;
- GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
- GNUNET_free (sc->sub_system);
- GNUNET_free (sc->value);
- GNUNET_free (sc->key);
- GNUNET_free (sc);
- if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
- do_disconnect (h);
- }
- /**
- * Store a new entry in the PEERSTORE.
- * Note that stored entries can be lost in some cases
- * such as power failure.
- *
- * @param h Handle to the PEERSTORE service
- * @param sub_system name of the sub system
- * @param peer Peer Identity
- * @param key entry key
- * @param value entry value BLOB
- * @param size size of @e value
- * @param expiry absolute time after which the entry is (possibly) deleted
- * @param options options specific to the storage operation
- * @param cont Continuation function after the store request is sent
- * @param cont_cls Closure for @a cont
- */
- struct GNUNET_PEERSTORE_StoreContext *
- GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
- const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- const void *value, size_t size,
- struct GNUNET_TIME_Absolute expiry,
- enum GNUNET_PEERSTORE_StoreOption options,
- GNUNET_PEERSTORE_Continuation cont,
- void *cont_cls)
- {
- struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_PEERSTORE_StoreContext *sc;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
- size, sub_system, GNUNET_i2s (peer), key);
- ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
- expiry, options,
- GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
- sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
- sc->sub_system = GNUNET_strdup (sub_system);
- sc->peer = *peer;
- sc->key = GNUNET_strdup (key);
- sc->value = GNUNET_memdup (value, size);
- sc->size = size;
- sc->expiry = expiry;
- sc->options = options;
- sc->cont = cont;
- sc->cont_cls = cont_cls;
- sc->h = h;
- GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
- GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
- GNUNET_MQ_send (h->mq, ev);
- return sc;
- }
- /******************************************************************************/
- /******************* ITERATE FUNCTIONS *********************/
- /******************************************************************************/
- /**
- * When a response for iterate request is received
- *
- * @param cls a `struct GNUNET_PEERSTORE_Handle *`
- * @param msg message received
- */
- static void
- handle_iterate_end (void *cls,
- const struct GNUNET_MessageHeader *msg)
- {
- struct GNUNET_PEERSTORE_Handle *h = cls;
- struct GNUNET_PEERSTORE_IterateContext *ic;
- GNUNET_PEERSTORE_Processor callback;
- void *callback_cls;
- ic = h->iterate_head;
- if (NULL == ic)
- {
- LOG (GNUNET_ERROR_TYPE_ERROR,
- _("Unexpected iteration response, this should not happen.\n"));
- reconnect (h);
- return;
- }
- callback = ic->callback;
- callback_cls = ic->callback_cls;
- ic->iterating = GNUNET_NO;
- GNUNET_PEERSTORE_iterate_cancel (ic);
- if (NULL != callback)
- callback (callback_cls, NULL, NULL);
- }
- /**
- * When a response for iterate request is received, check the
- * message is well-formed.
- *
- * @param cls a `struct GNUNET_PEERSTORE_Handle *`
- * @param msg message received
- */
- static int
- check_iterate_result (void *cls,
- const struct StoreRecordMessage *msg)
- {
- /* we defer validation to #handle_iterate_result */
- return GNUNET_OK;
- }
- /**
- * When a response for iterate request is received
- *
- * @param cls a `struct GNUNET_PEERSTORE_Handle *`
- * @param msg message received
- */
- static void
- handle_iterate_result (void *cls,
- const struct StoreRecordMessage *msg)
- {
- struct GNUNET_PEERSTORE_Handle *h = cls;
- struct GNUNET_PEERSTORE_IterateContext *ic;
- GNUNET_PEERSTORE_Processor callback;
- void *callback_cls;
- struct GNUNET_PEERSTORE_Record *record;
- ic = h->iterate_head;
- if (NULL == ic)
- {
- LOG (GNUNET_ERROR_TYPE_ERROR,
- _("Unexpected iteration response, this should not happen.\n"));
- reconnect (h);
- return;
- }
- ic->iterating = GNUNET_YES;
- callback = ic->callback;
- callback_cls = ic->callback_cls;
- if (NULL == callback)
- return;
- record = PEERSTORE_parse_record_message (msg);
- if (NULL == record)
- {
- callback (callback_cls,
- NULL,
- _("Received a malformed response from service."));
- }
- else
- {
- callback (callback_cls,
- record,
- NULL);
- PEERSTORE_destroy_record (record);
- }
- }
- /**
- * Cancel an iterate request
- * Please do not call after the iterate request is done
- *
- * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
- */
- void
- GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
- {
- if (GNUNET_NO == ic->iterating)
- {
- GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
- ic->h->iterate_tail,
- ic);
- GNUNET_free (ic->sub_system);
- GNUNET_free_non_null (ic->key);
- GNUNET_free (ic);
- }
- else
- ic->callback = NULL;
- }
- /**
- * Iterate over records matching supplied key information
- *
- * @param h handle to the PEERSTORE service
- * @param sub_system name of sub system
- * @param peer Peer identity (can be NULL)
- * @param key entry key string (can be NULL)
- * @param callback function called with each matching record, all NULL's on end
- * @param callback_cls closure for @a callback
- * @return Handle to iteration request
- */
- struct GNUNET_PEERSTORE_IterateContext *
- GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
- const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- GNUNET_PEERSTORE_Processor callback,
- void *callback_cls)
- {
- struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_PEERSTORE_IterateContext *ic;
- ev = PEERSTORE_create_record_mq_envelope (sub_system,
- peer,
- key,
- NULL, 0,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- 0,
- GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
- ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
- ic->callback = callback;
- ic->callback_cls = callback_cls;
- ic->h = h;
- ic->sub_system = GNUNET_strdup (sub_system);
- if (NULL != peer)
- ic->peer = *peer;
- if (NULL != key)
- ic->key = GNUNET_strdup (key);
- GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
- h->iterate_tail,
- ic);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sending an iterate request for sub system `%s'\n",
- sub_system);
- GNUNET_MQ_send (h->mq, ev);
- return ic;
- }
- /******************************************************************************/
- /******************* WATCH FUNCTIONS *********************/
- /******************************************************************************/
- /**
- * When a watch record is received, validate it is well-formed.
- *
- * @param cls a `struct GNUNET_PEERSTORE_Handle *`
- * @param msg message received
- */
- static int
- check_watch_record (void *cls,
- const struct StoreRecordMessage *msg)
- {
- /* we defer validation to #handle_watch_result */
- return GNUNET_OK;
- }
- /**
- * When a watch record is received, process it.
- *
- * @param cls a `struct GNUNET_PEERSTORE_Handle *`
- * @param msg message received
- */
- static void
- handle_watch_record (void *cls,
- const struct StoreRecordMessage *msg)
- {
- struct GNUNET_PEERSTORE_Handle *h = cls;
- struct GNUNET_PEERSTORE_Record *record;
- struct GNUNET_HashCode keyhash;
- struct GNUNET_PEERSTORE_WatchContext *wc;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received a watch record from service.\n");
- record = PEERSTORE_parse_record_message (msg);
- if (NULL == record)
- {
- reconnect (h);
- return;
- }
- PEERSTORE_hash_key (record->sub_system,
- &record->peer,
- record->key,
- &keyhash);
- // FIXME: what if there are multiple watches for the same key?
- wc = GNUNET_CONTAINER_multihashmap_get (h->watches,
- &keyhash);
- if (NULL == wc)
- {
- LOG (GNUNET_ERROR_TYPE_ERROR,
- _("Received a watch result for a non existing watch.\n"));
- PEERSTORE_destroy_record (record);
- reconnect (h);
- return;
- }
- if (NULL != wc->callback)
- wc->callback (wc->callback_cls,
- record,
- NULL);
- PEERSTORE_destroy_record (record);
- }
- /**
- * Close the existing connection to PEERSTORE and reconnect.
- *
- * @param h handle to the service
- */
- static void
- reconnect (struct GNUNET_PEERSTORE_Handle *h)
- {
- struct GNUNET_MQ_MessageHandler mq_handlers[] = {
- GNUNET_MQ_hd_fixed_size (iterate_end,
- GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
- struct GNUNET_MessageHeader,
- h),
- GNUNET_MQ_hd_var_size (iterate_result,
- GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
- struct StoreRecordMessage,
- h),
- GNUNET_MQ_hd_var_size (watch_record,
- GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
- struct StoreRecordMessage,
- h),
- GNUNET_MQ_handler_end ()
- };
- struct GNUNET_PEERSTORE_IterateContext *ic;
- struct GNUNET_PEERSTORE_IterateContext *next;
- GNUNET_PEERSTORE_Processor icb;
- void *icb_cls;
- struct GNUNET_PEERSTORE_StoreContext *sc;
- struct GNUNET_MQ_Envelope *ev;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Reconnecting...\n");
- for (ic = h->iterate_head; NULL != ic; ic = next)
- {
- next = ic->next;
- if (GNUNET_YES == ic->iterating)
- {
- icb = ic->callback;
- icb_cls = ic->callback_cls;
- GNUNET_PEERSTORE_iterate_cancel (ic);
- if (NULL != icb)
- icb (icb_cls,
- NULL,
- "Iteration canceled due to reconnection");
- }
- }
- if (NULL != h->mq)
- {
- GNUNET_MQ_destroy (h->mq);
- h->mq = NULL;
- }
- h->mq = GNUNET_CLIENT_connect (h->cfg,
- "peerstore",
- mq_handlers,
- &handle_client_error,
- h);
- if (NULL == h->mq)
- return;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Resending pending requests after reconnect.\n");
- if (NULL != h->watches)
- GNUNET_CONTAINER_multihashmap_iterate (h->watches,
- &rewatch_it,
- h);
- for (ic = h->iterate_head; NULL != ic; ic = ic->next)
- {
- ev = PEERSTORE_create_record_mq_envelope (ic->sub_system,
- &ic->peer,
- ic->key,
- NULL, 0,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- 0,
- GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
- GNUNET_MQ_send (h->mq, ev);
- }
- for (sc = h->store_head; NULL != sc; sc = sc->next)
- {
- ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
- &sc->peer,
- sc->key,
- sc->value,
- sc->size,
- sc->expiry,
- sc->options,
- GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
- GNUNET_MQ_notify_sent (ev,
- &store_request_sent,
- sc);
- GNUNET_MQ_send (h->mq,
- ev);
- }
- }
- /**
- * Cancel a watch request
- *
- * @param wc handle to the watch request
- */
- void
- GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
- {
- struct GNUNET_PEERSTORE_Handle *h = wc->h;
- struct GNUNET_MQ_Envelope *ev;
- struct StoreKeyHashMessage *hm;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Canceling watch.\n");
- ev = GNUNET_MQ_msg (hm,
- GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
- hm->keyhash = wc->keyhash;
- GNUNET_MQ_send (h->mq, ev);
- GNUNET_CONTAINER_multihashmap_remove (h->watches,
- &wc->keyhash,
- wc);
- GNUNET_free (wc);
- }
- /**
- * Request watching a given key
- * User will be notified with any new values added to key
- *
- * @param h handle to the PEERSTORE service
- * @param sub_system name of sub system
- * @param peer Peer identity
- * @param key entry key string
- * @param callback function called with each new value
- * @param callback_cls closure for @a callback
- * @return Handle to watch request
- */
- struct GNUNET_PEERSTORE_WatchContext *
- GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
- const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- GNUNET_PEERSTORE_Processor callback,
- void *callback_cls)
- {
- struct GNUNET_MQ_Envelope *ev;
- struct StoreKeyHashMessage *hm;
- struct GNUNET_PEERSTORE_WatchContext *wc;
- ev = GNUNET_MQ_msg (hm,
- GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
- PEERSTORE_hash_key (sub_system,
- peer,
- key,
- &hm->keyhash);
- wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
- wc->callback = callback;
- wc->callback_cls = callback_cls;
- wc->h = h;
- wc->keyhash = hm->keyhash;
- if (NULL == h->watches)
- h->watches = GNUNET_CONTAINER_multihashmap_create (5,
- GNUNET_NO);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (h->watches,
- &wc->keyhash,
- wc,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
- sub_system,
- GNUNET_i2s (peer),
- key);
- GNUNET_MQ_send (h->mq,
- ev);
- return wc;
- }
- /* end of peerstore_api.c */
|