gnunet-service-psyc.c 70 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430
  1. /*
  2. * This file is part of GNUnet
  3. * (C) 2013 Christian Grothoff (and other contributing authors)
  4. *
  5. * GNUnet is free software; you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published
  7. * by the Free Software Foundation; either version 3, or (at your
  8. * option) any later version.
  9. *
  10. * GNUnet is distributed in the hope that it will be useful, but
  11. * WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. * General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with GNUnet; see the file COPYING. If not, write to the
  17. * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  18. * Boston, MA 02111-1307, USA.
  19. */
  20. /**
  21. * @file psyc/gnunet-service-psyc.c
  22. * @brief PSYC service
  23. * @author Gabor X Toth
  24. */
  25. #include <inttypes.h>
  26. #include "platform.h"
  27. #include "gnunet_util_lib.h"
  28. #include "gnunet_constants.h"
  29. #include "gnunet_protocols.h"
  30. #include "gnunet_statistics_service.h"
  31. #include "gnunet_multicast_service.h"
  32. #include "gnunet_psycstore_service.h"
  33. #include "gnunet_psyc_service.h"
  34. #include "gnunet_psyc_util_lib.h"
  35. #include "psyc.h"
  /**
   * Configuration handle currently in use by the service.
   */
  static const struct GNUNET_CONFIGURATION_Handle *cfg;

  /**
   * Statistics service handle.
   */
  static struct GNUNET_STATISTICS_Handle *stats;

  /**
   * Notification context used when sending messages to clients.
   */
  static struct GNUNET_SERVER_NotificationContext *nc;

  /**
   * PSYCstore handle.
   */
  static struct GNUNET_PSYCSTORE_Handle *store;

  /**
   * Connected channel masters.
   * Key: the channel's pub_key_hash, value: struct Master.
   */
  static struct GNUNET_CONTAINER_MultiHashMap *masters;

  /**
   * Connected channel slaves.
   * Key: the channel's pub_key_hash, value: struct Slave.
   */
  static struct GNUNET_CONTAINER_MultiHashMap *slaves;

  /**
   * Connected slaves grouped by channel (two-level map).
   * Key: the channel's pub_key_hash, value: a nested map that is keyed by the
   * hash of the slave's public key and holds struct Slave.
   */
  static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
  /**
   * One entry of a channel's transmission queue (doubly linked list).
   * The message itself is stored directly after this struct.
   */
  struct TransmitMessage
  {
    /** Previous entry in the queue. */
    struct TransmitMessage *prev;

    /** Next entry in the queue. */
    struct TransmitMessage *next;

    /** Client associated with this entry. */
    struct GNUNET_SERVER_Client *client;

    /** ID assigned to the message. */
    uint64_t id;

    /** Size of the message. */
    uint16_t size;

    /** Message state, @see enum MessageState */
    uint8_t state;

    /**
     * #GNUNET_YES once a message ACK has been sent to the client,
     * #GNUNET_NO otherwise.
     */
    uint8_t ack_sent;

    /* Followed by message */
  };
  /**
   * Cache of message fragments that have been received.
   * Fragments are held back and only passed on to clients once all modifiers
   * have arrived.
   *
   * Key: chan_key, value: MultiHashMap chan_msgs
   */
  static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
  /**
   * Value type of the chan_msgs hash map stored in @a recv_cache.
   * Key: fragment_id, value: RecvCacheEntry
   */
  struct RecvCacheEntry
  {
    /** The cached multicast message fragment. */
    struct GNUNET_MULTICAST_MessageHeader *mmsg;

    /** Reference counter of this entry. */
    uint16_t ref_count;
  };
  /**
   * Value type of the @a recv_frags hash map of a @a Channel.
   * Key: message_id, value: FragmentQueue
   */
  struct FragmentQueue
  {
    /** Heap of fragment IDs that are stored in @a recv_cache. */
    struct GNUNET_CONTAINER_Heap *fragments;

    /** Sum of the sizes of all fragments received so far. */
    uint64_t size;

    /** Sum of the sizes of received header fragments (METHOD & MODIFIERs). */
    uint64_t header_size;

    /** Copy of @a state_delta from struct GNUNET_PSYC_MessageMethod. */
    uint64_t state_delta;

    /** Copy of @a flags from struct GNUNET_PSYC_MessageMethod. */
    uint32_t flags;

    /**
     * Receive state of the message.
     *
     * @see MessageFragmentState
     */
    uint8_t state;

    /**
     * Whether the message has been queued for delivery to the client,
     * i.e. whether it has been added to the recv_msgs queue.
     */
    uint8_t is_queued;
  };
  /**
   * Node of the doubly linked list of clients connected to a channel.
   */
  struct ClientListItem
  {
    /** Previous node in the list. */
    struct ClientListItem *prev;

    /** Next node in the list. */
    struct ClientListItem *next;

    /** The connected client this node stands for. */
    struct GNUNET_SERVER_Client *client;
  };
  157. /**
  158. * Common part of the client context for both a channel master and slave.
  159. */
  160. struct Channel
  161. {
  162. struct ClientListItem *clients_head;
  163. struct ClientListItem *clients_tail;
  164. struct TransmitMessage *tmit_head;
  165. struct TransmitMessage *tmit_tail;
  166. /**
  167. * Current PSYCstore operation.
  168. */
  169. struct GNUNET_PSYCSTORE_OperationHandle *store_op;
  170. /**
  171. * Received fragments not yet sent to the client.
  172. * message_id -> FragmentQueue
  173. */
  174. struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
  175. /**
  176. * Received message IDs not yet sent to the client.
  177. */
  178. struct GNUNET_CONTAINER_Heap *recv_msgs;
  179. /**
  180. * Public key of the channel.
  181. */
  182. struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
  183. /**
  184. * Hash of @a pub_key.
  185. */
  186. struct GNUNET_HashCode pub_key_hash;
  187. /**
  188. * Last message ID sent to the client.
  189. * 0 if there is no such message.
  190. */
  191. uint64_t max_message_id;
  192. /**
  193. * ID of the last stateful message, where the state operations has been
  194. * processed and saved to PSYCstore and which has been sent to the client.
  195. * 0 if there is no such message.
  196. */
  197. uint64_t max_state_message_id;
  198. /**
  199. * Expected value size for the modifier being received from the PSYC service.
  200. */
  201. uint32_t tmit_mod_value_size_expected;
  202. /**
  203. * Actual value size for the modifier being received from the PSYC service.
  204. */
  205. uint32_t tmit_mod_value_size;
  206. /**
  207. * @see enum MessageState
  208. */
  209. uint8_t tmit_state;
  210. /**
  211. * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
  212. */
  213. uint8_t is_master;
  214. /**
  215. * Is this channel ready to receive messages from client?
  216. * #GNUNET_YES or #GNUNET_NO
  217. */
  218. uint8_t is_ready;
  219. /**
  220. * Is the client disconnected?
  221. * #GNUNET_YES or #GNUNET_NO
  222. */
  223. uint8_t is_disconnected;
  224. };
  225. /**
  226. * Client context for a channel master.
  227. */
  228. struct Master
  229. {
  230. /**
  231. * Channel struct common for Master and Slave
  232. */
  233. struct Channel chn;
  234. /**
  235. * Private key of the channel.
  236. */
  237. struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
  238. /**
  239. * Handle for the multicast origin.
  240. */
  241. struct GNUNET_MULTICAST_Origin *origin;
  242. /**
  243. * Transmit handle for multicast.
  244. */
  245. struct GNUNET_MULTICAST_OriginTransmitHandle *tmit_handle;
  246. /**
  247. * Incoming join requests from multicast.
  248. * member_key -> struct GNUNET_MULTICAST_JoinHandle *
  249. */
  250. struct GNUNET_CONTAINER_MultiHashMap *join_reqs;
  251. /**
  252. * Last message ID transmitted to this channel.
  253. *
  254. * Incremented before sending a message, thus the message_id in messages sent
  255. * starts from 1.
  256. */
  257. uint64_t max_message_id;
  258. /**
  259. * ID of the last message with state operations transmitted to the channel.
  260. * 0 if there is no such message.
  261. */
  262. uint64_t max_state_message_id;
  263. /**
  264. * Maximum group generation transmitted to the channel.
  265. */
  266. uint64_t max_group_generation;
  267. /**
  268. * @see enum GNUNET_PSYC_Policy
  269. */
  270. enum GNUNET_PSYC_Policy policy;
  271. };
  272. /**
  273. * Client context for a channel slave.
  274. */
  275. struct Slave
  276. {
  277. /**
  278. * Channel struct common for Master and Slave
  279. */
  280. struct Channel chn;
  281. /**
  282. * Private key of the slave.
  283. */
  284. struct GNUNET_CRYPTO_EcdsaPrivateKey priv_key;
  285. /**
  286. * Public key of the slave.
  287. */
  288. struct GNUNET_CRYPTO_EcdsaPublicKey pub_key;
  289. /**
  290. * Hash of @a pub_key.
  291. */
  292. struct GNUNET_HashCode pub_key_hash;
  293. /**
  294. * Handle for the multicast member.
  295. */
  296. struct GNUNET_MULTICAST_Member *member;
  297. /**
  298. * Transmit handle for multicast.
  299. */
  300. struct GNUNET_MULTICAST_MemberTransmitHandle *tmit_handle;
  301. /**
  302. * Peer identity of the origin.
  303. */
  304. struct GNUNET_PeerIdentity origin;
  305. /**
  306. * Number of items in @a relays.
  307. */
  308. uint32_t relay_count;
  309. /**
  310. * Relays that multicast can use to connect.
  311. */
  312. struct GNUNET_PeerIdentity *relays;
  313. /**
  314. * Join request to be transmitted to the master on join.
  315. */
  316. struct GNUNET_PSYC_Message *join_msg;
  317. /**
  318. * Join decision received from multicast.
  319. */
  320. struct GNUNET_PSYC_JoinDecisionMessage *join_dcsn;
  321. /**
  322. * Maximum request ID for this channel.
  323. */
  324. uint64_t max_request_id;
  325. };
  /**
   * Closure bundling a client, a channel and an operation ID.
   */
  struct OperationClosure
  {
    /** Client the operation belongs to. */
    struct GNUNET_SERVER_Client *client;

    /** Channel the operation belongs to. */
    struct Channel *chn;

    /** ID of the operation. */
    uint64_t op_id;
  };
  /* Forward declarations of helpers that are used before their definitions. */

  static void transmit_message (struct Channel *chn);

  static uint64_t message_queue_drop (struct Channel *chn);
  336. /**
  337. * Task run during shutdown.
  338. *
  339. * @param cls unused
  340. * @param tc unused
  341. */
  342. static void
  343. shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  344. {
  345. if (NULL != nc)
  346. {
  347. GNUNET_SERVER_notification_context_destroy (nc);
  348. nc = NULL;
  349. }
  350. if (NULL != stats)
  351. {
  352. GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
  353. stats = NULL;
  354. }
  355. }
  356. /**
  357. * Clean up master data structures after a client disconnected.
  358. */
  359. static void
  360. cleanup_master (struct Master *mst)
  361. {
  362. struct Channel *chn = &mst->chn;
  363. if (NULL != mst->origin)
  364. GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
  365. GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
  366. GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
  367. }
  368. /**
  369. * Clean up slave data structures after a client disconnected.
  370. */
  371. static void
  372. cleanup_slave (struct Slave *slv)
  373. {
  374. struct Channel *chn = &slv->chn;
  375. struct GNUNET_CONTAINER_MultiHashMap *
  376. chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
  377. &chn->pub_key_hash);
  378. GNUNET_assert (NULL != chn_slv);
  379. GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
  380. if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
  381. {
  382. GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
  383. chn_slv);
  384. GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
  385. }
  386. GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
  387. if (NULL != slv->join_msg)
  388. {
  389. GNUNET_free (slv->join_msg);
  390. slv->join_msg = NULL;
  391. }
  392. if (NULL != slv->relays)
  393. {
  394. GNUNET_free (slv->relays);
  395. slv->relays = NULL;
  396. }
  397. if (NULL != slv->member)
  398. {
  399. GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
  400. slv->member = NULL;
  401. }
  402. GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
  403. }
  404. /**
  405. * Clean up channel data structures after a client disconnected.
  406. */
  407. static void
  408. cleanup_channel (struct Channel *chn)
  409. {
  410. message_queue_drop (chn);
  411. GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
  412. if (NULL != chn->store_op)
  413. {
  414. GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
  415. chn->store_op = NULL;
  416. }
  417. (GNUNET_YES == chn->is_master)
  418. ? cleanup_master ((struct Master *) chn)
  419. : cleanup_slave ((struct Slave *) chn);
  420. GNUNET_free (chn);
  421. }
  422. /**
  423. * Called whenever a client is disconnected.
  424. * Frees our resources associated with that client.
  425. *
  426. * @param cls Closure.
  427. * @param client Identification of the client.
  428. */
  429. static void
  430. client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
  431. {
  432. if (NULL == client)
  433. return;
  434. struct Channel *
  435. chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
  436. if (NULL == chn)
  437. {
  438. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  439. "%p User context is NULL in client_disconnect()\n", chn);
  440. GNUNET_break (0);
  441. return;
  442. }
  443. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  444. "%p Client (%s) disconnected from channel %s\n",
  445. chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
  446. GNUNET_h2s (&chn->pub_key_hash));
  447. struct ClientListItem *cli = chn->clients_head;
  448. while (NULL != cli)
  449. {
  450. if (cli->client == client)
  451. {
  452. GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
  453. GNUNET_free (cli);
  454. break;
  455. }
  456. cli = cli->next;
  457. }
  458. if (NULL == chn->clients_head)
  459. { /* Last client disconnected. */
  460. if (NULL != chn->tmit_head)
  461. { /* Send pending messages to multicast before cleanup. */
  462. transmit_message (chn);
  463. }
  464. else
  465. {
  466. cleanup_channel (chn);
  467. }
  468. }
  469. }
  470. /**
  471. * Send message to all clients connected to the channel.
  472. */
  473. static void
  474. client_send_msg (const struct Channel *chn,
  475. const struct GNUNET_MessageHeader *msg)
  476. {
  477. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  478. "%p Sending message to clients.\n", chn);
  479. struct ClientListItem *cli = chn->clients_head;
  480. while (NULL != cli)
  481. {
  482. GNUNET_SERVER_notification_context_add (nc, cli->client);
  483. GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
  484. cli = cli->next;
  485. }
  486. }
  487. /**
  488. * Send a result code back to the client.
  489. *
  490. * @param client
  491. * Client that should receive the result code.
  492. * @param result_code
  493. * Code to transmit.
  494. * @param op_id
  495. * Operation ID in network byte order.
  496. * @param err_msg
  497. * Error message to include (or NULL for none).
  498. */
  499. static void
  500. client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
  501. int64_t result_code, const char *err_msg)
  502. {
  503. struct OperationResult *res;
  504. size_t err_size = 0;
  505. if (NULL != err_msg)
  506. err_size = strnlen (err_msg,
  507. GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res)) + 1;
  508. res = GNUNET_malloc (sizeof (struct OperationResult) + err_size);
  509. res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
  510. res->header.size = htons (sizeof (struct OperationResult) + err_size);
  511. res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1);
  512. res->op_id = op_id;
  513. if (0 < err_size)
  514. {
  515. memcpy (&res[1], err_msg, err_size);
  516. ((char *) &res[1])[err_size - 1] = '\0';
  517. }
  518. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  519. "%p Sending result to client for operation #%" PRIu64 ": "
  520. "%" PRId64 " (%s)\n",
  521. client, GNUNET_ntohll (op_id), result_code, err_msg);
  522. GNUNET_SERVER_notification_context_add (nc, client);
  523. GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
  524. GNUNET_NO);
  525. GNUNET_free (res);
  526. }
  527. /**
  528. * Closure for join_mem_test_cb()
  529. */
  530. struct JoinMemTestClosure
  531. {
  532. struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
  533. struct Channel *chn;
  534. struct GNUNET_MULTICAST_JoinHandle *jh;
  535. struct GNUNET_PSYC_JoinRequestMessage *join_msg;
  536. };
  537. /**
  538. * Membership test result callback used for join requests.
  539. */
  540. static void
  541. join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
  542. {
  543. struct JoinMemTestClosure *jcls = cls;
  544. if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
  545. { /* Pass on join request to client if this is a master channel */
  546. struct Master *mst = (struct Master *) jcls->chn;
  547. struct GNUNET_HashCode slave_key_hash;
  548. GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
  549. &slave_key_hash);
  550. GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
  551. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
  552. client_send_msg (jcls->chn, &jcls->join_msg->header);
  553. }
  554. else
  555. {
  556. // FIXME: add relays
  557. GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
  558. }
  559. GNUNET_free (jcls->join_msg);
  560. GNUNET_free (jcls);
  561. }
  562. /**
  563. * Incoming join request from multicast.
  564. */
  565. static void
  566. mcast_recv_join_request (void *cls,
  567. const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
  568. const struct GNUNET_MessageHeader *join_msg,
  569. struct GNUNET_MULTICAST_JoinHandle *jh)
  570. {
  571. struct Channel *chn = cls;
  572. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
  573. uint16_t join_msg_size = 0;
  574. if (NULL != join_msg)
  575. {
  576. if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE == ntohs (join_msg->type))
  577. {
  578. join_msg_size = ntohs (join_msg->size);
  579. }
  580. else
  581. {
  582. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  583. "%p Got join message with invalid type %u.\n",
  584. chn, ntohs (join_msg->type));
  585. }
  586. }
  587. struct GNUNET_PSYC_JoinRequestMessage *
  588. req = GNUNET_malloc (sizeof (*req) + join_msg_size);
  589. req->header.size = htons (sizeof (*req) + join_msg_size);
  590. req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
  591. req->slave_key = *slave_key;
  592. if (0 < join_msg_size)
  593. memcpy (&req[1], join_msg, join_msg_size);
  594. struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
  595. jcls->slave_key = *slave_key;
  596. jcls->chn = chn;
  597. jcls->jh = jh;
  598. jcls->join_msg = req;
  599. GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
  600. chn->max_message_id, 0,
  601. &join_mem_test_cb, jcls);
  602. }
  603. /**
  604. * Join decision received from multicast.
  605. */
  606. static void
  607. mcast_recv_join_decision (void *cls, int is_admitted,
  608. const struct GNUNET_PeerIdentity *peer,
  609. uint16_t relay_count,
  610. const struct GNUNET_PeerIdentity *relays,
  611. const struct GNUNET_MessageHeader *join_resp)
  612. {
  613. struct Slave *slv = cls;
  614. struct Channel *chn = &slv->chn;
  615. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  616. "%p Got join decision: %d\n", slv, is_admitted);
  617. uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
  618. struct GNUNET_PSYC_JoinDecisionMessage *
  619. dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
  620. dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
  621. dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
  622. dcsn->is_admitted = htonl (is_admitted);
  623. if (0 < join_resp_size)
  624. memcpy (&dcsn[1], join_resp, join_resp_size);
  625. client_send_msg (chn, &dcsn->header);
  626. if (GNUNET_YES == is_admitted)
  627. {
  628. chn->is_ready = GNUNET_YES;
  629. }
  630. else
  631. {
  632. slv->member = NULL;
  633. }
  634. }
  635. /**
  636. * Received result of GNUNET_PSYCSTORE_membership_test()
  637. */
  638. static void
  639. store_recv_membership_test_result (void *cls, int64_t result, const char *err_msg)
  640. {
  641. struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
  642. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  643. "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 " (%s)\n",
  644. mth, result, err_msg);
  645. GNUNET_MULTICAST_membership_test_result (mth, result);
  646. }
  647. /**
  648. * Incoming membership test request from multicast.
  649. */
  650. static void
  651. mcast_recv_membership_test (void *cls,
  652. const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
  653. uint64_t message_id, uint64_t group_generation,
  654. struct GNUNET_MULTICAST_MembershipTestHandle *mth)
  655. {
  656. struct Channel *chn = cls;
  657. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  658. "%p Received membership test request from multicast.\n",
  659. mth);
  660. GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
  661. message_id, group_generation,
  662. &store_recv_membership_test_result, mth);
  663. }
  664. static int
  665. store_recv_fragment_replay (void *cls,
  666. struct GNUNET_MULTICAST_MessageHeader *msg,
  667. enum GNUNET_PSYCSTORE_MessageFlags flags)
  668. {
  669. struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
  670. GNUNET_MULTICAST_replay_response (rh, &msg->header, GNUNET_MULTICAST_REC_OK);
  671. return GNUNET_YES;
  672. }
  673. /**
  674. * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
  675. */
  676. static void
  677. store_recv_fragment_replay_result (void *cls, int64_t result, const char *err_msg)
  678. {
  679. struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
  680. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  681. "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%s)\n",
  682. rh, result, err_msg);
  683. switch (result)
  684. {
  685. case GNUNET_YES:
  686. break;
  687. case GNUNET_NO:
  688. GNUNET_MULTICAST_replay_response (rh, NULL,
  689. GNUNET_MULTICAST_REC_NOT_FOUND);
  690. break;
  691. case GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED:
  692. GNUNET_MULTICAST_replay_response (rh, NULL,
  693. GNUNET_MULTICAST_REC_ACCESS_DENIED);
  694. break;
  695. case GNUNET_SYSERR:
  696. GNUNET_MULTICAST_replay_response (rh, NULL,
  697. GNUNET_MULTICAST_REC_INTERNAL_ERROR);
  698. break;
  699. }
  700. GNUNET_MULTICAST_replay_response_end (rh);
  701. }
  702. /**
  703. * Incoming fragment replay request from multicast.
  704. */
  705. static void
  706. mcast_recv_replay_fragment (void *cls,
  707. const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
  708. uint64_t fragment_id, uint64_t flags,
  709. struct GNUNET_MULTICAST_ReplayHandle *rh)
  710. {
  711. struct Channel *chn = cls;
  712. GNUNET_PSYCSTORE_fragment_get (store, &chn->pub_key, slave_key,
  713. fragment_id, fragment_id,
  714. &store_recv_fragment_replay,
  715. &store_recv_fragment_replay_result, rh);
  716. }
  717. /**
  718. * Incoming message replay request from multicast.
  719. */
  720. static void
  721. mcast_recv_replay_message (void *cls,
  722. const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
  723. uint64_t message_id,
  724. uint64_t fragment_offset,
  725. uint64_t flags,
  726. struct GNUNET_MULTICAST_ReplayHandle *rh)
  727. {
  728. struct Channel *chn = cls;
  729. GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
  730. message_id, message_id,
  731. &store_recv_fragment_replay,
  732. &store_recv_fragment_replay_result, rh);
  733. }
  734. /**
  735. * Convert an uint64_t in network byte order to a HashCode
  736. * that can be used as key in a MultiHashMap
  737. */
  738. static inline void
  739. hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
  740. {
  741. /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
  742. /* TODO: use built-in byte swap functions if available */
  743. n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
  744. n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
  745. *key = (struct GNUNET_HashCode) {};
  746. *((uint64_t *) key)
  747. = (n << 32) | (n >> 32);
  748. }
  /**
   * Convert an uint64_t in host byte order to a HashCode
   * that can be used as key in a MultiHashMap.
   *
   * On big-endian hosts this defers to hash_key_from_nll(); on little-endian
   * hosts the bytes of @a n are copied unchanged to the start of @a key and
   * the remaining bytes of @a key are set to zero.
   *
   * @param key Hash code to fill in.
   * @param n Value in host byte order.
   */
  static inline void
  hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
  {
  #if __BYTE_ORDER == __BIG_ENDIAN
    hash_key_from_nll (key, n);
  #elif __BYTE_ORDER == __LITTLE_ENDIAN
    /* memset/memcpy instead of an empty initializer and a store through a
       casted pointer: standard C, and no access through an incompatible type. */
    memset (key, 0, sizeof (*key));
    memcpy (key, &n, sizeof (n));
  #else
  #error byteorder undefined
  #endif
  }
  765. /**
  766. * Send multicast message to all clients connected to the channel.
  767. */
  768. static void
  769. client_send_mcast_msg (struct Channel *chn,
  770. const struct GNUNET_MULTICAST_MessageHeader *mmsg,
  771. uint32_t flags)
  772. {
  773. struct GNUNET_PSYC_MessageHeader *pmsg;
  774. uint16_t size = ntohs (mmsg->header.size);
  775. uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
  776. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  777. "%p Sending multicast message to client. "
  778. "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
  779. chn, GNUNET_ntohll (mmsg->fragment_id),
  780. GNUNET_ntohll (mmsg->message_id));
  781. pmsg = GNUNET_malloc (psize);
  782. pmsg->header.size = htons (psize);
  783. pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
  784. pmsg->message_id = mmsg->message_id;
  785. pmsg->fragment_offset = mmsg->fragment_offset;
  786. pmsg->flags = htonl (flags);
  787. memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
  788. client_send_msg (chn, &pmsg->header);
  789. GNUNET_free (pmsg);
  790. }
  791. /**
  792. * Send multicast request to all clients connected to the channel.
  793. */
  794. static void
  795. client_send_mcast_req (struct Master *mst,
  796. const struct GNUNET_MULTICAST_RequestHeader *req)
  797. {
  798. struct Channel *chn = &mst->chn;
  799. struct GNUNET_PSYC_MessageHeader *pmsg;
  800. uint16_t size = ntohs (req->header.size);
  801. uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
  802. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  803. "%p Sending multicast request to client. "
  804. "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
  805. chn, GNUNET_ntohll (req->fragment_id),
  806. GNUNET_ntohll (req->request_id));
  807. pmsg = GNUNET_malloc (psize);
  808. pmsg->header.size = htons (psize);
  809. pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
  810. pmsg->message_id = req->request_id;
  811. pmsg->fragment_offset = req->fragment_offset;
  812. pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
  813. memcpy (&pmsg[1], &req[1], size - sizeof (*req));
  814. client_send_msg (chn, &pmsg->header);
  815. GNUNET_free (pmsg);
  816. }
  817. /**
  818. * Insert a multicast message fragment into the queue belonging to the message.
  819. *
  820. * @param chn Channel.
  821. * @param mmsg Multicast message fragment.
  822. * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
  823. * @param first_ptype First PSYC message part type in @a mmsg.
  824. * @param last_ptype Last PSYC message part type in @a mmsg.
  825. */
  826. static void
  827. fragment_queue_insert (struct Channel *chn,
  828. const struct GNUNET_MULTICAST_MessageHeader *mmsg,
  829. uint16_t first_ptype, uint16_t last_ptype)
  830. {
  831. const uint16_t size = ntohs (mmsg->header.size);
  832. const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
  833. struct GNUNET_CONTAINER_MultiHashMap
  834. *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
  835. &chn->pub_key_hash);
  836. struct GNUNET_HashCode msg_id_hash;
  837. hash_key_from_nll (&msg_id_hash, mmsg->message_id);
  838. struct FragmentQueue
  839. *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
  840. if (NULL == fragq)
  841. {
  842. fragq = GNUNET_new (struct FragmentQueue);
  843. fragq->state = MSG_FRAG_STATE_HEADER;
  844. fragq->fragments
  845. = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
  846. GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
  847. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
  848. if (NULL == chan_msgs)
  849. {
  850. chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
  851. GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs,
  852. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
  853. }
  854. }
  855. struct GNUNET_HashCode frag_id_hash;
  856. hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
  857. struct RecvCacheEntry
  858. *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
  859. if (NULL == cache_entry)
  860. {
  861. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  862. "%p Adding message fragment to cache. "
  863. "message_id: %" PRIu64 ", fragment_id: %" PRIu64 "\n",
  864. chn, GNUNET_ntohll (mmsg->message_id),
  865. GNUNET_ntohll (mmsg->fragment_id));
  866. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  867. "%p header_size: %" PRIu64 " + %u\n",
  868. chn, fragq->header_size, size);
  869. cache_entry = GNUNET_new (struct RecvCacheEntry);
  870. cache_entry->ref_count = 1;
  871. cache_entry->mmsg = GNUNET_malloc (size);
  872. memcpy (cache_entry->mmsg, mmsg, size);
  873. GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
  874. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
  875. }
  876. else
  877. {
  878. cache_entry->ref_count++;
  879. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  880. "%p Message fragment is already in cache. "
  881. "message_id: %" PRIu64 ", fragment_id: %" PRIu64
  882. ", ref_count: %u\n",
  883. chn, GNUNET_ntohll (mmsg->message_id),
  884. GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
  885. }
  886. if (MSG_FRAG_STATE_HEADER == fragq->state)
  887. {
  888. if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
  889. {
  890. struct GNUNET_PSYC_MessageMethod *
  891. pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
  892. fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
  893. fragq->flags = ntohl (pmeth->flags);
  894. }
  895. if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
  896. {
  897. fragq->header_size += size;
  898. }
  899. else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
  900. || frag_offset == fragq->header_size)
  901. { /* header is now complete */
  902. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  903. "%p Header of message %" PRIu64 " is complete.\n",
  904. chn, GNUNET_ntohll (mmsg->message_id));
  905. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  906. "%p Adding message %" PRIu64 " to queue.\n",
  907. chn, GNUNET_ntohll (mmsg->message_id));
  908. fragq->state = MSG_FRAG_STATE_DATA;
  909. }
  910. else
  911. {
  912. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  913. "%p Header of message %" PRIu64 " is NOT complete yet: "
  914. "%" PRIu64 " != %" PRIu64 "\n",
  915. chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
  916. fragq->header_size);
  917. }
  918. }
  919. switch (last_ptype)
  920. {
  921. case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
  922. if (frag_offset == fragq->size)
  923. fragq->state = MSG_FRAG_STATE_END;
  924. else
  925. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  926. "%p Message %" PRIu64 " is NOT complete yet: "
  927. "%" PRIu64 " != %" PRIu64 "\n",
  928. chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
  929. fragq->size);
  930. break;
  931. case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
  932. /* Drop message without delivering to client if it's a single fragment */
  933. fragq->state =
  934. (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
  935. ? MSG_FRAG_STATE_DROP
  936. : MSG_FRAG_STATE_CANCEL;
  937. }
  938. switch (fragq->state)
  939. {
  940. case MSG_FRAG_STATE_DATA:
  941. case MSG_FRAG_STATE_END:
  942. case MSG_FRAG_STATE_CANCEL:
  943. if (GNUNET_NO == fragq->is_queued)
  944. {
  945. GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
  946. GNUNET_ntohll (mmsg->message_id));
  947. fragq->is_queued = GNUNET_YES;
  948. }
  949. }
  950. fragq->size += size;
  951. GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
  952. GNUNET_ntohll (mmsg->fragment_id));
  953. }
  954. /**
  955. * Run fragment queue of a message.
  956. *
  957. * Send fragments of a message in order to client, after all modifiers arrived
  958. * from multicast.
  959. *
  960. * @param chn Channel.
  961. * @param msg_id ID of the message @a fragq belongs to.
  962. * @param fragq Fragment queue of the message.
  963. * @param drop Drop message without delivering to client?
  964. * #GNUNET_YES or #GNUNET_NO.
  965. */
  966. static void
  967. fragment_queue_run (struct Channel *chn, uint64_t msg_id,
  968. struct FragmentQueue *fragq, uint8_t drop)
  969. {
  970. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  971. "%p Running message fragment queue for message %" PRIu64
  972. " (state: %u).\n",
  973. chn, msg_id, fragq->state);
  974. struct GNUNET_CONTAINER_MultiHashMap
  975. *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
  976. &chn->pub_key_hash);
  977. GNUNET_assert (NULL != chan_msgs);
  978. uint64_t frag_id;
  979. while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
  980. &frag_id))
  981. {
  982. struct GNUNET_HashCode frag_id_hash;
  983. hash_key_from_hll (&frag_id_hash, frag_id);
  984. struct RecvCacheEntry *cache_entry
  985. = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
  986. if (cache_entry != NULL)
  987. {
  988. if (GNUNET_NO == drop)
  989. {
  990. client_send_mcast_msg (chn, cache_entry->mmsg, 0);
  991. }
  992. if (cache_entry->ref_count <= 1)
  993. {
  994. GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
  995. cache_entry);
  996. GNUNET_free (cache_entry->mmsg);
  997. GNUNET_free (cache_entry);
  998. }
  999. else
  1000. {
  1001. cache_entry->ref_count--;
  1002. }
  1003. }
  1004. #if CACHE_AGING_IMPLEMENTED
  1005. else if (GNUNET_NO == drop)
  1006. {
  1007. /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
  1008. }
  1009. #endif
  1010. GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
  1011. }
  1012. if (MSG_FRAG_STATE_END <= fragq->state)
  1013. {
  1014. struct GNUNET_HashCode msg_id_hash;
  1015. hash_key_from_hll (&msg_id_hash, msg_id);
  1016. GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq);
  1017. GNUNET_CONTAINER_heap_destroy (fragq->fragments);
  1018. GNUNET_free (fragq);
  1019. }
  1020. else
  1021. {
  1022. fragq->is_queued = GNUNET_NO;
  1023. }
  1024. }
  1025. /**
  1026. * Run message queue.
  1027. *
  1028. * Send messages in queue to client in order after a message has arrived from
  1029. * multicast, according to the following:
  1030. * - A message is only sent if all of its modifiers arrived.
  1031. * - A stateful message is only sent if the previous stateful message
  1032. * has already been delivered to the client.
  1033. *
  1034. * @param chn Channel.
  1035. *
  1036. * @return Number of messages removed from queue and sent to client.
  1037. */
  1038. static uint64_t
  1039. message_queue_run (struct Channel *chn)
  1040. {
  1041. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1042. "%p Running message queue.\n", chn);
  1043. uint64_t n = 0;
  1044. uint64_t msg_id;
  1045. while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
  1046. &msg_id))
  1047. {
  1048. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1049. "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
  1050. struct GNUNET_HashCode msg_id_hash;
  1051. hash_key_from_hll (&msg_id_hash, msg_id);
  1052. struct FragmentQueue *
  1053. fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
  1054. if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
  1055. {
  1056. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1057. "%p No fragq (%p) or header not complete.\n",
  1058. chn, fragq);
  1059. break;
  1060. }
  1061. if (MSG_FRAG_STATE_HEADER == fragq->state)
  1062. {
  1063. /* Check if there's a missing message before the current one */
  1064. if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
  1065. {
  1066. if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
  1067. && msg_id - 1 != chn->max_message_id)
  1068. {
  1069. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1070. "%p Out of order message. "
  1071. "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
  1072. chn, msg_id, chn->max_message_id);
  1073. break;
  1074. }
  1075. }
  1076. else
  1077. {
  1078. if (msg_id - fragq->state_delta != chn->max_state_message_id)
  1079. {
  1080. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1081. "%p Out of order stateful message. "
  1082. "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
  1083. chn, msg_id, fragq->state_delta, chn->max_state_message_id);
  1084. break;
  1085. }
  1086. #if TODO
  1087. /* FIXME: apply modifiers to state in PSYCstore */
  1088. GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
  1089. store_recv_state_modify_result, cls);
  1090. #endif
  1091. chn->max_state_message_id = msg_id;
  1092. }
  1093. chn->max_message_id = msg_id;
  1094. }
  1095. fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
  1096. GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
  1097. n++;
  1098. }
  1099. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1100. "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
  1101. return n;
  1102. }
  1103. /**
  1104. * Drop message queue of a channel.
  1105. *
  1106. * Remove all messages in queue without sending it to clients.
  1107. *
  1108. * @param chn Channel.
  1109. *
  1110. * @return Number of messages removed from queue.
  1111. */
  1112. static uint64_t
  1113. message_queue_drop (struct Channel *chn)
  1114. {
  1115. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1116. "%p Dropping message queue.\n", chn);
  1117. uint64_t n = 0;
  1118. uint64_t msg_id;
  1119. while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
  1120. &msg_id))
  1121. {
  1122. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1123. "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
  1124. struct GNUNET_HashCode msg_id_hash;
  1125. hash_key_from_hll (&msg_id_hash, msg_id);
  1126. struct FragmentQueue *
  1127. fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
  1128. fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
  1129. GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
  1130. n++;
  1131. }
  1132. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1133. "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
  1134. return n;
  1135. }
  1136. /**
  1137. * Received result of GNUNET_PSYCSTORE_fragment_store().
  1138. */
  1139. static void
  1140. store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg)
  1141. {
  1142. struct Channel *chn = cls;
  1143. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1144. "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n",
  1145. chn, result, err_msg);
  1146. }
  1147. /**
  1148. * Handle incoming message fragment from multicast.
  1149. *
  1150. * Store it using PSYCstore and send it to the clients of the channel in order.
  1151. */
  1152. static void
  1153. mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg)
  1154. {
  1155. struct Channel *chn = cls;
  1156. uint16_t size = ntohs (mmsg->header.size);
  1157. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1158. "%p Received multicast message of size %u.\n",
  1159. chn, size);
  1160. GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
  1161. &store_recv_fragment_store_result, chn);
  1162. uint16_t first_ptype = 0, last_ptype = 0;
  1163. if (GNUNET_SYSERR
  1164. == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
  1165. (const char *) &mmsg[1],
  1166. &first_ptype, &last_ptype))
  1167. {
  1168. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1169. "%p Dropping incoming multicast message with invalid parts.\n",
  1170. chn);
  1171. GNUNET_break_op (0);
  1172. return;
  1173. }
  1174. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1175. "Message parts: first: type %u, last: type %u\n",
  1176. first_ptype, last_ptype);
  1177. fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
  1178. message_queue_run (chn);
  1179. }
  1180. /**
  1181. * Incoming request fragment from multicast for a master.
  1182. *
  1183. * @param cls Master.
  1184. * @param req The request.
  1185. */
  1186. static void
  1187. mcast_recv_request (void *cls,
  1188. const struct GNUNET_MULTICAST_RequestHeader *req)
  1189. {
  1190. struct Master *mst = cls;
  1191. uint16_t size = ntohs (req->header.size);
  1192. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1193. "%p Received multicast request of size %u.\n",
  1194. mst, size);
  1195. uint16_t first_ptype = 0, last_ptype = 0;
  1196. if (GNUNET_SYSERR
  1197. == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
  1198. (const char *) &req[1],
  1199. &first_ptype, &last_ptype))
  1200. {
  1201. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1202. "%p Dropping incoming multicast request with invalid parts.\n",
  1203. mst);
  1204. GNUNET_break_op (0);
  1205. return;
  1206. }
  1207. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1208. "Message parts: first: type %u, last: type %u\n",
  1209. first_ptype, last_ptype);
  1210. /* FIXME: in-order delivery */
  1211. client_send_mcast_req (mst, req);
  1212. }
  1213. /**
  1214. * Response from PSYCstore with the current counter values for a channel master.
  1215. */
  1216. static void
  1217. store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
  1218. uint64_t max_message_id, uint64_t max_group_generation,
  1219. uint64_t max_state_message_id)
  1220. {
  1221. struct Master *mst = cls;
  1222. struct Channel *chn = &mst->chn;
  1223. chn->store_op = NULL;
  1224. struct GNUNET_PSYC_CountersResultMessage res;
  1225. res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
  1226. res.header.size = htons (sizeof (res));
  1227. res.result_code = htonl (result - INT32_MIN);
  1228. res.max_message_id = GNUNET_htonll (max_message_id);
  1229. if (GNUNET_OK == result || GNUNET_NO == result)
  1230. {
  1231. mst->max_message_id = max_message_id;
  1232. chn->max_message_id = max_message_id;
  1233. chn->max_state_message_id = max_state_message_id;
  1234. mst->max_group_generation = max_group_generation;
  1235. mst->origin
  1236. = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
  1237. &mcast_recv_join_request,
  1238. &mcast_recv_membership_test,
  1239. &mcast_recv_replay_fragment,
  1240. &mcast_recv_replay_message,
  1241. &mcast_recv_request,
  1242. &mcast_recv_message, chn);
  1243. chn->is_ready = GNUNET_YES;
  1244. }
  1245. else
  1246. {
  1247. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1248. "%p GNUNET_PSYCSTORE_counters_get() "
  1249. "returned %d for channel %s.\n",
  1250. chn, result, GNUNET_h2s (&chn->pub_key_hash));
  1251. }
  1252. client_send_msg (chn, &res.header);
  1253. }
  1254. /**
  1255. * Response from PSYCstore with the current counter values for a channel slave.
  1256. */
  1257. void
  1258. store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
  1259. uint64_t max_message_id, uint64_t max_group_generation,
  1260. uint64_t max_state_message_id)
  1261. {
  1262. struct Slave *slv = cls;
  1263. struct Channel *chn = &slv->chn;
  1264. chn->store_op = NULL;
  1265. struct GNUNET_PSYC_CountersResultMessage res;
  1266. res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
  1267. res.header.size = htons (sizeof (res));
  1268. res.result_code = htonl (result - INT32_MIN);
  1269. res.max_message_id = GNUNET_htonll (max_message_id);
  1270. if (GNUNET_OK == result || GNUNET_NO == result)
  1271. {
  1272. chn->max_message_id = max_message_id;
  1273. chn->max_state_message_id = max_state_message_id;
  1274. slv->member
  1275. = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
  1276. &slv->origin,
  1277. slv->relay_count, slv->relays,
  1278. &slv->join_msg->header,
  1279. &mcast_recv_join_request,
  1280. &mcast_recv_join_decision,
  1281. &mcast_recv_membership_test,
  1282. &mcast_recv_replay_fragment,
  1283. &mcast_recv_replay_message,
  1284. &mcast_recv_message, chn);
  1285. if (NULL != slv->join_msg)
  1286. {
  1287. GNUNET_free (slv->join_msg);
  1288. slv->join_msg = NULL;
  1289. }
  1290. }
  1291. else
  1292. {
  1293. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1294. "%p GNUNET_PSYCSTORE_counters_get() "
  1295. "returned %d for channel %s.\n",
  1296. chn, result, GNUNET_h2s (&chn->pub_key_hash));
  1297. }
  1298. client_send_msg (chn, &res.header);
  1299. }
  1300. static void
  1301. channel_init (struct Channel *chn)
  1302. {
  1303. chn->recv_msgs
  1304. = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
  1305. chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
  1306. }
  1307. /**
  1308. * Handle a connecting client starting a channel master.
  1309. */
  1310. static void
  1311. client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
  1312. const struct GNUNET_MessageHeader *msg)
  1313. {
  1314. const struct MasterStartRequest *req
  1315. = (const struct MasterStartRequest *) msg;
  1316. struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
  1317. struct GNUNET_HashCode pub_key_hash;
  1318. GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
  1319. GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
  1320. struct Master *
  1321. mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
  1322. struct Channel *chn;
  1323. if (NULL == mst)
  1324. {
  1325. mst = GNUNET_new (struct Master);
  1326. mst->policy = ntohl (req->policy);
  1327. mst->priv_key = req->channel_key;
  1328. mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
  1329. chn = &mst->chn;
  1330. chn->is_master = GNUNET_YES;
  1331. chn->pub_key = pub_key;
  1332. chn->pub_key_hash = pub_key_hash;
  1333. channel_init (chn);
  1334. GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
  1335. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
  1336. chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
  1337. store_recv_master_counters, mst);
  1338. }
  1339. else
  1340. {
  1341. chn = &mst->chn;
  1342. struct GNUNET_PSYC_CountersResultMessage res;
  1343. res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
  1344. res.header.size = htons (sizeof (res));
  1345. res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN);
  1346. res.max_message_id = GNUNET_htonll (mst->max_message_id);
  1347. GNUNET_SERVER_notification_context_add (nc, client);
  1348. GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
  1349. GNUNET_NO);
  1350. }
  1351. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1352. "%p Client connected as master to channel %s.\n",
  1353. mst, GNUNET_h2s (&chn->pub_key_hash));
  1354. struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
  1355. cli->client = client;
  1356. GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
  1357. GNUNET_SERVER_client_set_user_context (client, chn);
  1358. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  1359. }
  1360. /**
  1361. * Handle a connecting client joining as a channel slave.
  1362. */
  1363. static void
  1364. client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
  1365. const struct GNUNET_MessageHeader *msg)
  1366. {
  1367. const struct SlaveJoinRequest *req
  1368. = (const struct SlaveJoinRequest *) msg;
  1369. uint16_t req_size = ntohs (req->header.size);
  1370. struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
  1371. struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
  1372. GNUNET_CRYPTO_ecdsa_key_get_public (&req->slave_key, &slv_pub_key);
  1373. GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
  1374. GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
  1375. struct GNUNET_CONTAINER_MultiHashMap *
  1376. chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
  1377. struct Slave *slv = NULL;
  1378. struct Channel *chn;
  1379. if (NULL != chn_slv)
  1380. {
  1381. slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
  1382. }
  1383. if (NULL == slv)
  1384. {
  1385. slv = GNUNET_new (struct Slave);
  1386. slv->priv_key = req->slave_key;
  1387. slv->pub_key = slv_pub_key;
  1388. slv->pub_key_hash = slv_pub_key_hash;
  1389. slv->origin = req->origin;
  1390. slv->relay_count = ntohl (req->relay_count);
  1391. const struct GNUNET_PeerIdentity *
  1392. relays = (const struct GNUNET_PeerIdentity *) &req[1];
  1393. uint16_t relay_size = slv->relay_count * sizeof (*relays);
  1394. uint16_t join_msg_size = 0;
  1395. if (sizeof (*req) + relay_size + sizeof (struct GNUNET_MessageHeader)
  1396. <= req_size)
  1397. {
  1398. join_msg_size = ntohs (slv->join_msg->header.size);
  1399. slv->join_msg = GNUNET_malloc (join_msg_size);
  1400. memcpy (slv->join_msg, ((char *) &req[1]) + relay_size, join_msg_size);
  1401. }
  1402. if (sizeof (*req) + relay_size + join_msg_size != req_size)
  1403. {
  1404. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1405. "%u + %u + %u != %u\n",
  1406. sizeof (*req), relay_size, join_msg_size, req_size);
  1407. GNUNET_break (0);
  1408. GNUNET_SERVER_client_disconnect (client);
  1409. return;
  1410. }
  1411. if (0 < slv->relay_count)
  1412. {
  1413. slv->relays = GNUNET_malloc (relay_size);
  1414. memcpy (slv->relays, &req[1], relay_size);
  1415. }
  1416. chn = &slv->chn;
  1417. chn->is_master = GNUNET_NO;
  1418. chn->pub_key = req->channel_key;
  1419. chn->pub_key_hash = pub_key_hash;
  1420. channel_init (chn);
  1421. if (NULL == chn_slv)
  1422. {
  1423. chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
  1424. GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv,
  1425. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
  1426. }
  1427. GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
  1428. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
  1429. GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
  1430. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
  1431. chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
  1432. &store_recv_slave_counters, slv);
  1433. }
  1434. else
  1435. {
  1436. chn = &slv->chn;
  1437. struct GNUNET_PSYC_CountersResultMessage res;
  1438. res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
  1439. res.header.size = htons (sizeof (res));
  1440. res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN);
  1441. res.max_message_id = GNUNET_htonll (chn->max_message_id);
  1442. GNUNET_SERVER_notification_context_add (nc, client);
  1443. GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
  1444. GNUNET_NO);
  1445. if (NULL == slv->member)
  1446. {
  1447. slv->member
  1448. = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
  1449. &slv->origin,
  1450. slv->relay_count, slv->relays,
  1451. &slv->join_msg->header,
  1452. &mcast_recv_join_request,
  1453. &mcast_recv_join_decision,
  1454. &mcast_recv_membership_test,
  1455. &mcast_recv_replay_fragment,
  1456. &mcast_recv_replay_message,
  1457. &mcast_recv_message, chn);
  1458. if (NULL != slv->join_msg)
  1459. {
  1460. GNUNET_free (slv->join_msg);
  1461. slv->join_msg = NULL;
  1462. }
  1463. }
  1464. else if (NULL != slv->join_dcsn)
  1465. {
  1466. GNUNET_SERVER_notification_context_add (nc, client);
  1467. GNUNET_SERVER_notification_context_unicast (nc, client,
  1468. &slv->join_dcsn->header,
  1469. GNUNET_NO);
  1470. }
  1471. }
  1472. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1473. "%p Client connected as slave to channel %s.\n",
  1474. slv, GNUNET_h2s (&chn->pub_key_hash));
  1475. struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
  1476. cli->client = client;
  1477. GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
  1478. GNUNET_SERVER_client_set_user_context (client, chn);
  1479. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  1480. }
  /**
   * Closure passed to mcast_send_join_decision() for every pending join
   * request a decision applies to.
   */
  struct JoinDecisionClosure
  {
    /** Admission value of the client's decision, in host byte order. */
    int32_t is_admitted;
    /** Message following the decision in the client's message, or NULL if none. */
    struct GNUNET_MessageHeader *msg;
  };
  1486. /**
  1487. * Iterator callback for sending join decisions to multicast.
  1488. */
  1489. static int
  1490. mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash,
  1491. void *value)
  1492. {
  1493. struct JoinDecisionClosure *jcls = cls;
  1494. struct GNUNET_MULTICAST_JoinHandle *jh = value;
  1495. // FIXME: add relays
  1496. GNUNET_MULTICAST_join_decision (jh, jcls->is_admitted, 0, NULL, jcls->msg);
  1497. return GNUNET_YES;
  1498. }
  1499. /**
  1500. * Join decision from client.
  1501. */
  1502. static void
  1503. client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
  1504. const struct GNUNET_MessageHeader *msg)
  1505. {
  1506. struct Channel *
  1507. chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
  1508. GNUNET_assert (GNUNET_YES == chn->is_master);
  1509. struct Master *mst = (struct Master *) chn;
  1510. struct GNUNET_PSYC_JoinDecisionMessage *
  1511. dcsn = (struct GNUNET_PSYC_JoinDecisionMessage *) msg;
  1512. struct JoinDecisionClosure jcls;
  1513. jcls.is_admitted = ntohl (dcsn->is_admitted);
  1514. jcls.msg
  1515. = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
  1516. ? (struct GNUNET_MessageHeader *) &dcsn[1]
  1517. : NULL;
  1518. struct GNUNET_HashCode slave_key_hash;
  1519. GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
  1520. &slave_key_hash);
  1521. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1522. "%p Got join decision (%d) from client for channel %s..\n",
  1523. mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
  1524. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1525. "%p ..and slave %s.\n",
  1526. mst, GNUNET_h2s (&slave_key_hash));
  1527. GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
  1528. &mcast_send_join_decision, &jcls);
  1529. GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
  1530. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  1531. }
  1532. /**
  1533. * Send acknowledgement to a client.
  1534. *
  1535. * Sent after a message fragment has been passed on to multicast.
  1536. *
  1537. * @param chn The channel struct for the client.
  1538. */
  1539. static void
  1540. send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
  1541. {
  1542. struct GNUNET_MessageHeader res;
  1543. res.size = htons (sizeof (res));
  1544. res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
  1545. /* FIXME */
  1546. GNUNET_SERVER_notification_context_add (nc, client);
  1547. GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
  1548. }
  1549. /**
  1550. * Callback for the transmit functions of multicast.
  1551. */
  1552. static int
  1553. transmit_notify (void *cls, size_t *data_size, void *data)
  1554. {
  1555. struct Channel *chn = cls;
  1556. struct TransmitMessage *tmit_msg = chn->tmit_head;
  1557. if (NULL == tmit_msg || *data_size < tmit_msg->size)
  1558. {
  1559. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1560. "%p transmit_notify: nothing to send.\n", chn);
  1561. *data_size = 0;
  1562. return GNUNET_NO;
  1563. }
  1564. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1565. "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
  1566. *data_size = tmit_msg->size;
  1567. memcpy (data, &tmit_msg[1], *data_size);
  1568. int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
  1569. if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
  1570. send_message_ack (chn, tmit_msg->client);
  1571. GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
  1572. GNUNET_free (tmit_msg);
  1573. if (NULL != chn->tmit_head)
  1574. {
  1575. transmit_message (chn);
  1576. }
  1577. else if (GNUNET_YES == chn->is_disconnected)
  1578. {
  1579. /* FIXME: handle partial message (when still in_transmit) */
  1580. cleanup_channel (chn);
  1581. }
  1582. return ret;
  1583. }
  1584. /**
  1585. * Callback for the transmit functions of multicast.
  1586. */
  1587. static int
  1588. master_transmit_notify (void *cls, size_t *data_size, void *data)
  1589. {
  1590. int ret = transmit_notify (cls, data_size, data);
  1591. if (GNUNET_YES == ret)
  1592. {
  1593. struct Master *mst = cls;
  1594. mst->tmit_handle = NULL;
  1595. }
  1596. return ret;
  1597. }
  1598. /**
  1599. * Callback for the transmit functions of multicast.
  1600. */
  1601. static int
  1602. slave_transmit_notify (void *cls, size_t *data_size, void *data)
  1603. {
  1604. int ret = transmit_notify (cls, data_size, data);
  1605. if (GNUNET_YES == ret)
  1606. {
  1607. struct Slave *slv = cls;
  1608. slv->tmit_handle = NULL;
  1609. }
  1610. return ret;
  1611. }
  1612. /**
  1613. * Transmit a message from a channel master to the multicast group.
  1614. */
  1615. static void
  1616. master_transmit_message (struct Master *mst)
  1617. {
  1618. if (NULL == mst->tmit_handle)
  1619. {
  1620. mst->tmit_handle
  1621. = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
  1622. mst->max_group_generation,
  1623. master_transmit_notify, mst);
  1624. }
  1625. else
  1626. {
  1627. GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
  1628. }
  1629. }
  1630. /**
  1631. * Transmit a message from a channel slave to the multicast group.
  1632. */
  1633. static void
  1634. slave_transmit_message (struct Slave *slv)
  1635. {
  1636. if (NULL == slv->tmit_handle)
  1637. {
  1638. slv->tmit_handle
  1639. = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
  1640. slave_transmit_notify, slv);
  1641. }
  1642. else
  1643. {
  1644. GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
  1645. }
  1646. }
  1647. static void
  1648. transmit_message (struct Channel *chn)
  1649. {
  1650. chn->is_master
  1651. ? master_transmit_message ((struct Master *) chn)
  1652. : slave_transmit_message ((struct Slave *) chn);
  1653. }
  1654. /**
  1655. * Queue a message from a channel master for sending to the multicast group.
  1656. */
  1657. static void
  1658. master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
  1659. uint16_t first_ptype, uint16_t last_ptype)
  1660. {
  1661. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
  1662. if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
  1663. {
  1664. tmit_msg->id = ++mst->max_message_id;
  1665. struct GNUNET_PSYC_MessageMethod *pmeth
  1666. = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
  1667. if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
  1668. {
  1669. pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
  1670. }
  1671. else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
  1672. {
  1673. pmeth->state_delta = GNUNET_htonll (tmit_msg->id
  1674. - mst->max_state_message_id);
  1675. }
  1676. else
  1677. {
  1678. pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
  1679. }
  1680. }
  1681. }
  1682. /**
  1683. * Queue a message from a channel slave for sending to the multicast group.
  1684. */
  1685. static void
  1686. slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
  1687. uint16_t first_ptype, uint16_t last_ptype)
  1688. {
  1689. if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
  1690. {
  1691. struct GNUNET_PSYC_MessageMethod *pmeth
  1692. = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
  1693. pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
  1694. tmit_msg->id = ++slv->max_request_id;
  1695. }
  1696. }
  1697. /**
  1698. * Queue PSYC message parts for sending to multicast.
  1699. *
  1700. * @param chn Channel to send to.
  1701. * @param client Client the message originates from.
  1702. * @param data_size Size of @a data.
  1703. * @param data Concatenated message parts.
  1704. * @param first_ptype First message part type in @a data.
  1705. * @param last_ptype Last message part type in @a data.
  1706. */
  1707. static struct TransmitMessage *
  1708. queue_message (struct Channel *chn,
  1709. struct GNUNET_SERVER_Client *client,
  1710. size_t data_size,
  1711. const void *data,
  1712. uint16_t first_ptype, uint16_t last_ptype)
  1713. {
  1714. struct TransmitMessage *
  1715. tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + data_size);
  1716. memcpy (&tmit_msg[1], data, data_size);
  1717. tmit_msg->client = client;
  1718. tmit_msg->size = data_size;
  1719. tmit_msg->state = chn->tmit_state;
  1720. /* FIXME: separate queue per message ID */
  1721. GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
  1722. chn->is_master
  1723. ? master_queue_message ((struct Master *) chn, tmit_msg,
  1724. first_ptype, last_ptype)
  1725. : slave_queue_message ((struct Slave *) chn, tmit_msg,
  1726. first_ptype, last_ptype);
  1727. return tmit_msg;
  1728. }
  1729. /**
  1730. * Cancel transmission of current message.
  1731. *
  1732. * @param chn Channel to send to.
  1733. * @param client Client the message originates from.
  1734. */
  1735. static void
  1736. transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
  1737. {
  1738. uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
  1739. struct GNUNET_MessageHeader msg;
  1740. msg.size = htons (sizeof (msg));
  1741. msg.type = htons (type);
  1742. queue_message (chn, client, sizeof (msg), &msg, type, type);
  1743. transmit_message (chn);
  1744. /* FIXME: cleanup */
  1745. }
  1746. /**
  1747. * Incoming message from a master or slave client.
  1748. */
  1749. static void
  1750. client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
  1751. const struct GNUNET_MessageHeader *msg)
  1752. {
  1753. struct Channel *
  1754. chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
  1755. GNUNET_assert (NULL != chn);
  1756. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1757. "%p Received message from client.\n", chn);
  1758. GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
  1759. if (GNUNET_YES != chn->is_ready)
  1760. {
  1761. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1762. "%p Channel is not ready yet, disconnecting client.\n", chn);
  1763. GNUNET_break (0);
  1764. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  1765. return;
  1766. }
  1767. uint16_t size = ntohs (msg->size);
  1768. if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
  1769. {
  1770. GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn);
  1771. GNUNET_break (0);
  1772. transmit_cancel (chn, client);
  1773. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  1774. return;
  1775. }
  1776. uint16_t first_ptype = 0, last_ptype = 0;
  1777. if (GNUNET_SYSERR
  1778. == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
  1779. (const char *) &msg[1],
  1780. &first_ptype, &last_ptype))
  1781. {
  1782. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1783. "%p Received invalid message part from client.\n", chn);
  1784. GNUNET_break (0);
  1785. transmit_cancel (chn, client);
  1786. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  1787. return;
  1788. }
  1789. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1790. "%p Received message with first part type %u and last part type %u.\n",
  1791. chn, first_ptype, last_ptype);
  1792. queue_message (chn, client, size - sizeof (*msg), &msg[1],
  1793. first_ptype, last_ptype);
  1794. transmit_message (chn);
  1795. /* FIXME: send a few ACKs even before transmit_notify is called */
  1796. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  1797. };
/**
 * Closure for the result callback of a membership store request.
 */
struct MembershipStoreClosure
{
  /** Client that issued the request; the result is sent back to it. */
  struct GNUNET_SERVER_Client *client;
  /** Channel the request was made on. */
  struct Channel *chn;
  /** Operation ID copied unchanged from the client's request. */
  uint64_t op_id;
};
  1804. /**
  1805. * Received result of GNUNET_PSYCSTORE_membership_store()
  1806. */
  1807. static void
  1808. store_recv_membership_store_result (void *cls, int64_t result, const char *err_msg)
  1809. {
  1810. struct MembershipStoreClosure *mcls = cls;
  1811. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1812. "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%s)\n",
  1813. mcls->chn, result, err_msg);
  1814. client_send_result (mcls->client, mcls->op_id, result, err_msg);
  1815. }
  1816. /**
  1817. * Client requests to add/remove a slave in the membership database.
  1818. */
  1819. static void
  1820. client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
  1821. const struct GNUNET_MessageHeader *msg)
  1822. {
  1823. struct Channel *
  1824. chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
  1825. GNUNET_assert (NULL != chn);
  1826. const struct ChannelMembershipStoreRequest *
  1827. req = (const struct ChannelMembershipStoreRequest *) msg;
  1828. struct MembershipStoreClosure *mcls = GNUNET_malloc (sizeof (*mcls));
  1829. mcls->client = client;
  1830. mcls->chn = chn;
  1831. mcls->op_id = req->op_id;
  1832. uint64_t announced_at = GNUNET_ntohll (req->announced_at);
  1833. uint64_t effective_since = GNUNET_ntohll (req->effective_since);
  1834. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1835. "%p Received membership store request from client.\n", chn);
  1836. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1837. "%p did_join: %u, announced_at: %" PRIu64 ", effective_since: %" PRIu64 "\n",
  1838. chn, req->did_join, announced_at, effective_since);
  1839. GNUNET_PSYCSTORE_membership_store (store, &chn->pub_key, &req->slave_key,
  1840. req->did_join, announced_at, effective_since,
  1841. 0, /* FIXME: group_generation */
  1842. &store_recv_membership_store_result, mcls);
  1843. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  1844. }
  1845. static int
  1846. store_recv_fragment_history (void *cls,
  1847. struct GNUNET_MULTICAST_MessageHeader *msg,
  1848. enum GNUNET_PSYCSTORE_MessageFlags flags)
  1849. {
  1850. struct OperationClosure *opcls = cls;
  1851. struct Channel *chn = opcls->chn;
  1852. client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC);
  1853. return GNUNET_YES;
  1854. }
  1855. /**
  1856. * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
  1857. */
  1858. static void
  1859. store_recv_fragment_history_result (void *cls, int64_t result, const char *err_msg)
  1860. {
  1861. struct OperationClosure *opcls = cls;
  1862. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1863. "%p History replay #%" PRIu64 ": "
  1864. "PSYCSTORE returned %" PRId64 " (%s)\n",
  1865. opcls->chn, opcls->op_id, result, err_msg);
  1866. client_send_result (opcls->client, opcls->op_id, result, err_msg);
  1867. }
  1868. /**
  1869. * Client requests channel history from PSYCstore.
  1870. */
  1871. static void
  1872. client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
  1873. const struct GNUNET_MessageHeader *msg)
  1874. {
  1875. struct Channel *
  1876. chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
  1877. GNUNET_assert (NULL != chn);
  1878. const struct HistoryRequest *
  1879. req = (const struct HistoryRequest *) msg;
  1880. struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
  1881. opcls->client = client;
  1882. opcls->chn = chn;
  1883. opcls->op_id = req->op_id;
  1884. if (0 == req->message_limit)
  1885. GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
  1886. GNUNET_ntohll (req->start_message_id),
  1887. GNUNET_ntohll (req->end_message_id),
  1888. &store_recv_fragment_history,
  1889. &store_recv_fragment_history_result, opcls);
  1890. else
  1891. GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
  1892. GNUNET_ntohll (req->message_limit),
  1893. &store_recv_fragment_history,
  1894. &store_recv_fragment_history_result,
  1895. opcls);
  1896. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  1897. }
  1898. /**
  1899. * Received state var from PSYCstore, send it to client.
  1900. */
  1901. static int
  1902. store_recv_state_var (void *cls, const char *name,
  1903. const void *value, size_t value_size)
  1904. {
  1905. struct OperationClosure *opcls = cls;
  1906. struct OperationResult *op;
  1907. if (NULL != name)
  1908. {
  1909. uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
  1910. struct GNUNET_PSYC_MessageModifier *mod;
  1911. op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size);
  1912. op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size + value_size);
  1913. op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
  1914. op->op_id = opcls->op_id;
  1915. mod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
  1916. mod->header.size = htons (sizeof (*mod) + name_size + value_size);
  1917. mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
  1918. mod->name_size = htons (name_size);
  1919. mod->value_size = htonl (value_size);
  1920. mod->oper = htons (GNUNET_ENV_OP_ASSIGN);
  1921. memcpy (&mod[1], name, name_size);
  1922. memcpy (((char *) &mod[1]) + name_size, value, value_size);
  1923. }
  1924. else
  1925. {
  1926. struct GNUNET_MessageHeader *mod;
  1927. op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size);
  1928. op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size);
  1929. op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
  1930. op->op_id = opcls->op_id;
  1931. mod = (struct GNUNET_MessageHeader *) &op[1];
  1932. mod->size = htons (sizeof (*mod) + value_size);
  1933. mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
  1934. memcpy (&mod[1], value, value_size);
  1935. }
  1936. GNUNET_SERVER_notification_context_add (nc, opcls->client);
  1937. GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header,
  1938. GNUNET_NO);
  1939. return GNUNET_YES;
  1940. }
  1941. /**
  1942. * Received result of GNUNET_PSYCSTORE_state_get()
  1943. * or GNUNET_PSYCSTORE_state_get_prefix()
  1944. */
  1945. static void
  1946. store_recv_state_result (void *cls, int64_t result, const char *err_msg)
  1947. {
  1948. struct OperationClosure *opcls = cls;
  1949. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1950. "%p History replay #%" PRIu64 ": "
  1951. "PSYCSTORE returned %" PRId64 " (%s)\n",
  1952. opcls->chn, opcls->op_id, result, err_msg);
  1953. client_send_result (opcls->client, opcls->op_id, result, err_msg);
  1954. }
  1955. /**
  1956. * Client requests best matching state variable from PSYCstore.
  1957. */
  1958. static void
  1959. client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
  1960. const struct GNUNET_MessageHeader *msg)
  1961. {
  1962. struct Channel *
  1963. chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
  1964. GNUNET_assert (NULL != chn);
  1965. const struct StateRequest *
  1966. req = (const struct StateRequest *) msg;
  1967. uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
  1968. const char *name = (const char *) &req[1];
  1969. if (0 == name_size || '\0' != name[name_size - 1])
  1970. {
  1971. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  1972. return;
  1973. }
  1974. struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
  1975. opcls->client = client;
  1976. opcls->chn = chn;
  1977. opcls->op_id = req->op_id;
  1978. GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
  1979. &store_recv_state_var,
  1980. &store_recv_state_result, opcls);
  1981. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  1982. }
  1983. /**
  1984. * Client requests state variables with a given prefix from PSYCstore.
  1985. */
  1986. static void
  1987. client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
  1988. const struct GNUNET_MessageHeader *msg)
  1989. {
  1990. struct Channel *
  1991. chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
  1992. GNUNET_assert (NULL != chn);
  1993. const struct StateRequest *
  1994. req = (const struct StateRequest *) msg;
  1995. uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
  1996. const char *name = (const char *) &req[1];
  1997. if (0 == name_size || '\0' != name[name_size - 1])
  1998. {
  1999. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  2000. return;
  2001. }
  2002. struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
  2003. opcls->client = client;
  2004. opcls->chn = chn;
  2005. opcls->op_id = req->op_id;
  2006. GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
  2007. &store_recv_state_var,
  2008. &store_recv_state_result, opcls);
  2009. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  2010. }
  2011. static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
  2012. { &client_recv_master_start, NULL,
  2013. GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
  2014. { &client_recv_slave_join, NULL,
  2015. GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
  2016. { &client_recv_join_decision, NULL,
  2017. GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
  2018. { &client_recv_psyc_message, NULL,
  2019. GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
  2020. { &client_recv_membership_store, NULL,
  2021. GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
  2022. { &client_recv_history_replay, NULL,
  2023. GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
  2024. { &client_recv_state_get, NULL,
  2025. GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
  2026. { &client_recv_state_get_prefix, NULL,
  2027. GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
  2028. { NULL, NULL, 0, 0 }
  2029. };
  2030. /**
  2031. * Initialize the PSYC service.
  2032. *
  2033. * @param cls Closure.
  2034. * @param server The initialized server.
  2035. * @param c Configuration to use.
  2036. */
  2037. static void
  2038. run (void *cls, struct GNUNET_SERVER_Handle *server,
  2039. const struct GNUNET_CONFIGURATION_Handle *c)
  2040. {
  2041. cfg = c;
  2042. store = GNUNET_PSYCSTORE_connect (cfg);
  2043. stats = GNUNET_STATISTICS_create ("psyc", cfg);
  2044. masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
  2045. slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
  2046. channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
  2047. recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
  2048. nc = GNUNET_SERVER_notification_context_create (server, 1);
  2049. GNUNET_SERVER_add_handlers (server, server_handlers);
  2050. GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
  2051. GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
  2052. &shutdown_task, NULL);
  2053. }
  2054. /**
  2055. * The main function for the service.
  2056. *
  2057. * @param argc number of arguments from the command line
  2058. * @param argv command line arguments
  2059. * @return 0 ok, 1 on error
  2060. */
  2061. int
  2062. main (int argc, char *const *argv)
  2063. {
  2064. return (GNUNET_OK ==
  2065. GNUNET_SERVICE_run (argc, argv, "psyc",
  2066. GNUNET_SERVICE_OPTION_NONE,
  2067. &run, NULL)) ? 0 : 1;
  2068. }
  2069. /* end of gnunet-service-psyc.c */