123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076 |
- /*
- This file is part of GNUnet
- Copyright (C) 2013-2017 GNUnet e.V.
- GNUnet is free software: you can redistribute it and/or modify it
- under the terms of the GNU Affero General Public License as published
- by the Free Software Foundation, either version 3 of the License,
- or (at your option) any later version.
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- SPDX-License-Identifier: AGPL3.0-or-later
- */
- /**
- * @file set/gnunet-service-set.c
- * @brief two-peer set operations
- * @author Florian Dold
- * @author Christian Grothoff
- */
- #include "gnunet-service-set.h"
- #include "gnunet-service-set_union.h"
- #include "gnunet-service-set_intersection.h"
- #include "gnunet-service-set_protocol.h"
- #include "gnunet_statistics_service.h"
- /**
- * How long do we hold on to an incoming channel if there is
- * no local listener before giving up?
- */
- #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
- /**
- * Lazy copy requests made by a client.
- */
- struct LazyCopyRequest
- {
- /**
- * Kept in a DLL.
- */
- struct LazyCopyRequest *prev;
- /**
- * Kept in a DLL.
- */
- struct LazyCopyRequest *next;
- /**
- * Which set are we supposed to copy?
- */
- struct Set *source_set;
- /**
- * Cookie identifying the request.
- */
- uint32_t cookie;
- };
- /**
- * A listener is inhabited by a client, and waits for evaluation
- * requests from remote peers.
- */
- struct Listener
- {
- /**
- * Listeners are held in a doubly linked list.
- */
- struct Listener *next;
- /**
- * Listeners are held in a doubly linked list.
- */
- struct Listener *prev;
- /**
- * Head of DLL of operations this listener is responsible for.
- * Once the client has accepted/declined the operation, the
- * operation is moved to the respective set's operation DLLS.
- */
- struct Operation *op_head;
- /**
- * Tail of DLL of operations this listener is responsible for.
- * Once the client has accepted/declined the operation, the
- * operation is moved to the respective set's operation DLLS.
- */
- struct Operation *op_tail;
- /**
- * Client that owns the listener.
- * Only one client may own a listener.
- */
- struct ClientState *cs;
- /**
- * The port we are listening on with CADET.
- */
- struct GNUNET_CADET_Port *open_port;
- /**
- * Application ID for the operation, used to distinguish
- * multiple operations of the same type with the same peer.
- */
- struct GNUNET_HashCode app_id;
- /**
- * The type of the operation.
- */
- enum GNUNET_SET_OperationType operation;
- };
- /**
- * Handle to the cadet service, used to listen for and connect to
- * remote peers.
- */
- static struct GNUNET_CADET_Handle *cadet;
- /**
- * DLL of lazy copy requests by this client.
- */
- static struct LazyCopyRequest *lazy_copy_head;
- /**
- * DLL of lazy copy requests by this client.
- */
- static struct LazyCopyRequest *lazy_copy_tail;
- /**
- * Generator for unique cookie we set per lazy copy request.
- */
- static uint32_t lazy_copy_cookie;
- /**
- * Statistics handle.
- */
- struct GNUNET_STATISTICS_Handle *_GSS_statistics;
- /**
- * Listeners are held in a doubly linked list.
- */
- static struct Listener *listener_head;
- /**
- * Listeners are held in a doubly linked list.
- */
- static struct Listener *listener_tail;
- /**
- * Number of active clients.
- */
- static unsigned int num_clients;
- /**
- * Are we in shutdown? if #GNUNET_YES and the number of clients
- * drops to zero, disconnect from CADET.
- */
- static int in_shutdown;
- /**
- * Counter for allocating unique IDs for clients, used to identify
- * incoming operation requests from remote peers, that the client can
- * choose to accept or refuse. 0 must not be used (reserved for
- * uninitialized).
- */
- static uint32_t suggest_id;
- /**
- * Get the incoming socket associated with the given id.
- *
- * @param listener the listener to look in
- * @param id id to look for
- * @return the incoming socket associated with the id,
- * or NULL if there is none
- */
- static struct Operation *
- get_incoming (uint32_t id)
- {
- for (struct Listener *listener = listener_head;
- NULL != listener;
- listener = listener->next)
- {
- for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
- if (op->suggest_id == id)
- return op;
- }
- return NULL;
- }
- /**
- * Destroy an incoming request from a remote peer
- *
- * @param op remote request to destroy
- */
- static void
- incoming_destroy (struct Operation *op)
- {
- struct Listener *listener;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Destroying incoming operation %p\n",
- op);
- if (NULL != (listener = op->listener))
- {
- GNUNET_CONTAINER_DLL_remove (listener->op_head,
- listener->op_tail,
- op);
- op->listener = NULL;
- }
- if (NULL != op->timeout_task)
- {
- GNUNET_SCHEDULER_cancel (op->timeout_task);
- op->timeout_task = NULL;
- }
- _GSS_operation_destroy2 (op);
- }
- /**
- * Context for the #garbage_collect_cb().
- */
- struct GarbageContext
- {
- /**
- * Map for which we are garbage collecting removed elements.
- */
- struct GNUNET_CONTAINER_MultiHashMap *map;
- /**
- * Lowest generation for which an operation is still pending.
- */
- unsigned int min_op_generation;
- /**
- * Largest generation for which an operation is still pending.
- */
- unsigned int max_op_generation;
- };
- /**
- * Function invoked to check if an element can be removed from
- * the set's history because it is no longer needed.
- *
- * @param cls the `struct GarbageContext *`
- * @param key key of the element in the map
- * @param value the `struct ElementEntry *`
- * @return #GNUNET_OK (continue to iterate)
- */
- static int
- garbage_collect_cb (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
- {
- //struct GarbageContext *gc = cls;
- //struct ElementEntry *ee = value;
- //if (GNUNET_YES != ee->removed)
- // return GNUNET_OK;
- //if ( (gc->max_op_generation < ee->generation_added) ||
- // (ee->generation_removed > gc->min_op_generation) )
- //{
- // GNUNET_assert (GNUNET_YES ==
- // GNUNET_CONTAINER_multihashmap_remove (gc->map,
- // key,
- // ee));
- // GNUNET_free (ee);
- //}
- return GNUNET_OK;
- }
- /**
- * Collect and destroy elements that are not needed anymore, because
- * their lifetime (as determined by their generation) does not overlap
- * with any active set operation.
- *
- * @param set set to garbage collect
- */
- static void
- collect_generation_garbage (struct Set *set)
- {
- struct GarbageContext gc;
- gc.min_op_generation = UINT_MAX;
- gc.max_op_generation = 0;
- for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
- {
- gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
- op->generation_created);
- gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
- op->generation_created);
- }
- gc.map = set->content->elements;
- GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
- &garbage_collect_cb,
- &gc);
- }
- /**
- * Is @a generation in the range of exclusions?
- *
- * @param generation generation to query
- * @param excluded array of generations where the element is excluded
- * @param excluded_size length of the @a excluded array
- * @return #GNUNET_YES if @a generation is in any of the ranges
- */
- static int
- is_excluded_generation (unsigned int generation,
- struct GenerationRange *excluded,
- unsigned int excluded_size)
- {
- for (unsigned int i = 0; i < excluded_size; i++)
- if ( (generation >= excluded[i].start) &&
- (generation < excluded[i].end) )
- return GNUNET_YES;
- return GNUNET_NO;
- }
- /**
- * Is element @a ee part of the set during @a query_generation?
- *
- * @param ee element to test
- * @param query_generation generation to query
- * @param excluded array of generations where the element is excluded
- * @param excluded_size length of the @a excluded array
- * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
- */
- static int
- is_element_of_generation (struct ElementEntry *ee,
- unsigned int query_generation,
- struct GenerationRange *excluded,
- unsigned int excluded_size)
- {
- struct MutationEvent *mut;
- int is_present;
- GNUNET_assert (NULL != ee->mutations);
- if (GNUNET_YES ==
- is_excluded_generation (query_generation,
- excluded,
- excluded_size))
- {
- GNUNET_break (0);
- return GNUNET_NO;
- }
- is_present = GNUNET_NO;
- /* Could be made faster with binary search, but lists
- are small, so why bother. */
- for (unsigned int i = 0; i < ee->mutations_size; i++)
- {
- mut = &ee->mutations[i];
- if (mut->generation > query_generation)
- {
- /* The mutation doesn't apply to our generation
- anymore. We can'b break here, since mutations aren't
- sorted by generation. */
- continue;
- }
- if (GNUNET_YES ==
- is_excluded_generation (mut->generation,
- excluded,
- excluded_size))
- {
- /* The generation is excluded (because it belongs to another
- fork via a lazy copy) and thus mutations aren't considered
- for membership testing. */
- continue;
- }
- /* This would be an inconsistency in how we manage mutations. */
- if ( (GNUNET_YES == is_present) &&
- (GNUNET_YES == mut->added) )
- GNUNET_assert (0);
- /* Likewise. */
- if ( (GNUNET_NO == is_present) &&
- (GNUNET_NO == mut->added) )
- GNUNET_assert (0);
- is_present = mut->added;
- }
- return is_present;
- }
- /**
- * Is element @a ee part of the set used by @a op?
- *
- * @param ee element to test
- * @param op operation the defines the set and its generation
- * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
- */
- int
- _GSS_is_element_of_operation (struct ElementEntry *ee,
- struct Operation *op)
- {
- return is_element_of_generation (ee,
- op->generation_created,
- op->set->excluded_generations,
- op->set->excluded_generations_size);
- }
- /**
- * Destroy the given operation. Used for any operation where both
- * peers were known and that thus actually had a vt and channel. Must
- * not be used for operations where 'listener' is still set and we do
- * not know the other peer.
- *
- * Call the implementation-specific cancel function of the operation.
- * Disconnects from the remote peer. Does not disconnect the client,
- * as there may be multiple operations per set.
- *
- * @param op operation to destroy
- * @param gc #GNUNET_YES to perform garbage collection on the set
- */
- void
- _GSS_operation_destroy (struct Operation *op,
- int gc)
- {
- struct Set *set = op->set;
- struct GNUNET_CADET_Channel *channel;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Destroying operation %p\n",
- op);
- GNUNET_assert (NULL == op->listener);
- if (NULL != op->state)
- {
- set->vt->cancel (op);
- op->state = NULL;
- }
- if (NULL != set)
- {
- GNUNET_CONTAINER_DLL_remove (set->ops_head,
- set->ops_tail,
- op);
- op->set = NULL;
- }
- if (NULL != op->context_msg)
- {
- GNUNET_free (op->context_msg);
- op->context_msg = NULL;
- }
- if (NULL != (channel = op->channel))
- {
- /* This will free op; called conditionally as this helper function
- is also called from within the channel disconnect handler. */
- op->channel = NULL;
- GNUNET_CADET_channel_destroy (channel);
- }
- if ( (NULL != set) &&
- (GNUNET_YES == gc) )
- collect_generation_garbage (set);
- /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
- * there was a channel end handler that will free 'op' on the call stack. */
- }
- /**
- * Callback called when a client connects to the service.
- *
- * @param cls closure for the service
- * @param c the new client that connected to the service
- * @param mq the message queue used to send messages to the client
- * @return @a `struct ClientState`
- */
- static void *
- client_connect_cb (void *cls,
- struct GNUNET_SERVICE_Client *c,
- struct GNUNET_MQ_Handle *mq)
- {
- struct ClientState *cs;
- num_clients++;
- cs = GNUNET_new (struct ClientState);
- cs->client = c;
- cs->mq = mq;
- return cs;
- }
- /**
- * Iterator over hash map entries to free element entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value a `struct ElementEntry *` to be free'd
- * @return #GNUNET_YES (continue to iterate)
- */
- static int
- destroy_elements_iterator (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
- {
- struct ElementEntry *ee = value;
- GNUNET_free_non_null (ee->mutations);
- GNUNET_free (ee);
- return GNUNET_YES;
- }
- /**
- * Clean up after a client has disconnected
- *
- * @param cls closure, unused
- * @param client the client to clean up after
- * @param internal_cls the `struct ClientState`
- */
- static void
- client_disconnect_cb (void *cls,
- struct GNUNET_SERVICE_Client *client,
- void *internal_cls)
- {
- struct ClientState *cs = internal_cls;
- struct Operation *op;
- struct Listener *listener;
- struct Set *set;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client disconnected, cleaning up\n");
- if (NULL != (set = cs->set))
- {
- struct SetContent *content = set->content;
- struct PendingMutation *pm;
- struct PendingMutation *pm_current;
- struct LazyCopyRequest *lcr;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Destroying client's set\n");
- /* Destroy pending set operations */
- while (NULL != set->ops_head)
- _GSS_operation_destroy (set->ops_head,
- GNUNET_NO);
- /* Destroy operation-specific state */
- GNUNET_assert (NULL != set->state);
- set->vt->destroy_set (set->state);
- set->state = NULL;
- /* Clean up ongoing iterations */
- if (NULL != set->iter)
- {
- GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
- set->iter = NULL;
- set->iteration_id++;
- }
- /* discard any pending mutations that reference this set */
- pm = content->pending_mutations_head;
- while (NULL != pm)
- {
- pm_current = pm;
- pm = pm->next;
- if (pm_current->set == set)
- {
- GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
- content->pending_mutations_tail,
- pm_current);
- GNUNET_free (pm_current);
- }
- }
- /* free set content (or at least decrement RC) */
- set->content = NULL;
- GNUNET_assert (0 != content->refcount);
- content->refcount--;
- if (0 == content->refcount)
- {
- GNUNET_assert (NULL != content->elements);
- GNUNET_CONTAINER_multihashmap_iterate (content->elements,
- &destroy_elements_iterator,
- NULL);
- GNUNET_CONTAINER_multihashmap_destroy (content->elements);
- content->elements = NULL;
- GNUNET_free (content);
- }
- GNUNET_free_non_null (set->excluded_generations);
- set->excluded_generations = NULL;
- /* remove set from pending copy requests */
- lcr = lazy_copy_head;
- while (NULL != lcr)
- {
- struct LazyCopyRequest *lcr_current = lcr;
- lcr = lcr->next;
- if (lcr_current->source_set == set)
- {
- GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
- lazy_copy_tail,
- lcr_current);
- GNUNET_free (lcr_current);
- }
- }
- GNUNET_free (set);
- }
- if (NULL != (listener = cs->listener))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Destroying client's listener\n");
- GNUNET_CADET_close_port (listener->open_port);
- listener->open_port = NULL;
- while (NULL != (op = listener->op_head))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Destroying incoming operation `%u' from peer `%s'\n",
- (unsigned int) op->client_request_id,
- GNUNET_i2s (&op->peer));
- incoming_destroy (op);
- }
- GNUNET_CONTAINER_DLL_remove (listener_head,
- listener_tail,
- listener);
- GNUNET_free (listener);
- }
- GNUNET_free (cs);
- num_clients--;
- if ( (GNUNET_YES == in_shutdown) &&
- (0 == num_clients) )
- {
- if (NULL != cadet)
- {
- GNUNET_CADET_disconnect (cadet);
- cadet = NULL;
- }
- }
- }
- /**
- * Check a request for a set operation from another peer.
- *
- * @param cls the operation state
- * @param msg the received message
- * @return #GNUNET_OK if the channel should be kept alive,
- * #GNUNET_SYSERR to destroy the channel
- */
- static int
- check_incoming_msg (void *cls,
- const struct OperationRequestMessage *msg)
- {
- struct Operation *op = cls;
- struct Listener *listener = op->listener;
- const struct GNUNET_MessageHeader *nested_context;
- /* double operation request */
- if (0 != op->suggest_id)
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- /* This should be equivalent to the previous condition, but can't hurt to check twice */
- if (NULL == op->listener)
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- nested_context = GNUNET_MQ_extract_nested_mh (msg);
- if ( (NULL != nested_context) &&
- (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- return GNUNET_OK;
- }
- /**
- * Handle a request for a set operation from another peer. Checks if we
- * have a listener waiting for such a request (and in that case initiates
- * asking the listener about accepting the connection). If no listener
- * is waiting, we queue the operation request in hope that a listener
- * shows up soon (before timeout).
- *
- * This msg is expected as the first and only msg handled through the
- * non-operation bound virtual table, acceptance of this operation replaces
- * our virtual table and subsequent msgs would be routed differently (as
- * we then know what type of operation this is).
- *
- * @param cls the operation state
- * @param msg the received message
- * @return #GNUNET_OK if the channel should be kept alive,
- * #GNUNET_SYSERR to destroy the channel
- */
- static void
- handle_incoming_msg (void *cls,
- const struct OperationRequestMessage *msg)
- {
- struct Operation *op = cls;
- struct Listener *listener = op->listener;
- const struct GNUNET_MessageHeader *nested_context;
- struct GNUNET_MQ_Envelope *env;
- struct GNUNET_SET_RequestMessage *cmsg;
- nested_context = GNUNET_MQ_extract_nested_mh (msg);
- /* Make a copy of the nested_context (application-specific context
- information that is opaque to set) so we can pass it to the
- listener later on */
- if (NULL != nested_context)
- op->context_msg = GNUNET_copy_message (nested_context);
- op->remote_element_count = ntohl (msg->element_count);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received P2P operation request (op %u, port %s) for active listener\n",
- (uint32_t) ntohl (msg->operation),
- GNUNET_h2s (&op->listener->app_id));
- GNUNET_assert (0 == op->suggest_id);
- if (0 == suggest_id)
- suggest_id++;
- op->suggest_id = suggest_id++;
- GNUNET_assert (NULL != op->timeout_task);
- GNUNET_SCHEDULER_cancel (op->timeout_task);
- op->timeout_task = NULL;
- env = GNUNET_MQ_msg_nested_mh (cmsg,
- GNUNET_MESSAGE_TYPE_SET_REQUEST,
- op->context_msg);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Suggesting incoming request with accept id %u to listener %p of client %p\n",
- op->suggest_id,
- listener,
- listener->cs);
- cmsg->accept_id = htonl (op->suggest_id);
- cmsg->peer_id = op->peer;
- GNUNET_MQ_send (listener->cs->mq,
- env);
- /* NOTE: GNUNET_CADET_receive_done() will be called in
- #handle_client_accept() */
- }
- /**
- * Add an element to @a set as specified by @a msg
- *
- * @param set set to manipulate
- * @param msg message specifying the change
- */
- static void
- execute_add (struct Set *set,
- const struct GNUNET_SET_ElementMessage *msg)
- {
- struct GNUNET_SET_Element el;
- struct ElementEntry *ee;
- struct GNUNET_HashCode hash;
- GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
- el.size = ntohs (msg->header.size) - sizeof (*msg);
- el.data = &msg[1];
- el.element_type = ntohs (msg->element_type);
- GNUNET_SET_element_hash (&el,
- &hash);
- ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
- &hash);
- if (NULL == ee)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client inserts element %s of size %u\n",
- GNUNET_h2s (&hash),
- el.size);
- ee = GNUNET_malloc (el.size + sizeof (*ee));
- ee->element.size = el.size;
- GNUNET_memcpy (&ee[1],
- el.data,
- el.size);
- ee->element.data = &ee[1];
- ee->element.element_type = el.element_type;
- ee->remote = GNUNET_NO;
- ee->mutations = NULL;
- ee->mutations_size = 0;
- ee->element_hash = hash;
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_put (set->content->elements,
- &ee->element_hash,
- ee,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- }
- else if (GNUNET_YES ==
- is_element_of_generation (ee,
- set->current_generation,
- set->excluded_generations,
- set->excluded_generations_size))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client inserted element %s of size %u twice (ignored)\n",
- GNUNET_h2s (&hash),
- el.size);
- /* same element inserted twice */
- return;
- }
- {
- struct MutationEvent mut = {
- .generation = set->current_generation,
- .added = GNUNET_YES
- };
- GNUNET_array_append (ee->mutations,
- ee->mutations_size,
- mut);
- }
- set->vt->add (set->state,
- ee);
- }
- /**
- * Remove an element from @a set as specified by @a msg
- *
- * @param set set to manipulate
- * @param msg message specifying the change
- */
- static void
- execute_remove (struct Set *set,
- const struct GNUNET_SET_ElementMessage *msg)
- {
- struct GNUNET_SET_Element el;
- struct ElementEntry *ee;
- struct GNUNET_HashCode hash;
- GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
- el.size = ntohs (msg->header.size) - sizeof (*msg);
- el.data = &msg[1];
- el.element_type = ntohs (msg->element_type);
- GNUNET_SET_element_hash (&el, &hash);
- ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
- &hash);
- if (NULL == ee)
- {
- /* Client tried to remove non-existing element. */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client removes non-existing element of size %u\n",
- el.size);
- return;
- }
- if (GNUNET_NO ==
- is_element_of_generation (ee,
- set->current_generation,
- set->excluded_generations,
- set->excluded_generations_size))
- {
- /* Client tried to remove element twice */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client removed element of size %u twice (ignored)\n",
- el.size);
- return;
- }
- else
- {
- struct MutationEvent mut = {
- .generation = set->current_generation,
- .added = GNUNET_NO
- };
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client removes element of size %u\n",
- el.size);
- GNUNET_array_append (ee->mutations,
- ee->mutations_size,
- mut);
- }
- set->vt->remove (set->state,
- ee);
- }
- /**
- * Perform a mutation on a set as specified by the @a msg
- *
- * @param set the set to mutate
- * @param msg specification of what to change
- */
- static void
- execute_mutation (struct Set *set,
- const struct GNUNET_SET_ElementMessage *msg)
- {
- switch (ntohs (msg->header.type))
- {
- case GNUNET_MESSAGE_TYPE_SET_ADD:
- execute_add (set, msg);
- break;
- case GNUNET_MESSAGE_TYPE_SET_REMOVE:
- execute_remove (set, msg);
- break;
- default:
- GNUNET_break (0);
- }
- }
- /**
- * Execute mutations that were delayed on a set because of
- * pending operations.
- *
- * @param set the set to execute mutations on
- */
- static void
- execute_delayed_mutations (struct Set *set)
- {
- struct PendingMutation *pm;
- if (0 != set->content->iterator_count)
- return; /* still cannot do this */
- while (NULL != (pm = set->content->pending_mutations_head))
- {
- GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
- set->content->pending_mutations_tail,
- pm);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Executing pending mutation on %p.\n",
- pm->set);
- execute_mutation (pm->set,
- pm->msg);
- GNUNET_free (pm->msg);
- GNUNET_free (pm);
- }
- }
- /**
- * Send the next element of a set to the set's client. The next element is given by
- * the set's current hashmap iterator. The set's iterator will be set to NULL if there
- * are no more elements in the set. The caller must ensure that the set's iterator is
- * valid.
- *
- * The client will acknowledge each received element with a
- * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
- * #handle_client_iter_ack() will then trigger the next transmission.
- * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
- *
- * @param set set that should send its next element to its client
- */
- static void
- send_client_element (struct Set *set)
- {
- int ret;
- struct ElementEntry *ee;
- struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_SET_IterResponseMessage *msg;
- GNUNET_assert (NULL != set->iter);
- do {
- ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
- NULL,
- (const void **) &ee);
- if (GNUNET_NO == ret)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Iteration on %p done.\n",
- set);
- ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
- GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
- set->iter = NULL;
- set->iteration_id++;
- GNUNET_assert (set->content->iterator_count > 0);
- set->content->iterator_count--;
- execute_delayed_mutations (set);
- GNUNET_MQ_send (set->cs->mq,
- ev);
- return;
- }
- GNUNET_assert (NULL != ee);
- } while (GNUNET_NO ==
- is_element_of_generation (ee,
- set->iter_generation,
- set->excluded_generations,
- set->excluded_generations_size));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending iteration element on %p.\n",
- set);
- ev = GNUNET_MQ_msg_extra (msg,
- ee->element.size,
- GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
- GNUNET_memcpy (&msg[1],
- ee->element.data,
- ee->element.size);
- msg->element_type = htons (ee->element.element_type);
- msg->iteration_id = htons (set->iteration_id);
- GNUNET_MQ_send (set->cs->mq,
- ev);
- }
- /**
- * Called when a client wants to iterate the elements of a set.
- * Checks if we have a set associated with the client and if we
- * can right now start an iteration. If all checks out, starts
- * sending the elements of the set to the client.
- *
- * @param cls client that sent the message
- * @param m message sent by the client
- */
- static void
- handle_client_iterate (void *cls,
- const struct GNUNET_MessageHeader *m)
- {
- struct ClientState *cs = cls;
- struct Set *set;
- if (NULL == (set = cs->set))
- {
- /* attempt to iterate over a non existing set */
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- if (NULL != set->iter)
- {
- /* Only one concurrent iterate-action allowed per set */
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Iterating set %p in gen %u with %u content elements\n",
- (void *) set,
- set->current_generation,
- GNUNET_CONTAINER_multihashmap_size (set->content->elements));
- GNUNET_SERVICE_client_continue (cs->client);
- set->content->iterator_count++;
- set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
- set->iter_generation = set->current_generation;
- send_client_element (set);
- }
- /**
- * Called when a client wants to create a new set. This is typically
- * the first request from a client, and includes the type of set
- * operation to be performed.
- *
- * @param cls client that sent the message
- * @param m message sent by the client
- */
- static void
- handle_client_create_set (void *cls,
- const struct GNUNET_SET_CreateMessage *msg)
- {
- struct ClientState *cs = cls;
- struct Set *set;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client created new set (operation %u)\n",
- (uint32_t) ntohl (msg->operation));
- if (NULL != cs->set)
- {
- /* There can only be one set per client */
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- set = GNUNET_new (struct Set);
- switch (ntohl (msg->operation))
- {
- case GNUNET_SET_OPERATION_INTERSECTION:
- set->vt = _GSS_intersection_vt ();
- break;
- case GNUNET_SET_OPERATION_UNION:
- set->vt = _GSS_union_vt ();
- break;
- default:
- GNUNET_free (set);
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
- set->state = set->vt->create ();
- if (NULL == set->state)
- {
- /* initialization failed (i.e. out of memory) */
- GNUNET_free (set);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- set->content = GNUNET_new (struct SetContent);
- set->content->refcount = 1;
- set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
- GNUNET_YES);
- set->cs = cs;
- cs->set = set;
- GNUNET_SERVICE_client_continue (cs->client);
- }
- /**
- * Timeout happens iff:
- * - we suggested an operation to our listener,
- * but did not receive a response in time
- * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
- *
- * @param cls channel context
- * @param tc context information (why was this task triggered now)
- */
- static void
- incoming_timeout_cb (void *cls)
- {
- struct Operation *op = cls;
- op->timeout_task = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Remote peer's incoming request timed out\n");
- incoming_destroy (op);
- }
- /**
- * Method called whenever another peer has added us to a channel the
- * other peer initiated. Only called (once) upon reception of data
- * from a channel we listen on.
- *
- * The channel context represents the operation itself and gets added
- * to a DLL, from where it gets looked up when our local listener
- * client responds to a proposed/suggested operation or connects and
- * associates with this operation.
- *
- * @param cls closure
- * @param channel new handle to the channel
- * @param source peer that started the channel
- * @return initial channel context for the channel
- * returns NULL on error
- */
- static void *
- channel_new_cb (void *cls,
- struct GNUNET_CADET_Channel *channel,
- const struct GNUNET_PeerIdentity *source)
- {
- struct Listener *listener = cls;
- struct Operation *op;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "New incoming channel\n");
- op = GNUNET_new (struct Operation);
- op->listener = listener;
- op->peer = *source;
- op->channel = channel;
- op->mq = GNUNET_CADET_get_mq (op->channel);
- op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
- UINT32_MAX);
- op->timeout_task
- = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
- &incoming_timeout_cb,
- op);
- GNUNET_CONTAINER_DLL_insert (listener->op_head,
- listener->op_tail,
- op);
- return op;
- }
- /**
- * Function called whenever a channel is destroyed. Should clean up
- * any associated state. It must NOT call
- * GNUNET_CADET_channel_destroy() on the channel.
- *
- * The peer_disconnect function is part of a a virtual table set initially either
- * when a peer creates a new channel with us, or once we create
- * a new channel ourselves (evaluate).
- *
- * Once we know the exact type of operation (union/intersection), the vt is
- * replaced with an operation specific instance (_GSS_[op]_vt).
- *
- * @param channel_ctx place where local state associated
- * with the channel is stored
- * @param channel connection to the other end (henceforth invalid)
- */
- static void
- channel_end_cb (void *channel_ctx,
- const struct GNUNET_CADET_Channel *channel)
- {
- struct Operation *op = channel_ctx;
- op->channel = NULL;
- _GSS_operation_destroy2 (op);
- }
- /**
- * This function probably should not exist
- * and be replaced by inlining more specific
- * logic in the various places where it is called.
- */
- void
- _GSS_operation_destroy2 (struct Operation *op)
- {
- struct GNUNET_CADET_Channel *channel;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "channel_end_cb called\n");
- if (NULL != (channel = op->channel))
- {
- /* This will free op; called conditionally as this helper function
- is also called from within the channel disconnect handler. */
- op->channel = NULL;
- GNUNET_CADET_channel_destroy (channel);
- }
- if (NULL != op->listener)
- {
- incoming_destroy (op);
- return;
- }
- if (NULL != op->set)
- op->set->vt->channel_death (op);
- else
- _GSS_operation_destroy (op,
- GNUNET_YES);
- GNUNET_free (op);
- }
- /**
- * Function called whenever an MQ-channel's transmission window size changes.
- *
- * The first callback in an outgoing channel will be with a non-zero value
- * and will mean the channel is connected to the destination.
- *
- * For an incoming channel it will be called immediately after the
- * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
- *
- * @param cls Channel closure.
- * @param channel Connection to the other end (henceforth invalid).
- * @param window_size New window size. If the is more messages than buffer size
- * this value will be negative..
- */
- static void
- channel_window_cb (void *cls,
- const struct GNUNET_CADET_Channel *channel,
- int window_size)
- {
- /* FIXME: not implemented, we could do flow control here... */
- }
- /**
- * Called when a client wants to create a new listener.
- *
- * @param cls client that sent the message
- * @param msg message sent by the client
- */
- static void
- handle_client_listen (void *cls,
- const struct GNUNET_SET_ListenMessage *msg)
- {
- struct ClientState *cs = cls;
- struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
- GNUNET_MQ_hd_var_size (incoming_msg,
- GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
- struct OperationRequestMessage,
- NULL),
- GNUNET_MQ_hd_var_size (union_p2p_ibf,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
- struct IBFMessage,
- NULL),
- GNUNET_MQ_hd_var_size (union_p2p_elements,
- GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
- struct GNUNET_SET_ElementMessage,
- NULL),
- GNUNET_MQ_hd_var_size (union_p2p_offer,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_var_size (union_p2p_inquiry,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
- struct InquiryMessage,
- NULL),
- GNUNET_MQ_hd_var_size (union_p2p_demand,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_fixed_size (union_p2p_done,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_fixed_size (union_p2p_over,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
- struct StrataEstimatorMessage,
- NULL),
- GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
- struct StrataEstimatorMessage,
- NULL),
- GNUNET_MQ_hd_var_size (union_p2p_full_element,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
- struct GNUNET_SET_ElementMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
- struct IntersectionElementInfoMessage,
- NULL),
- GNUNET_MQ_hd_var_size (intersection_p2p_bf,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
- struct BFMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
- struct IntersectionDoneMessage,
- NULL),
- GNUNET_MQ_handler_end ()
- };
- struct Listener *listener;
- if (NULL != cs->listener)
- {
- /* max. one active listener per client! */
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- listener = GNUNET_new (struct Listener);
- listener->cs = cs;
- cs->listener = listener;
- listener->app_id = msg->app_id;
- listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
- GNUNET_CONTAINER_DLL_insert (listener_head,
- listener_tail,
- listener);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "New listener created (op %u, port %s)\n",
- listener->operation,
- GNUNET_h2s (&listener->app_id));
- listener->open_port
- = GNUNET_CADET_open_port (cadet,
- &msg->app_id,
- &channel_new_cb,
- listener,
- &channel_window_cb,
- &channel_end_cb,
- cadet_handlers);
- GNUNET_SERVICE_client_continue (cs->client);
- }
- /**
- * Called when the listening client rejects an operation
- * request by another peer.
- *
- * @param cls client that sent the message
- * @param msg message sent by the client
- */
- static void
- handle_client_reject (void *cls,
- const struct GNUNET_SET_RejectMessage *msg)
- {
- struct ClientState *cs = cls;
- struct Operation *op;
- op = get_incoming (ntohl (msg->accept_reject_id));
- if (NULL == op)
- {
- /* no matching incoming operation for this reject;
- could be that the other peer already disconnected... */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Client rejected unknown operation %u\n",
- (unsigned int) ntohl (msg->accept_reject_id));
- GNUNET_SERVICE_client_continue (cs->client);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Peer request (op %u, app %s) rejected by client\n",
- op->listener->operation,
- GNUNET_h2s (&cs->listener->app_id));
- _GSS_operation_destroy2 (op);
- GNUNET_SERVICE_client_continue (cs->client);
- }
- /**
- * Called when a client wants to add or remove an element to a set it inhabits.
- *
- * @param cls client that sent the message
- * @param msg message sent by the client
- */
- static int
- check_client_mutation (void *cls,
- const struct GNUNET_SET_ElementMessage *msg)
- {
- /* NOTE: Technically, we should probably check with the
- block library whether the element we are given is well-formed */
- return GNUNET_OK;
- }
- /**
- * Called when a client wants to add or remove an element to a set it inhabits.
- *
- * @param cls client that sent the message
- * @param msg message sent by the client
- */
- static void
- handle_client_mutation (void *cls,
- const struct GNUNET_SET_ElementMessage *msg)
- {
- struct ClientState *cs = cls;
- struct Set *set;
- if (NULL == (set = cs->set))
- {
- /* client without a set requested an operation */
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- GNUNET_SERVICE_client_continue (cs->client);
- if (0 != set->content->iterator_count)
- {
- struct PendingMutation *pm;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Scheduling mutation on set\n");
- pm = GNUNET_new (struct PendingMutation);
- pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
- pm->set = set;
- GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
- set->content->pending_mutations_tail,
- pm);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Executing mutation on set\n");
- execute_mutation (set,
- msg);
- }
- /**
- * Advance the current generation of a set,
- * adding exclusion ranges if necessary.
- *
- * @param set the set where we want to advance the generation
- */
- static void
- advance_generation (struct Set *set)
- {
- struct GenerationRange r;
- if (set->current_generation == set->content->latest_generation)
- {
- set->content->latest_generation++;
- set->current_generation++;
- return;
- }
- GNUNET_assert (set->current_generation < set->content->latest_generation);
- r.start = set->current_generation + 1;
- r.end = set->content->latest_generation + 1;
- set->content->latest_generation = r.end;
- set->current_generation = r.end;
- GNUNET_array_append (set->excluded_generations,
- set->excluded_generations_size,
- r);
- }
- /**
- * Called when a client wants to initiate a set operation with another
- * peer. Initiates the CADET connection to the listener and sends the
- * request.
- *
- * @param cls client that sent the message
- * @param msg message sent by the client
- * @return #GNUNET_OK if the message is well-formed
- */
- static int
- check_client_evaluate (void *cls,
- const struct GNUNET_SET_EvaluateMessage *msg)
- {
- /* FIXME: suboptimal, even if the context below could be NULL,
- there are malformed messages this does not check for... */
- return GNUNET_OK;
- }
- /**
- * Called when a client wants to initiate a set operation with another
- * peer. Initiates the CADET connection to the listener and sends the
- * request.
- *
- * @param cls client that sent the message
- * @param msg message sent by the client
- */
- static void
- handle_client_evaluate (void *cls,
- const struct GNUNET_SET_EvaluateMessage *msg)
- {
- struct ClientState *cs = cls;
- struct Operation *op = GNUNET_new (struct Operation);
- const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
- GNUNET_MQ_hd_var_size (incoming_msg,
- GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
- struct OperationRequestMessage,
- op),
- GNUNET_MQ_hd_var_size (union_p2p_ibf,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
- struct IBFMessage,
- op),
- GNUNET_MQ_hd_var_size (union_p2p_elements,
- GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
- struct GNUNET_SET_ElementMessage,
- op),
- GNUNET_MQ_hd_var_size (union_p2p_offer,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_var_size (union_p2p_inquiry,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
- struct InquiryMessage,
- op),
- GNUNET_MQ_hd_var_size (union_p2p_demand,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_fixed_size (union_p2p_done,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_fixed_size (union_p2p_over,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
- struct StrataEstimatorMessage,
- op),
- GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
- struct StrataEstimatorMessage,
- op),
- GNUNET_MQ_hd_var_size (union_p2p_full_element,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
- struct GNUNET_SET_ElementMessage,
- op),
- GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
- struct IntersectionElementInfoMessage,
- op),
- GNUNET_MQ_hd_var_size (intersection_p2p_bf,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
- struct BFMessage,
- op),
- GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
- struct IntersectionDoneMessage,
- op),
- GNUNET_MQ_handler_end ()
- };
- struct Set *set;
- const struct GNUNET_MessageHeader *context;
- if (NULL == (set = cs->set))
- {
- GNUNET_break (0);
- GNUNET_free (op);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
- UINT32_MAX);
- op->peer = msg->target_peer;
- op->result_mode = ntohl (msg->result_mode);
- op->client_request_id = ntohl (msg->request_id);
- op->byzantine = msg->byzantine;
- op->byzantine_lower_bound = msg->byzantine_lower_bound;
- op->force_full = msg->force_full;
- op->force_delta = msg->force_delta;
- context = GNUNET_MQ_extract_nested_mh (msg);
- /* Advance generation values, so that
- mutations won't interfer with the running operation. */
- op->set = set;
- op->generation_created = set->current_generation;
- advance_generation (set);
- GNUNET_CONTAINER_DLL_insert (set->ops_head,
- set->ops_tail,
- op);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating new CADET channel to port %s for set operation type %u\n",
- GNUNET_h2s (&msg->app_id),
- set->operation);
- op->channel = GNUNET_CADET_channel_create (cadet,
- op,
- &msg->target_peer,
- &msg->app_id,
- GNUNET_CADET_OPTION_RELIABLE,
- &channel_window_cb,
- &channel_end_cb,
- cadet_handlers);
- op->mq = GNUNET_CADET_get_mq (op->channel);
- op->state = set->vt->evaluate (op,
- context);
- if (NULL == op->state)
- {
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- GNUNET_SERVICE_client_continue (cs->client);
- }
- /**
- * Handle an ack from a client, and send the next element. Note
- * that we only expect acks for set elements, not after the
- * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
- *
- * @param cls client the client
- * @param ack the message
- */
- static void
- handle_client_iter_ack (void *cls,
- const struct GNUNET_SET_IterAckMessage *ack)
- {
- struct ClientState *cs = cls;
- struct Set *set;
- if (NULL == (set = cs->set))
- {
- /* client without a set acknowledged receiving a value */
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- if (NULL == set->iter)
- {
- /* client sent an ack, but we were not expecting one (as
- set iteration has finished) */
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- GNUNET_SERVICE_client_continue (cs->client);
- if (ntohl (ack->send_more))
- {
- send_client_element (set);
- }
- else
- {
- GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
- set->iter = NULL;
- set->iteration_id++;
- }
- }
- /**
- * Handle a request from the client to copy a set.
- *
- * @param cls the client
- * @param mh the message
- */
- static void
- handle_client_copy_lazy_prepare (void *cls,
- const struct GNUNET_MessageHeader *mh)
- {
- struct ClientState *cs = cls;
- struct Set *set;
- struct LazyCopyRequest *cr;
- struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
- if (NULL == (set = cs->set))
- {
- /* client without a set requested an operation */
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client requested creation of lazy copy\n");
- cr = GNUNET_new (struct LazyCopyRequest);
- cr->cookie = ++lazy_copy_cookie;
- cr->source_set = set;
- GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
- lazy_copy_tail,
- cr);
- ev = GNUNET_MQ_msg (resp_msg,
- GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
- resp_msg->cookie = cr->cookie;
- GNUNET_MQ_send (set->cs->mq,
- ev);
- GNUNET_SERVICE_client_continue (cs->client);
- }
- /**
- * Handle a request from the client to connect to a copy of a set.
- *
- * @param cls the client
- * @param msg the message
- */
- static void
- handle_client_copy_lazy_connect (void *cls,
- const struct GNUNET_SET_CopyLazyConnectMessage *msg)
- {
- struct ClientState *cs = cls;
- struct LazyCopyRequest *cr;
- struct Set *set;
- int found;
- if (NULL != cs->set)
- {
- /* There can only be one set per client */
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- found = GNUNET_NO;
- for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
- {
- if (cr->cookie == msg->cookie)
- {
- found = GNUNET_YES;
- break;
- }
- }
- if (GNUNET_NO == found)
- {
- /* client asked for copy with cookie we don't know */
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
- lazy_copy_tail,
- cr);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client %p requested use of lazy copy\n",
- cs);
- set = GNUNET_new (struct Set);
- switch (cr->source_set->operation)
- {
- case GNUNET_SET_OPERATION_INTERSECTION:
- set->vt = _GSS_intersection_vt ();
- break;
- case GNUNET_SET_OPERATION_UNION:
- set->vt = _GSS_union_vt ();
- break;
- default:
- GNUNET_assert (0);
- return;
- }
- if (NULL == set->vt->copy_state)
- {
- /* Lazy copy not supported for this set operation */
- GNUNET_break (0);
- GNUNET_free (set);
- GNUNET_free (cr);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- set->operation = cr->source_set->operation;
- set->state = set->vt->copy_state (cr->source_set->state);
- set->content = cr->source_set->content;
- set->content->refcount++;
- set->current_generation = cr->source_set->current_generation;
- set->excluded_generations_size = cr->source_set->excluded_generations_size;
- set->excluded_generations
- = GNUNET_memdup (cr->source_set->excluded_generations,
- set->excluded_generations_size * sizeof (struct GenerationRange));
- /* Advance the generation of the new set, so that mutations to the
- of the cloned set and the source set are independent. */
- advance_generation (set);
- set->cs = cs;
- cs->set = set;
- GNUNET_free (cr);
- GNUNET_SERVICE_client_continue (cs->client);
- }
- /**
- * Handle a request from the client to cancel a running set operation.
- *
- * @param cls the client
- * @param msg the message
- */
- static void
- handle_client_cancel (void *cls,
- const struct GNUNET_SET_CancelMessage *msg)
- {
- struct ClientState *cs = cls;
- struct Set *set;
- struct Operation *op;
- int found;
- if (NULL == (set = cs->set))
- {
- /* client without a set requested an operation */
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- found = GNUNET_NO;
- for (op = set->ops_head; NULL != op; op = op->next)
- {
- if (op->client_request_id == ntohl (msg->request_id))
- {
- found = GNUNET_YES;
- break;
- }
- }
- if (GNUNET_NO == found)
- {
- /* It may happen that the operation was already destroyed due to
- * the other peer disconnecting. The client may not know about this
- * yet and try to cancel the (just barely non-existent) operation.
- * So this is not a hard error.
- */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Client canceled non-existent op %u\n",
- (uint32_t) ntohl (msg->request_id));
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client requested cancel for op %u\n",
- (uint32_t) ntohl (msg->request_id));
- _GSS_operation_destroy (op,
- GNUNET_YES);
- }
- GNUNET_SERVICE_client_continue (cs->client);
- }
- /**
- * Handle a request from the client to accept a set operation that
- * came from a remote peer. We forward the accept to the associated
- * operation for handling
- *
- * @param cls the client
- * @param msg the message
- */
- static void
- handle_client_accept (void *cls,
- const struct GNUNET_SET_AcceptMessage *msg)
- {
- struct ClientState *cs = cls;
- struct Set *set;
- struct Operation *op;
- struct GNUNET_SET_ResultMessage *result_message;
- struct GNUNET_MQ_Envelope *ev;
- struct Listener *listener;
- if (NULL == (set = cs->set))
- {
- /* client without a set requested to accept */
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- op = get_incoming (ntohl (msg->accept_reject_id));
- if (NULL == op)
- {
- /* It is not an error if the set op does not exist -- it may
- * have been destroyed when the partner peer disconnected. */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Client %p accepted request %u of listener %p that is no longer active\n",
- cs,
- ntohl (msg->accept_reject_id),
- cs->listener);
- ev = GNUNET_MQ_msg (result_message,
- GNUNET_MESSAGE_TYPE_SET_RESULT);
- result_message->request_id = msg->request_id;
- result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
- GNUNET_MQ_send (set->cs->mq,
- ev);
- GNUNET_SERVICE_client_continue (cs->client);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client accepting request %u\n",
- (uint32_t) ntohl (msg->accept_reject_id));
- listener = op->listener;
- op->listener = NULL;
- GNUNET_CONTAINER_DLL_remove (listener->op_head,
- listener->op_tail,
- op);
- op->set = set;
- GNUNET_CONTAINER_DLL_insert (set->ops_head,
- set->ops_tail,
- op);
- op->client_request_id = ntohl (msg->request_id);
- op->result_mode = ntohl (msg->result_mode);
- op->byzantine = msg->byzantine;
- op->byzantine_lower_bound = msg->byzantine_lower_bound;
- op->force_full = msg->force_full;
- op->force_delta = msg->force_delta;
- /* Advance generation values, so that future mutations do not
- interfer with the running operation. */
- op->generation_created = set->current_generation;
- advance_generation (set);
- GNUNET_assert (NULL == op->state);
- op->state = set->vt->accept (op);
- if (NULL == op->state)
- {
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (cs->client);
- return;
- }
- /* Now allow CADET to continue, as we did not do this in
- #handle_incoming_msg (as we wanted to first see if the
- local client would accept the request). */
- GNUNET_CADET_receive_done (op->channel);
- GNUNET_SERVICE_client_continue (cs->client);
- }
- /**
- * Called to clean up, after a shutdown has been requested.
- *
- * @param cls closure, NULL
- */
- static void
- shutdown_task (void *cls)
- {
- /* Delay actual shutdown to allow service to disconnect clients */
- in_shutdown = GNUNET_YES;
- if (0 == num_clients)
- {
- if (NULL != cadet)
- {
- GNUNET_CADET_disconnect (cadet);
- cadet = NULL;
- }
- }
- GNUNET_STATISTICS_destroy (_GSS_statistics,
- GNUNET_YES);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "handled shutdown request\n");
- }
- /**
- * Function called by the service's run
- * method to run service-specific setup code.
- *
- * @param cls closure
- * @param cfg configuration to use
- * @param service the initialized service
- */
- static void
- run (void *cls,
- const struct GNUNET_CONFIGURATION_Handle *cfg,
- struct GNUNET_SERVICE_Handle *service)
- {
- /* FIXME: need to modify SERVICE (!) API to allow
- us to run a shutdown task *after* clients were
- forcefully disconnected! */
- GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
- NULL);
- _GSS_statistics = GNUNET_STATISTICS_create ("set",
- cfg);
- cadet = GNUNET_CADET_connect (cfg);
- if (NULL == cadet)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _("Could not connect to CADET service\n"));
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- }
- /**
- * Define "main" method using service macro.
- */
- GNUNET_SERVICE_MAIN
- ("set",
- GNUNET_SERVICE_OPTION_NONE,
- &run,
- &client_connect_cb,
- &client_disconnect_cb,
- NULL,
- GNUNET_MQ_hd_fixed_size (client_accept,
- GNUNET_MESSAGE_TYPE_SET_ACCEPT,
- struct GNUNET_SET_AcceptMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (client_iter_ack,
- GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
- struct GNUNET_SET_IterAckMessage,
- NULL),
- GNUNET_MQ_hd_var_size (client_mutation,
- GNUNET_MESSAGE_TYPE_SET_ADD,
- struct GNUNET_SET_ElementMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (client_create_set,
- GNUNET_MESSAGE_TYPE_SET_CREATE,
- struct GNUNET_SET_CreateMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (client_iterate,
- GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_var_size (client_evaluate,
- GNUNET_MESSAGE_TYPE_SET_EVALUATE,
- struct GNUNET_SET_EvaluateMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (client_listen,
- GNUNET_MESSAGE_TYPE_SET_LISTEN,
- struct GNUNET_SET_ListenMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (client_reject,
- GNUNET_MESSAGE_TYPE_SET_REJECT,
- struct GNUNET_SET_RejectMessage,
- NULL),
- GNUNET_MQ_hd_var_size (client_mutation,
- GNUNET_MESSAGE_TYPE_SET_REMOVE,
- struct GNUNET_SET_ElementMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (client_cancel,
- GNUNET_MESSAGE_TYPE_SET_CANCEL,
- struct GNUNET_SET_CancelMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
- GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
- GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
- struct GNUNET_SET_CopyLazyConnectMessage,
- NULL),
- GNUNET_MQ_handler_end ());
- /* end of gnunet-service-set.c */
|