123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430 |
- /*
- * This file is part of GNUnet
- * (C) 2013 Christian Grothoff (and other contributing authors)
- *
- * GNUnet is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published
- * by the Free Software Foundation; either version 3, or (at your
- * option) any later version.
- *
- * GNUnet is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNUnet; see the file COPYING. If not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
- /**
- * @file psyc/gnunet-service-psyc.c
- * @brief PSYC service
- * @author Gabor X Toth
- */
- #include <inttypes.h>
- #include "platform.h"
- #include "gnunet_util_lib.h"
- #include "gnunet_constants.h"
- #include "gnunet_protocols.h"
- #include "gnunet_statistics_service.h"
- #include "gnunet_multicast_service.h"
- #include "gnunet_psycstore_service.h"
- #include "gnunet_psyc_service.h"
- #include "gnunet_psyc_util_lib.h"
- #include "psyc.h"
- /**
- * Handle to our current configuration.
- */
- static const struct GNUNET_CONFIGURATION_Handle *cfg;
- /**
- * Handle to the statistics service.
- */
- static struct GNUNET_STATISTICS_Handle *stats;
- /**
- * Notification context, simplifies client broadcasts.
- */
- static struct GNUNET_SERVER_NotificationContext *nc;
- /**
- * Handle to the PSYCstore.
- */
- static struct GNUNET_PSYCSTORE_Handle *store;
- /**
- * All connected masters.
- * Channel's pub_key_hash -> struct Master
- */
- static struct GNUNET_CONTAINER_MultiHashMap *masters;
- /**
- * All connected slaves.
- * Channel's pub_key_hash -> struct Slave
- */
- static struct GNUNET_CONTAINER_MultiHashMap *slaves;
- /**
- * Connected slaves per channel.
- * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
- */
- static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
- /**
- * Message in the transmission queue.
- */
- struct TransmitMessage
- {
- struct TransmitMessage *prev;
- struct TransmitMessage *next;
- struct GNUNET_SERVER_Client *client;
- /**
- * ID assigned to the message.
- */
- uint64_t id;
- /**
- * Size of message.
- */
- uint16_t size;
- /**
- * @see enum MessageState
- */
- uint8_t state;
- /**
- * Whether a message ACK has already been sent to the client.
- * #GNUNET_YES or #GNUNET_NO
- */
- uint8_t ack_sent;
- /* Followed by message */
- };
- /**
- * Cache for received message fragments.
- * Message fragments are only sent to clients after all modifiers arrived.
- *
- * chan_key -> MultiHashMap chan_msgs
- */
- static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
- /**
- * Entry in the chan_msgs hashmap of @a recv_cache:
- * fragment_id -> RecvCacheEntry
- */
- struct RecvCacheEntry
- {
- struct GNUNET_MULTICAST_MessageHeader *mmsg;
- uint16_t ref_count;
- };
- /**
- * Entry in the @a recv_frags hash map of a @a Channel.
- * message_id -> FragmentQueue
- */
- struct FragmentQueue
- {
- /**
- * Fragment IDs stored in @a recv_cache.
- */
- struct GNUNET_CONTAINER_Heap *fragments;
- /**
- * Total size of received fragments.
- */
- uint64_t size;
- /**
- * Total size of received header fragments (METHOD & MODIFIERs)
- */
- uint64_t header_size;
- /**
- * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
- */
- uint64_t state_delta;
- /**
- * The @a flags field from struct GNUNET_PSYC_MessageMethod.
- */
- uint32_t flags;
- /**
- * Receive state of message.
- *
- * @see MessageFragmentState
- */
- uint8_t state;
- /**
- * Is the message queued for delivery to the client?
- * i.e. added to the recv_msgs queue
- */
- uint8_t is_queued;
- };
- /**
- * List of connected clients.
- */
- struct ClientListItem
- {
- struct ClientListItem *prev;
- struct ClientListItem *next;
- struct GNUNET_SERVER_Client *client;
- };
- /**
- * Common part of the client context for both a channel master and slave.
- */
- struct Channel
- {
- struct ClientListItem *clients_head;
- struct ClientListItem *clients_tail;
- struct TransmitMessage *tmit_head;
- struct TransmitMessage *tmit_tail;
- /**
- * Current PSYCstore operation.
- */
- struct GNUNET_PSYCSTORE_OperationHandle *store_op;
- /**
- * Received fragments not yet sent to the client.
- * message_id -> FragmentQueue
- */
- struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
- /**
- * Received message IDs not yet sent to the client.
- */
- struct GNUNET_CONTAINER_Heap *recv_msgs;
- /**
- * Public key of the channel.
- */
- struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
- /**
- * Hash of @a pub_key.
- */
- struct GNUNET_HashCode pub_key_hash;
- /**
- * Last message ID sent to the client.
- * 0 if there is no such message.
- */
- uint64_t max_message_id;
- /**
- * ID of the last stateful message, where the state operations has been
- * processed and saved to PSYCstore and which has been sent to the client.
- * 0 if there is no such message.
- */
- uint64_t max_state_message_id;
- /**
- * Expected value size for the modifier being received from the PSYC service.
- */
- uint32_t tmit_mod_value_size_expected;
- /**
- * Actual value size for the modifier being received from the PSYC service.
- */
- uint32_t tmit_mod_value_size;
- /**
- * @see enum MessageState
- */
- uint8_t tmit_state;
- /**
- * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
- */
- uint8_t is_master;
- /**
- * Is this channel ready to receive messages from client?
- * #GNUNET_YES or #GNUNET_NO
- */
- uint8_t is_ready;
- /**
- * Is the client disconnected?
- * #GNUNET_YES or #GNUNET_NO
- */
- uint8_t is_disconnected;
- };
- /**
- * Client context for a channel master.
- */
- struct Master
- {
- /**
- * Channel struct common for Master and Slave
- */
- struct Channel chn;
- /**
- * Private key of the channel.
- */
- struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
- /**
- * Handle for the multicast origin.
- */
- struct GNUNET_MULTICAST_Origin *origin;
- /**
- * Transmit handle for multicast.
- */
- struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
- /**
- * Incoming join requests from multicast.
- * member_key -> struct GNUNET_MULTICAST_JoinHandle *
- */
- struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
- /**
- * Last message ID transmitted to this channel.
- *
- * Incremented before sending a message, thus the message_id in messages sent
- * starts from 1.
- */
- uint64_t max_message_id;
- /**
- * ID of the last message with state operations transmitted to the channel.
- * 0 if there is no such message.
- */
- uint64_t max_state_message_id;
- /**
- * Maximum group generation transmitted to the channel.
- */
- uint64_t max_group_generation;
- /**
- * @see enum GNUNET_PSYC_Policy
- */
- enum GNUNET_PSYC_Policy policy;
- };
- /**
- * Client context for a channel slave.
- */
- struct Slave
- {
- /**
- * Channel struct common for Master and Slave
- */
- struct Channel chn;
- /**
- * Private key of the slave.
- */
- struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
- /**
- * Public key of the slave.
- */
- struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
- /**
- * Hash of @a pub_key.
- */
- struct GNUNET_HashCode pub_key_hash;
- /**
- * Handle for the multicast member.
- */
- struct GNUNET_MULTICAST_Member *member;
- /**
- * Transmit handle for multicast.
- */
- struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
- /**
- * Peer identity of the origin.
- */
- struct GNUNET_PeerIdentity origin;
- /**
- * Number of items in @a relays.
- */
- uint32_t relay_count;
- /**
- * Relays that multicast can use to connect.
- */
- struct GNUNET_PeerIdentity *relays;
- /**
- * Join request to be transmitted to the master on join.
- */
- struct GNUNET_PSYC_Message *join_msg;
- /**
- * Join decision received from multicast.
- */
- struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
- /**
- * Maximum request ID for this channel.
- */
- uint64_t max_request_id;
- };
- struct OperationClosure
- {
- struct GNUNET_SERVER_Client *client;
- struct Channel *chn;
- uint64_t op_id;
- };
- static void
- transmit_message (struct Channel *chn);
- static uint64_t
- message_queue_drop (struct Channel *chn);
- /**
- * Task run during shutdown.
- *
- * @param cls unused
- * @param tc unused
- */
- static void
- shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
- {
- if (NULL != nc)
- {
- GNUNET_SERVER_notification_context_destroy (nc);
- nc = NULL;
- }
- if (NULL != stats)
- {
- GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
- stats = NULL;
- }
- }
- /**
- * Clean up master data structures after a client disconnected.
- */
- static void
- cleanup_master (struct Master *mst)
- {
- struct Channel *chn = &mst->chn;
- if (NULL != mst->origin)
- GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
- GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
- GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
- }
- /**
- * Clean up slave data structures after a client disconnected.
- */
- static void
- cleanup_slave (struct Slave *slv)
- {
- struct Channel *chn = &slv->chn;
- struct GNUNET_CONTAINER_MultiHashMap *
- chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
- &chn->pub_key_hash);
- GNUNET_assert (NULL != chn_slv);
- GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
- if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
- {
- GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
- chn_slv);
- GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
- }
- GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
- if (NULL != slv->join_msg)
- {
- GNUNET_free (slv->join_msg);
- slv->join_msg = NULL;
- }
- if (NULL != slv->relays)
- {
- GNUNET_free (slv->relays);
- slv->relays = NULL;
- }
- if (NULL != slv->member)
- {
- GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
- slv->member = NULL;
- }
- GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
- }
- /**
- * Clean up channel data structures after a client disconnected.
- */
- static void
- cleanup_channel (struct Channel *chn)
- {
- message_queue_drop (chn);
- GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
- if (NULL != chn->store_op)
- {
- GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
- chn->store_op = NULL;
- }
- (GNUNET_YES == chn->is_master)
- ? cleanup_master ((struct Master *) chn)
- : cleanup_slave ((struct Slave *) chn);
- GNUNET_free (chn);
- }
- /**
- * Called whenever a client is disconnected.
- * Frees our resources associated with that client.
- *
- * @param cls Closure.
- * @param client Identification of the client.
- */
- static void
- client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
- {
- if (NULL == client)
- return;
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- if (NULL == chn)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p User context is NULL in client_disconnect()\n", chn);
- GNUNET_break (0);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Client (%s) disconnected from channel %s\n",
- chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
- GNUNET_h2s (&chn->pub_key_hash));
- struct ClientListItem *cli = chn->clients_head;
- while (NULL != cli)
- {
- if (cli->client == client)
- {
- GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
- GNUNET_free (cli);
- break;
- }
- cli = cli->next;
- }
- if (NULL == chn->clients_head)
- { /* Last client disconnected. */
- if (NULL != chn->tmit_head)
- { /* Send pending messages to multicast before cleanup. */
- transmit_message (chn);
- }
- else
- {
- cleanup_channel (chn);
- }
- }
- }
- /**
- * Send message to all clients connected to the channel.
- */
- static void
- client_send_msg (const struct Channel *chn,
- const struct GNUNET_MessageHeader *msg)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Sending message to clients.\n", chn);
- struct ClientListItem *cli = chn->clients_head;
- while (NULL != cli)
- {
- GNUNET_SERVER_notification_context_add (nc, cli->client);
- GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
- cli = cli->next;
- }
- }
- /**
- * Send a result code back to the client.
- *
- * @param client
- * Client that should receive the result code.
- * @param result_code
- * Code to transmit.
- * @param op_id
- * Operation ID in network byte order.
- * @param err_msg
- * Error message to include (or NULL for none).
- */
- static void
- client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
- int64_t result_code, const char *err_msg)
- {
- struct OperationResult *res;
- size_t err_size = 0;
- if (NULL != err_msg)
- err_size = strnlen (err_msg,
- GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res)) + 1;
- res = GNUNET_malloc (sizeof (struct OperationResult) + err_size);
- res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
- res->header.size = htons (sizeof (struct OperationResult) + err_size);
- res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1);
- res->op_id = op_id;
- if (0 < err_size)
- {
- memcpy (&res[1], err_msg, err_size);
- ((char *) &res[1])[err_size - 1] = '\0';
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Sending result to client for operation #%" PRIu64 ": "
- "%" PRId64 " (%s)\n",
- client, GNUNET_ntohll (op_id), result_code, err_msg);
- GNUNET_SERVER_notification_context_add (nc, client);
- GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
- GNUNET_NO);
- GNUNET_free (res);
- }
- /**
- * Closure for join_mem_test_cb()
- */
- struct JoinMemTestClosure
- {
- struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
- struct Channel *chn;
- struct GNUNET_MULTICAST_JoinHandle *jh;
- struct GNUNET_PSYC_JoinRequestMessage *join_msg;
- };
- /**
- * Membership test result callback used for join requests.
- */
- static void
- join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
- {
- struct JoinMemTestClosure *jcls = cls;
- if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
- { /* Pass on join request to client if this is a master channel */
- struct Master *mst = (struct Master *) jcls->chn;
- struct GNUNET_HashCode slave_key_hash;
- GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
- &slave_key_hash);
- GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- client_send_msg (jcls->chn, &jcls->join_msg->header);
- }
- else
- {
- // FIXME: add relays
- GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
- }
- GNUNET_free (jcls->join_msg);
- GNUNET_free (jcls);
- }
- /**
- * Incoming join request from multicast.
- */
- static void
- mcast_recv_join_request (void *cls,
- const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
- const struct GNUNET_MessageHeader *join_msg,
- struct GNUNET_MULTICAST_JoinHandle *jh)
- {
- struct Channel *chn = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
- uint16_t join_msg_size = 0;
- if (NULL != join_msg)
- {
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
- {
- join_msg_size = ntohs (join_msg->size);
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "%p Got join message with invalid type %u.\n",
- chn, ntohs (join_msg->type));
- }
- }
- struct GNUNET_PSYC_JoinRequestMessage *
- req = GNUNET_malloc (sizeof (*req) + join_msg_size);
- req->header.size = htons (sizeof (*req) + join_msg_size);
- req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
- req->slave_key = *slave_key;
- if (0 < join_msg_size)
- memcpy (&req[1], join_msg, join_msg_size);
- struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
- jcls->slave_key = *slave_key;
- jcls->chn = chn;
- jcls->jh = jh;
- jcls->join_msg = req;
- GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
- chn->max_message_id, 0,
- &join_mem_test_cb, jcls);
- }
- /**
- * Join decision received from multicast.
- */
- static void
- mcast_recv_join_decision (void *cls, int is_admitted,
- const struct GNUNET_PeerIdentity *peer,
- uint16_t relay_count,
- const struct GNUNET_PeerIdentity *relays,
- const struct GNUNET_MessageHeader *join_resp)
- {
- struct Slave *slv = cls;
- struct Channel *chn = &slv->chn;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Got join decision: %d\n", slv, is_admitted);
- uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
- struct GNUNET_PSYC_JoinDecisionMessage *
- dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
- dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
- dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
- dcsn->is_admitted = htonl (is_admitted);
- if (0 < join_resp_size)
- memcpy (&dcsn[1], join_resp, join_resp_size);
- client_send_msg (chn, &dcsn->header);
- if (GNUNET_YES == is_admitted)
- {
- chn->is_ready = GNUNET_YES;
- }
- else
- {
- slv->member = NULL;
- }
- }
- /**
- * Received result of GNUNET_PSYCSTORE_membership_test()
- */
- static void
- store_recv_membership_test_result (void *cls, int64_t result, const char *err_msg)
- {
- struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%s)\n",
- mth, result, err_msg);
- GNUNET_MULTICAST_membership_test_result (mth, result);
- }
- /**
- * Incoming membership test request from multicast.
- */
- static void
- mcast_recv_membership_test (void *cls,
- const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
- uint64_t message_id, uint64_t group_generation,
- struct GNUNET_MULTICAST_MembershipTestHandle *mth)
- {
- struct Channel *chn = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received membership test request from multicast.\n",
- mth);
- GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
- message_id, group_generation,
- &store_recv_membership_test_result, mth);
- }
- static int
- store_recv_fragment_replay (void *cls,
- struct GNUNET_MULTICAST_MessageHeader *msg,
- enum GNUNET_PSYCSTORE_MessageFlags flags)
- {
- struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
- GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
- return GNUNET_YES;
- }
- /**
- * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
- */
- static void
- store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_msg)
- {
- struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%s)\n",
- rh, result, err_msg);
- switch (result)
- {
- case GNUNET_YES:
- break;
- case GNUNET_NO:
- GNUNET_MULTICAST_replay_response (rh, NULL,
- GNUNET_MULTICAST_REC_NOT_FOUND);
- break;
- case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
- GNUNET_MULTICAST_replay_response (rh, NULL,
- GNUNET_MULTICAST_REC_ACCESS_DENIED);
- break;
- case GNUNET_SYSERR:
- GNUNET_MULTICAST_replay_response (rh, NULL,
- GNUNET_MULTICAST_REC_INTERNAL_ERROR);
- break;
- }
- GNUNET_MULTICAST_replay_response_end (rh);
- }
- /**
- * Incoming fragment replay request from multicast.
- */
- static void
- mcast_recv_replay_fragment (void *cls,
- const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
- uint64_t fragment_id, uint64_t flags,
- struct GNUNET_MULTICAST_ReplayHandle *rh)
- {
- struct Channel *chn = cls;
- GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
- fragment_id, fragment_id,
- &store_recv_fragment_replay,
- &store_recv_fragment_replay_result, rh);
- }
- /**
- * Incoming message replay request from multicast.
- */
- static void
- mcast_recv_replay_message (void *cls,
- const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
- uint64_t message_id,
- uint64_t fragment_offset,
- uint64_t flags,
- struct GNUNET_MULTICAST_ReplayHandle *rh)
- {
- struct Channel *chn = cls;
- GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
- message_id, message_id,
- &store_recv_fragment_replay,
- &store_recv_fragment_replay_result, rh);
- }
- /**
- * Convert an uint64_t in network byte order to a HashCode
- * that can be used as key in a MultiHashMap
- */
- static inline void
- hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
- {
- /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
- /* TODO: use built-in byte swap functions if available */
- n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
- n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
- *key = (struct GNUNET_HashCode) {};
- *((uint64_t *) key)
- = (n << 32) | (n >> 32);
- }
- /**
- * Convert an uint64_t in host byte order to a HashCode
- * that can be used as key in a MultiHashMap
- */
- static inline void
- hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
- {
- #if __BYTE_ORDER == __BIG_ENDIAN
- hash_key_from_nll (key, n);
- #elif __BYTE_ORDER == __LITTLE_ENDIAN
- *key = (struct GNUNET_HashCode) {};
- *((uint64_t *) key) = n;
- #else
- #error byteorder undefined
- #endif
- }
- /**
- * Send multicast message to all clients connected to the channel.
- */
- static void
- client_send_mcast_msg (struct Channel *chn,
- const struct GNUNET_MULTICAST_MessageHeader *mmsg,
- uint32_t flags)
- {
- struct GNUNET_PSYC_MessageHeader *pmsg;
- uint16_t size = ntohs (mmsg->header.size);
- uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Sending multicast message to client. "
- "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
- chn, GNUNET_ntohll (mmsg->fragment_id),
- GNUNET_ntohll (mmsg->message_id));
- pmsg = GNUNET_malloc (psize);
- pmsg->header.size = htons (psize);
- pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
- pmsg->message_id = mmsg->message_id;
- pmsg->fragment_offset = mmsg->fragment_offset;
- pmsg->flags = htonl (flags);
- memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
- client_send_msg (chn, &pmsg->header);
- GNUNET_free (pmsg);
- }
- /**
- * Send multicast request to all clients connected to the channel.
- */
- static void
- client_send_mcast_req (struct Master *mst,
- const struct GNUNET_MULTICAST_RequestHeader *req)
- {
- struct Channel *chn = &mst->chn;
- struct GNUNET_PSYC_MessageHeader *pmsg;
- uint16_t size = ntohs (req->header.size);
- uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Sending multicast request to client. "
- "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
- chn, GNUNET_ntohll (req->fragment_id),
- GNUNET_ntohll (req->request_id));
- pmsg = GNUNET_malloc (psize);
- pmsg->header.size = htons (psize);
- pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
- pmsg->message_id = req->request_id;
- pmsg->fragment_offset = req->fragment_offset;
- pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
- memcpy (&pmsg[1], &req[1], size - sizeof (*req));
- client_send_msg (chn, &pmsg->header);
- GNUNET_free (pmsg);
- }
- /**
- * Insert a multicast message fragment into the queue belonging to the message.
- *
- * @param chn Channel.
- * @param mmsg Multicast message fragment.
- * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
- * @param first_ptype First PSYC message part type in @a mmsg.
- * @param last_ptype Last PSYC message part type in @a mmsg.
- */
- static void
- fragment_queue_insert (struct Channel *chn,
- const struct GNUNET_MULTICAST_MessageHeader *mmsg,
- uint16_t first_ptype, uint16_t last_ptype)
- {
- const uint16_t size = ntohs (mmsg->header.size);
- const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
- struct GNUNET_CONTAINER_MultiHashMap
- *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
- &chn->pub_key_hash);
- struct GNUNET_HashCode msg_id_hash;
- hash_key_from_nll (&msg_id_hash, mmsg->message_id);
- struct FragmentQueue
- *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
- if (NULL == fragq)
- {
- fragq = GNUNET_new (struct FragmentQueue);
- fragq->state = MSG_FRAG_STATE_HEADER;
- fragq->fragments
- = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
- if (NULL == chan_msgs)
- {
- chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
- GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
- }
- }
- struct GNUNET_HashCode frag_id_hash;
- hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
- struct RecvCacheEntry
- *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
- if (NULL == cache_entry)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Adding message fragment to cache. "
- "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
- chn, GNUNET_ntohll (mmsg->message_id),
- GNUNET_ntohll (mmsg->fragment_id));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p header_size: %" PRIu64 " + %u\n",
- chn, fragq->header_size, size);
- cache_entry = GNUNET_new (struct RecvCacheEntry);
- cache_entry->ref_count = 1;
- cache_entry->mmsg = GNUNET_malloc (size);
- memcpy (cache_entry->mmsg, mmsg, size);
- GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
- }
- else
- {
- cache_entry->ref_count++;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Message fragment is already in cache. "
- "message_id: %" PRIu64 ", fragment_id: %" PRIu64
- ", ref_count: %u\n",
- chn, GNUNET_ntohll (mmsg->message_id),
- GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
- }
- if (MSG_FRAG_STATE_HEADER == fragq->state)
- {
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
- {
- struct GNUNET_PSYC_MessageMethod *
- pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
- fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
- fragq->flags = ntohl (pmeth->flags);
- }
- if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
- {
- fragq->header_size += size;
- }
- else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
- || frag_offset == fragq->header_size)
- { /* header is now complete */
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Header of message %" PRIu64 " is complete.\n",
- chn, GNUNET_ntohll (mmsg->message_id));
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Adding message %" PRIu64 " to queue.\n",
- chn, GNUNET_ntohll (mmsg->message_id));
- fragq->state = MSG_FRAG_STATE_DATA;
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Header of message %" PRIu64 " is NOT complete yet: "
- "%" PRIu64 " != %" PRIu64 "\n",
- chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
- fragq->header_size);
- }
- }
- switch (last_ptype)
- {
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
- if (frag_offset == fragq->size)
- fragq->state = MSG_FRAG_STATE_END;
- else
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Message %" PRIu64 " is NOT complete yet: "
- "%" PRIu64 " != %" PRIu64 "\n",
- chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
- fragq->size);
- break;
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
- /* Drop message without delivering to client if it's a single fragment */
- fragq->state =
- (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
- ? MSG_FRAG_STATE_DROP
- : MSG_FRAG_STATE_CANCEL;
- }
- switch (fragq->state)
- {
- case MSG_FRAG_STATE_DATA:
- case MSG_FRAG_STATE_END:
- case MSG_FRAG_STATE_CANCEL:
- if (GNUNET_NO == fragq->is_queued)
- {
- GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
- GNUNET_ntohll (mmsg->message_id));
- fragq->is_queued = GNUNET_YES;
- }
- }
- fragq->size += size;
- GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
- GNUNET_ntohll (mmsg->fragment_id));
- }
- /**
- * Run fragment queue of a message.
- *
- * Send fragments of a message in order to client, after all modifiers arrived
- * from multicast.
- *
- * @param chn Channel.
- * @param msg_id ID of the message @a fragq belongs to.
- * @param fragq Fragment queue of the message.
- * @param drop Drop message without delivering to client?
- * #GNUNET_YES or #GNUNET_NO.
- */
- static void
- fragment_queue_run (struct Channel *chn, uint64_t msg_id,
- struct FragmentQueue *fragq, uint8_t drop)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Running message fragment queue for message %" PRIu64
- " (state: %u).\n",
- chn, msg_id, fragq->state);
- struct GNUNET_CONTAINER_MultiHashMap
- *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
- &chn->pub_key_hash);
- GNUNET_assert (NULL != chan_msgs);
- uint64_t frag_id;
- while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
- &frag_id))
- {
- struct GNUNET_HashCode frag_id_hash;
- hash_key_from_hll (&frag_id_hash, frag_id);
- struct RecvCacheEntry *cache_entry
- = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
- if (cache_entry != NULL)
- {
- if (GNUNET_NO == drop)
- {
- client_send_mcast_msg (chn, cache_entry->mmsg, 0);
- }
- if (cache_entry->ref_count <= 1)
- {
- GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
- cache_entry);
- GNUNET_free (cache_entry->mmsg);
- GNUNET_free (cache_entry);
- }
- else
- {
- cache_entry->ref_count--;
- }
- }
- #if CACHE_AGING_IMPLEMENTED
- else if (GNUNET_NO == drop)
- {
- /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
- }
- #endif
- GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
- }
- if (MSG_FRAG_STATE_END <= fragq->state)
- {
- struct GNUNET_HashCode msg_id_hash;
- hash_key_from_hll (&msg_id_hash, msg_id);
- GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
- GNUNET_CONTAINER_heap_destroy (fragq->fragments);
- GNUNET_free (fragq);
- }
- else
- {
- fragq->is_queued = GNUNET_NO;
- }
- }
- /**
- * Run message queue.
- *
- * Send messages in queue to client in order after a message has arrived from
- * multicast, according to the following:
- * - A message is only sent if all of its modifiers arrived.
- * - A stateful message is only sent if the previous stateful message
- * has already been delivered to the client.
- *
- * @param chn Channel.
- *
- * @return Number of messages removed from queue and sent to client.
- */
- static uint64_t
- message_queue_run (struct Channel *chn)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Running message queue.\n", chn);
- uint64_t n = 0;
- uint64_t msg_id;
- while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
- &msg_id))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
- struct GNUNET_HashCode msg_id_hash;
- hash_key_from_hll (&msg_id_hash, msg_id);
- struct FragmentQueue *
- fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
- if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p No fragq (%p) or header not complete.\n",
- chn, fragq);
- break;
- }
- if (MSG_FRAG_STATE_HEADER == fragq->state)
- {
- /* Check if there's a missing message before the current one */
- if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
- {
- if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
- && msg_id - 1 != chn->max_message_id)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Out of order message. "
- "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
- chn, msg_id, chn->max_message_id);
- break;
- }
- }
- else
- {
- if (msg_id - fragq->state_delta != chn->max_state_message_id)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Out of order stateful message. "
- "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
- chn, msg_id, fragq->state_delta, chn->max_state_message_id);
- break;
- }
- #if TODO
- /* FIXME: apply modifiers to state in PSYCstore */
- GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
- store_recv_state_modify_result, cls);
- #endif
- chn->max_state_message_id = msg_id;
- }
- chn->max_message_id = msg_id;
- }
- fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
- GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
- n++;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
- return n;
- }
- /**
- * Drop message queue of a channel.
- *
- * Remove all messages in queue without sending it to clients.
- *
- * @param chn Channel.
- *
- * @return Number of messages removed from queue.
- */
- static uint64_t
- message_queue_drop (struct Channel *chn)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Dropping message queue.\n", chn);
- uint64_t n = 0;
- uint64_t msg_id;
- while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
- &msg_id))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
- struct GNUNET_HashCode msg_id_hash;
- hash_key_from_hll (&msg_id_hash, msg_id);
- struct FragmentQueue *
- fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
- fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
- GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
- n++;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
- return n;
- }
- /**
- * Received result of GNUNET_PSYCSTORE_fragment_store().
- */
- static void
- store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg)
- {
- struct Channel *chn = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n",
- chn, result, err_msg);
- }
- /**
- * Handle incoming message fragment from multicast.
- *
- * Store it using PSYCstore and send it to the clients of the channel in order.
- */
- static void
- mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
- {
- struct Channel *chn = cls;
- uint16_t size = ntohs (mmsg->header.size);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received multicast message of size %u.\n",
- chn, size);
- GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
- &store_recv_fragment_store_result, chn);
- uint16_t first_ptype = 0, last_ptype = 0;
- if (GNUNET_SYSERR
- == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
- (const char *) &mmsg[1],
- &first_ptype, &last_ptype))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Dropping incoming multicast message with invalid parts.\n",
- chn);
- GNUNET_break_op (0);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Message parts: first: type %u, last: type %u\n",
- first_ptype, last_ptype);
- fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
- message_queue_run (chn);
- }
- /**
- * Incoming request fragment from multicast for a master.
- *
- * @param cls Master.
- * @param req The request.
- */
- static void
- mcast_recv_request (void *cls,
- const struct GNUNET_MULTICAST_RequestHeader *req)
- {
- struct Master *mst = cls;
- uint16_t size = ntohs (req->header.size);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received multicast request of size %u.\n",
- mst, size);
- uint16_t first_ptype = 0, last_ptype = 0;
- if (GNUNET_SYSERR
- == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
- (const char *) &req[1],
- &first_ptype, &last_ptype))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Dropping incoming multicast request with invalid parts.\n",
- mst);
- GNUNET_break_op (0);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Message parts: first: type %u, last: type %u\n",
- first_ptype, last_ptype);
- /* FIXME: in-order delivery */
- client_send_mcast_req (mst, req);
- }
- /**
- * Response from PSYCstore with the current counter values for a channel master.
- */
- static void
- store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
- uint64_t max_message_id, uint64_t max_group_generation,
- uint64_t max_state_message_id)
- {
- struct Master *mst = cls;
- struct Channel *chn = &mst->chn;
- chn->store_op = NULL;
- struct GNUNET_PSYC_CountersResultMessage res;
- res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
- res.header.size = htons (sizeof (res));
- res.result_code = htonl (result - INT32_MIN);
- res.max_message_id = GNUNET_htonll (max_message_id);
- if (GNUNET_OK == result || GNUNET_NO == result)
- {
- mst->max_message_id = max_message_id;
- chn->max_message_id = max_message_id;
- chn->max_state_message_id = max_state_message_id;
- mst->max_group_generation = max_group_generation;
- mst->origin
- = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
- &mcast_recv_join_request,
- &mcast_recv_membership_test,
- &mcast_recv_replay_fragment,
- &mcast_recv_replay_message,
- &mcast_recv_request,
- &mcast_recv_message, chn);
- chn->is_ready = GNUNET_YES;
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "%p GNUNET_PSYCSTORE_counters_get() "
- "returned %d for channel %s.\n",
- chn, result, GNUNET_h2s (&chn->pub_key_hash));
- }
- client_send_msg (chn, &res.header);
- }
- /**
- * Response from PSYCstore with the current counter values for a channel slave.
- */
- void
- store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
- uint64_t max_message_id, uint64_t max_group_generation,
- uint64_t max_state_message_id)
- {
- struct Slave *slv = cls;
- struct Channel *chn = &slv->chn;
- chn->store_op = NULL;
- struct GNUNET_PSYC_CountersResultMessage res;
- res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
- res.header.size = htons (sizeof (res));
- res.result_code = htonl (result - INT32_MIN);
- res.max_message_id = GNUNET_htonll (max_message_id);
- if (GNUNET_OK == result || GNUNET_NO == result)
- {
- chn->max_message_id = max_message_id;
- chn->max_state_message_id = max_state_message_id;
- slv->member
- = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
- &slv->origin,
- slv->relay_count, slv->relays,
- &slv->join_msg->header,
- &mcast_recv_join_request,
- &mcast_recv_join_decision,
- &mcast_recv_membership_test,
- &mcast_recv_replay_fragment,
- &mcast_recv_replay_message,
- &mcast_recv_message, chn);
- if (NULL != slv->join_msg)
- {
- GNUNET_free (slv->join_msg);
- slv->join_msg = NULL;
- }
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "%p GNUNET_PSYCSTORE_counters_get() "
- "returned %d for channel %s.\n",
- chn, result, GNUNET_h2s (&chn->pub_key_hash));
- }
- client_send_msg (chn, &res.header);
- }
- static void
- channel_init (struct Channel *chn)
- {
- chn->recv_msgs
- = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
- }
- /**
- * Handle a connecting client starting a channel master.
- */
- static void
- client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
- {
- const struct MasterStartRequest *req
- = (const struct MasterStartRequest *) msg;
- struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
- struct GNUNET_HashCode pub_key_hash;
- GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
- GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
- struct Master *
- mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
- struct Channel *chn;
- if (NULL == mst)
- {
- mst = GNUNET_new (struct Master);
- mst->policy = ntohl (req->policy);
- mst->priv_key = req->channel_key;
- mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
- chn = &mst->chn;
- chn->is_master = GNUNET_YES;
- chn->pub_key = pub_key;
- chn->pub_key_hash = pub_key_hash;
- channel_init (chn);
- GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
- store_recv_master_counters, mst);
- }
- else
- {
- chn = &mst->chn;
- struct GNUNET_PSYC_CountersResultMessage res;
- res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
- res.header.size = htons (sizeof (res));
- res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN);
- res.max_message_id = GNUNET_htonll (mst->max_message_id);
- GNUNET_SERVER_notification_context_add (nc, client);
- GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
- GNUNET_NO);
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Client connected as master to channel %s.\n",
- mst, GNUNET_h2s (&chn->pub_key_hash));
- struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
- cli->client = client;
- GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
- GNUNET_SERVER_client_set_user_context (client, chn);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
- }
- /**
- * Handle a connecting client joining as a channel slave.
- */
- static void
- client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
- {
- const struct SlaveJoinRequest *req
- = (const struct SlaveJoinRequest *) msg;
- uint16_t req_size = ntohs (req->header.size);
- struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
- struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
- GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
- GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
- GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
- struct GNUNET_CONTAINER_MultiHashMap *
- chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
- struct Slave *slv = NULL;
- struct Channel *chn;
- if (NULL != chn_slv)
- {
- slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
- }
- if (NULL == slv)
- {
- slv = GNUNET_new (struct Slave);
- slv->priv_key = req->slave_key;
- slv->pub_key = slv_pub_key;
- slv->pub_key_hash = slv_pub_key_hash;
- slv->origin = req->origin;
- slv->relay_count = ntohl (req->relay_count);
- const struct GNUNET_PeerIdentity *
- relays = (const struct GNUNET_PeerIdentity *) &req[1];
- uint16_t relay_size = slv->relay_count * sizeof (*relays);
- uint16_t join_msg_size = 0;
- if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
- <= req_size)
- {
- join_msg_size = ntohs (slv->join_msg->header.size);
- slv->join_msg = GNUNET_malloc (join_msg_size);
- memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
- }
- if (sizeof (*req) + relay_size + join_msg_size != req_size)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "%u + %u + %u != %u\n",
- sizeof (*req), relay_size, join_msg_size, req_size);
- GNUNET_break (0);
- GNUNET_SERVER_client_disconnect (client);
- return;
- }
- if (0 < slv->relay_count)
- {
- slv->relays = GNUNET_malloc (relay_size);
- memcpy (slv->relays, &req[1], relay_size);
- }
- chn = &slv->chn;
- chn->is_master = GNUNET_NO;
- chn->pub_key = req->channel_key;
- chn->pub_key_hash = pub_key_hash;
- channel_init (chn);
- if (NULL == chn_slv)
- {
- chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
- GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
- }
- GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
- GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
- &store_recv_slave_counters, slv);
- }
- else
- {
- chn = &slv->chn;
- struct GNUNET_PSYC_CountersResultMessage res;
- res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
- res.header.size = htons (sizeof (res));
- res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN);
- res.max_message_id = GNUNET_htonll (chn->max_message_id);
- GNUNET_SERVER_notification_context_add (nc, client);
- GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
- GNUNET_NO);
- if (NULL == slv->member)
- {
- slv->member
- = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
- &slv->origin,
- slv->relay_count, slv->relays,
- &slv->join_msg->header,
- &mcast_recv_join_request,
- &mcast_recv_join_decision,
- &mcast_recv_membership_test,
- &mcast_recv_replay_fragment,
- &mcast_recv_replay_message,
- &mcast_recv_message, chn);
- if (NULL != slv->join_msg)
- {
- GNUNET_free (slv->join_msg);
- slv->join_msg = NULL;
- }
- }
- else if (NULL != slv->join_dcsn)
- {
- GNUNET_SERVER_notification_context_add (nc, client);
- GNUNET_SERVER_notification_context_unicast (nc, client,
- &slv->join_dcsn->header,
- GNUNET_NO);
- }
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Client connected as slave to channel %s.\n",
- slv, GNUNET_h2s (&chn->pub_key_hash));
- struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
- cli->client = client;
- GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
- GNUNET_SERVER_client_set_user_context (client, chn);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
- }
- struct JoinDecisionClosure
- {
- int32_t is_admitted;
- struct GNUNET_MessageHeader *msg;
- };
- /**
- * Iterator callback for sending join decisions to multicast.
- */
- static int
- mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
- void *value)
- {
- struct JoinDecisionClosure *jcls = cls;
- struct GNUNET_MULTICAST_JoinHandle *jh = value;
- // FIXME: add relays
- GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
- return GNUNET_YES;
- }
- /**
- * Join decision from client.
- */
- static void
- client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
- {
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (GNUNET_YES == chn->is_master);
- struct Master *mst = (struct Master *) chn;
- struct GNUNET_PSYC_JoinDecisionMessage *
- dcsn = (struct GNUNET_PSYC_JoinDecisionMessage *) msg;
- struct JoinDecisionClosure jcls;
- jcls.is_admitted = ntohl (dcsn->is_admitted);
- jcls.msg
- = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
- ? (struct GNUNET_MessageHeader *) &dcsn[1]
- : NULL;
- struct GNUNET_HashCode slave_key_hash;
- GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
- &slave_key_hash);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Got join decision (%d) from client for channel %s..\n",
- mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p ..and slave %s.\n",
- mst, GNUNET_h2s (&slave_key_hash));
- GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
- &mcast_send_join_decision, &jcls);
- GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
- }
- /**
- * Send acknowledgement to a client.
- *
- * Sent after a message fragment has been passed on to multicast.
- *
- * @param chn The channel struct for the client.
- */
- static void
- send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
- {
- struct GNUNET_MessageHeader res;
- res.size = htons (sizeof (res));
- res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
- /* FIXME */
- GNUNET_SERVER_notification_context_add (nc, client);
- GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
- }
- /**
- * Callback for the transmit functions of multicast.
- */
- static int
- transmit_notify (void *cls, size_t *data_size, void *data)
- {
- struct Channel *chn = cls;
- struct TransmitMessage *tmit_msg = chn->tmit_head;
- if (NULL == tmit_msg || *data_size < tmit_msg->size)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p transmit_notify: nothing to send.\n", chn);
- *data_size = 0;
- return GNUNET_NO;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
- *data_size = tmit_msg->size;
- memcpy (data, &tmit_msg[1], *data_size);
- int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
- if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
- send_message_ack (chn, tmit_msg->client);
- GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
- GNUNET_free (tmit_msg);
- if (NULL != chn->tmit_head)
- {
- transmit_message (chn);
- }
- else if (GNUNET_YES == chn->is_disconnected)
- {
- /* FIXME: handle partial message (when still in_transmit) */
- cleanup_channel (chn);
- }
- return ret;
- }
- /**
- * Callback for the transmit functions of multicast.
- */
- static int
- master_transmit_notify (void *cls, size_t *data_size, void *data)
- {
- int ret = transmit_notify (cls, data_size, data);
- if (GNUNET_YES == ret)
- {
- struct Master *mst = cls;
- mst->tmit_handle = NULL;
- }
- return ret;
- }
- /**
- * Callback for the transmit functions of multicast.
- */
- static int
- slave_transmit_notify (void *cls, size_t *data_size, void *data)
- {
- int ret = transmit_notify (cls, data_size, data);
- if (GNUNET_YES == ret)
- {
- struct Slave *slv = cls;
- slv->tmit_handle = NULL;
- }
- return ret;
- }
- /**
- * Transmit a message from a channel master to the multicast group.
- */
- static void
- master_transmit_message (struct Master *mst)
- {
- if (NULL == mst->tmit_handle)
- {
- mst->tmit_handle
- = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
- mst->max_group_generation,
- master_transmit_notify, mst);
- }
- else
- {
- GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
- }
- }
- /**
- * Transmit a message from a channel slave to the multicast group.
- */
- static void
- slave_transmit_message (struct Slave *slv)
- {
- if (NULL == slv->tmit_handle)
- {
- slv->tmit_handle
- = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
- slave_transmit_notify, slv);
- }
- else
- {
- GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
- }
- }
- static void
- transmit_message (struct Channel *chn)
- {
- chn->is_master
- ? master_transmit_message ((struct Master *) chn)
- : slave_transmit_message ((struct Slave *) chn);
- }
- /**
- * Queue a message from a channel master for sending to the multicast group.
- */
- static void
- master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
- uint16_t first_ptype, uint16_t last_ptype)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
- {
- tmit_msg->id = ++mst->max_message_id;
- struct GNUNET_PSYC_MessageMethod *pmeth
- = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
- if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
- {
- pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
- }
- else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
- {
- pmeth->state_delta = GNUNET_htonll (tmit_msg->id
- - mst->max_state_message_id);
- }
- else
- {
- pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
- }
- }
- }
- /**
- * Queue a message from a channel slave for sending to the multicast group.
- */
- static void
- slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
- uint16_t first_ptype, uint16_t last_ptype)
- {
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
- {
- struct GNUNET_PSYC_MessageMethod *pmeth
- = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
- pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
- tmit_msg->id = ++slv->max_request_id;
- }
- }
- /**
- * Queue PSYC message parts for sending to multicast.
- *
- * @param chn Channel to send to.
- * @param client Client the message originates from.
- * @param data_size Size of @a data.
- * @param data Concatenated message parts.
- * @param first_ptype First message part type in @a data.
- * @param last_ptype Last message part type in @a data.
- */
- static struct TransmitMessage *
- queue_message (struct Channel *chn,
- struct GNUNET_SERVER_Client *client,
- size_t data_size,
- const void *data,
- uint16_t first_ptype, uint16_t last_ptype)
- {
- struct TransmitMessage *
- tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
- memcpy (&tmit_msg[1], data, data_size);
- tmit_msg->client = client;
- tmit_msg->size = data_size;
- tmit_msg->state = chn->tmit_state;
- /* FIXME: separate queue per message ID */
- GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
- chn->is_master
- ? master_queue_message ((struct Master *) chn, tmit_msg,
- first_ptype, last_ptype)
- : slave_queue_message ((struct Slave *) chn, tmit_msg,
- first_ptype, last_ptype);
- return tmit_msg;
- }
- /**
- * Cancel transmission of current message.
- *
- * @param chn Channel to send to.
- * @param client Client the message originates from.
- */
- static void
- transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
- {
- uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
- struct GNUNET_MessageHeader msg;
- msg.size = htons (sizeof (msg));
- msg.type = htons (type);
- queue_message (chn, client, sizeof (msg), &msg, type, type);
- transmit_message (chn);
- /* FIXME: cleanup */
- }
- /**
- * Incoming message from a master or slave client.
- */
- static void
- client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
- {
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != chn);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received message from client.\n", chn);
- GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
- if (GNUNET_YES != chn->is_ready)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Channel is not ready yet, disconnecting client.\n", chn);
- GNUNET_break (0);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- return;
- }
- uint16_t size = ntohs (msg->size);
- if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
- GNUNET_break (0);
- transmit_cancel (chn, client);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- return;
- }
- uint16_t first_ptype = 0, last_ptype = 0;
- if (GNUNET_SYSERR
- == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
- (const char *) &msg[1],
- &first_ptype, &last_ptype))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "%p Received invalid message part from client.\n", chn);
- GNUNET_break (0);
- transmit_cancel (chn, client);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received message with first part type %u and last part type %u.\n",
- chn, first_ptype, last_ptype);
- queue_message (chn, client, size - sizeof (*msg), &msg[1],
- first_ptype, last_ptype);
- transmit_message (chn);
- /* FIXME: send a few ACKs even before transmit_notify is called */
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
- };
- struct MembershipStoreClosure
- {
- struct GNUNET_SERVER_Client *client;
- struct Channel *chn;
- uint64_t op_id;
- };
- /**
- * Received result of GNUNET_PSYCSTORE_membership_store()
- */
- static void
- store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg)
- {
- struct MembershipStoreClosure *mcls = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n",
- mcls->chn, result, err_msg);
- client_send_result (mcls->client, mcls->op_id, result, err_msg);
- }
- /**
- * Client requests to add/remove a slave in the membership database.
- */
- static void
- client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
- {
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != chn);
- const struct ChannelMembershipStoreRequest *
- req = (const struct ChannelMembershipStoreRequest *) msg;
- struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls));
- mcls->client = client;
- mcls->chn = chn;
- mcls->op_id = req->op_id;
- uint64_t announced_at = GNUNET_ntohll (req->announced_at);
- uint64_t effective_since = GNUNET_ntohll (req->effective_since);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received membership store request from client.\n", chn);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
- chn, req->did_join, announced_at, effective_since);
- GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
- req->did_join, announced_at, effective_since,
- 0, /* FIXME: group_generation */
- &store_recv_membership_store_result, mcls);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
- }
- static int
- store_recv_fragment_history (void *cls,
- struct GNUNET_MULTICAST_MessageHeader *msg,
- enum GNUNET_PSYCSTORE_MessageFlags flags)
- {
- struct OperationClosure *opcls = cls;
- struct Channel *chn = opcls->chn;
- client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC);
- return GNUNET_YES;
- }
- /**
- * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
- */
- static void
- store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg)
- {
- struct OperationClosure *opcls = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p History replay #%" PRIu64 ": "
- "PSYCSTORE returned %" PRId64 " (%s)\n",
- opcls->chn, opcls->op_id, result, err_msg);
- client_send_result (opcls->client, opcls->op_id, result, err_msg);
- }
- /**
- * Client requests channel history from PSYCstore.
- */
- static void
- client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
- {
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != chn);
- const struct HistoryRequest *
- req = (const struct HistoryRequest *) msg;
- struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
- opcls->client = client;
- opcls->chn = chn;
- opcls->op_id = req->op_id;
- if (0 == req->message_limit)
- GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
- GNUNET_ntohll (req->start_message_id),
- GNUNET_ntohll (req->end_message_id),
- &store_recv_fragment_history,
- &store_recv_fragment_history_result, opcls);
- else
- GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
- GNUNET_ntohll (req->message_limit),
- &store_recv_fragment_history,
- &store_recv_fragment_history_result,
- opcls);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
- }
- /**
- * Received state var from PSYCstore, send it to client.
- */
- static int
- store_recv_state_var (void *cls, const char *name,
- const void *value, size_t value_size)
- {
- struct OperationClosure *opcls = cls;
- struct OperationResult *op;
- if (NULL != name)
- {
- uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
- struct GNUNET_PSYC_MessageModifier *mod;
- op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size);
- op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size);
- op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
- op->op_id = opcls->op_id;
- mod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
- mod->header.size = htons (sizeof (*mod) + name_size + value_size);
- mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
- mod->name_size = htons (name_size);
- mod->value_size = htonl (value_size);
- mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
- memcpy (&mod[1], name, name_size);
- memcpy (((char *) &mod[1]) + name_size, value, value_size);
- }
- else
- {
- struct GNUNET_MessageHeader *mod;
- op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size);
- op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size);
- op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
- op->op_id = opcls->op_id;
- mod = (struct GNUNET_MessageHeader *) &op[1];
- mod->size = htons (sizeof (*mod) + value_size);
- mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
- memcpy (&mod[1], value, value_size);
- }
- GNUNET_SERVER_notification_context_add (nc, opcls->client);
- GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header,
- GNUNET_NO);
- return GNUNET_YES;
- }
- /**
- * Received result of GNUNET_PSYCSTORE_state_get()
- * or GNUNET_PSYCSTORE_state_get_prefix()
- */
- static void
- store_recv_state_result (void *cls, int64_t result, const char *err_msg)
- {
- struct OperationClosure *opcls = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p History replay #%" PRIu64 ": "
- "PSYCSTORE returned %" PRId64 " (%s)\n",
- opcls->chn, opcls->op_id, result, err_msg);
- client_send_result (opcls->client, opcls->op_id, result, err_msg);
- }
- /**
- * Client requests best matching state variable from PSYCstore.
- */
- static void
- client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
- {
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != chn);
- const struct StateRequest *
- req = (const struct StateRequest *) msg;
- uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
- const char *name = (const char *) &req[1];
- if (0 == name_size || '\0' != name[name_size - 1])
- {
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- return;
- }
- struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
- opcls->client = client;
- opcls->chn = chn;
- opcls->op_id = req->op_id;
- GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
- &store_recv_state_var,
- &store_recv_state_result, opcls);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
- }
- /**
- * Client requests state variables with a given prefix from PSYCstore.
- */
- static void
- client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
- {
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != chn);
- const struct StateRequest *
- req = (const struct StateRequest *) msg;
- uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
- const char *name = (const char *) &req[1];
- if (0 == name_size || '\0' != name[name_size - 1])
- {
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- return;
- }
- struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
- opcls->client = client;
- opcls->chn = chn;
- opcls->op_id = req->op_id;
- GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
- &store_recv_state_var,
- &store_recv_state_result, opcls);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
- }
- static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
- { &client_recv_master_start, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
- { &client_recv_slave_join, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
- { &client_recv_join_decision, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
- { &client_recv_psyc_message, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
- { &client_recv_membership_store, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
- { &client_recv_history_replay, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
- { &client_recv_state_get, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
- { &client_recv_state_get_prefix, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
- { NULL, NULL, 0, 0 }
- };
- /**
- * Initialize the PSYC service.
- *
- * @param cls Closure.
- * @param server The initialized server.
- * @param c Configuration to use.
- */
- static void
- run (void *cls, struct GNUNET_SERVER_Handle *server,
- const struct GNUNET_CONFIGURATION_Handle *c)
- {
- cfg = c;
- store = GNUNET_PSYCSTORE_connect (cfg);
- stats = GNUNET_STATISTICS_create ("psyc", cfg);
- masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
- slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
- channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
- recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
- nc = GNUNET_SERVER_notification_context_create (server, 1);
- GNUNET_SERVER_add_handlers (server, server_handlers);
- GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
- &shutdown_task, NULL);
- }
- /**
- * The main function for the service.
- *
- * @param argc number of arguments from the command line
- * @param argv command line arguments
- * @return 0 ok, 1 on error
- */
- int
- main (int argc, char *const *argv)
- {
- return (GNUNET_OK ==
- GNUNET_SERVICE_run (argc, argv, "psyc",
- GNUNET_SERVICE_OPTION_NONE,
- &run, NULL)) ? 0 : 1;
- }
- /* end of gnunet-service-psyc.c */
|