gnunet-service-set.c 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076
  1. /*
  2. This file is part of GNUnet
  3. Copyright (C) 2013-2017 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file set/gnunet-service-set.c
  18. * @brief two-peer set operations
  19. * @author Florian Dold
  20. * @author Christian Grothoff
  21. */
  22. #include "gnunet-service-set.h"
  23. #include "gnunet-service-set_union.h"
  24. #include "gnunet-service-set_intersection.h"
  25. #include "gnunet-service-set_protocol.h"
  26. #include "gnunet_statistics_service.h"
  /**
   * Time we keep an incoming channel around while there is no
   * local listener for it, before we give up on it.
   */
  #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
  /**
   * A lazy copy request made by a client.  Requests are linked
   * into a doubly linked list and identified by a cookie.
   */
  struct LazyCopyRequest
  {
    /**
     * Previous request in the DLL.
     */
    struct LazyCopyRequest *prev;

    /**
     * Next request in the DLL.
     */
    struct LazyCopyRequest *next;

    /**
     * The set this request asks us to copy.
     */
    struct Set *source_set;

    /**
     * Cookie that identifies this request.
     */
    uint32_t cookie;
  };
  54. /**
  55. * A listener is inhabited by a client, and waits for evaluation
  56. * requests from remote peers.
  57. */
  58. struct Listener
  59. {
  60. /**
  61. * Listeners are held in a doubly linked list.
  62. */
  63. struct Listener *next;
  64. /**
  65. * Listeners are held in a doubly linked list.
  66. */
  67. struct Listener *prev;
  68. /**
  69. * Head of DLL of operations this listener is responsible for.
  70. * Once the client has accepted/declined the operation, the
  71. * operation is moved to the respective set's operation DLLS.
  72. */
  73. struct Operation *op_head;
  74. /**
  75. * Tail of DLL of operations this listener is responsible for.
  76. * Once the client has accepted/declined the operation, the
  77. * operation is moved to the respective set's operation DLLS.
  78. */
  79. struct Operation *op_tail;
  80. /**
  81. * Client that owns the listener.
  82. * Only one client may own a listener.
  83. */
  84. struct ClientState *cs;
  85. /**
  86. * The port we are listening on with CADET.
  87. */
  88. struct GNUNET_CADET_Port *open_port;
  89. /**
  90. * Application ID for the operation, used to distinguish
  91. * multiple operations of the same type with the same peer.
  92. */
  93. struct GNUNET_HashCode app_id;
  94. /**
  95. * The type of the operation.
  96. */
  97. enum GNUNET_SET_OperationType operation;
  98. };
  /**
   * Our handle to the CADET service; used both to listen for and
   * to connect to remote peers.
   */
  static struct GNUNET_CADET_Handle *cadet;

  /**
   * Head of the DLL of lazy copy requests.
   */
  static struct LazyCopyRequest *lazy_copy_head;

  /**
   * Tail of the DLL of lazy copy requests.
   */
  static struct LazyCopyRequest *lazy_copy_tail;

  /**
   * Source of the unique cookies handed out per lazy copy request.
   */
  static uint32_t lazy_copy_cookie;

  /**
   * Handle to the statistics service.
   */
  struct GNUNET_STATISTICS_Handle *_GSS_statistics;

  /**
   * Head of the DLL of all listeners.
   */
  static struct Listener *listener_head;

  /**
   * Tail of the DLL of all listeners.
   */
  static struct Listener *listener_tail;

  /**
   * How many clients are currently connected.
   */
  static unsigned int num_clients;

  /**
   * Set to #GNUNET_YES once we are shutting down; when the number
   * of clients then reaches zero we disconnect from CADET.
   */
  static int in_shutdown;

  /**
   * Counter from which unique IDs for incoming operation requests
   * are allocated; the client uses the ID to accept or refuse the
   * request.  The value 0 is reserved for "uninitialized" and is
   * never handed out.
   */
  static uint32_t suggest_id;
  144. /**
  145. * Get the incoming socket associated with the given id.
  146. *
  147. * @param listener the listener to look in
  148. * @param id id to look for
  149. * @return the incoming socket associated with the id,
  150. * or NULL if there is none
  151. */
  152. static struct Operation *
  153. get_incoming (uint32_t id)
  154. {
  155. for (struct Listener *listener = listener_head;
  156. NULL != listener;
  157. listener = listener->next)
  158. {
  159. for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
  160. if (op->suggest_id == id)
  161. return op;
  162. }
  163. return NULL;
  164. }
  165. /**
  166. * Destroy an incoming request from a remote peer
  167. *
  168. * @param op remote request to destroy
  169. */
  170. static void
  171. incoming_destroy (struct Operation *op)
  172. {
  173. struct Listener *listener;
  174. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  175. "Destroying incoming operation %p\n",
  176. op);
  177. if (NULL != (listener = op->listener))
  178. {
  179. GNUNET_CONTAINER_DLL_remove (listener->op_head,
  180. listener->op_tail,
  181. op);
  182. op->listener = NULL;
  183. }
  184. if (NULL != op->timeout_task)
  185. {
  186. GNUNET_SCHEDULER_cancel (op->timeout_task);
  187. op->timeout_task = NULL;
  188. }
  189. _GSS_operation_destroy2 (op);
  190. }
  /**
   * Closure passed to #garbage_collect_cb().
   */
  struct GarbageContext
  {
    /**
     * The map whose removed elements are being garbage collected.
     */
    struct GNUNET_CONTAINER_MultiHashMap *map;

    /**
     * Smallest generation that still has a pending operation.
     */
    unsigned int min_op_generation;

    /**
     * Largest generation that still has a pending operation.
     */
    unsigned int max_op_generation;
  };
  209. /**
  210. * Function invoked to check if an element can be removed from
  211. * the set's history because it is no longer needed.
  212. *
  213. * @param cls the `struct GarbageContext *`
  214. * @param key key of the element in the map
  215. * @param value the `struct ElementEntry *`
  216. * @return #GNUNET_OK (continue to iterate)
  217. */
  218. static int
  219. garbage_collect_cb (void *cls,
  220. const struct GNUNET_HashCode *key,
  221. void *value)
  222. {
  223. //struct GarbageContext *gc = cls;
  224. //struct ElementEntry *ee = value;
  225. //if (GNUNET_YES != ee->removed)
  226. // return GNUNET_OK;
  227. //if ( (gc->max_op_generation < ee->generation_added) ||
  228. // (ee->generation_removed > gc->min_op_generation) )
  229. //{
  230. // GNUNET_assert (GNUNET_YES ==
  231. // GNUNET_CONTAINER_multihashmap_remove (gc->map,
  232. // key,
  233. // ee));
  234. // GNUNET_free (ee);
  235. //}
  236. return GNUNET_OK;
  237. }
  238. /**
  239. * Collect and destroy elements that are not needed anymore, because
  240. * their lifetime (as determined by their generation) does not overlap
  241. * with any active set operation.
  242. *
  243. * @param set set to garbage collect
  244. */
  245. static void
  246. collect_generation_garbage (struct Set *set)
  247. {
  248. struct GarbageContext gc;
  249. gc.min_op_generation = UINT_MAX;
  250. gc.max_op_generation = 0;
  251. for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
  252. {
  253. gc.min_op_generation = GNUNET_MIN (gc.min_op_generation,
  254. op->generation_created);
  255. gc.max_op_generation = GNUNET_MAX (gc.max_op_generation,
  256. op->generation_created);
  257. }
  258. gc.map = set->content->elements;
  259. GNUNET_CONTAINER_multihashmap_iterate (set->content->elements,
  260. &garbage_collect_cb,
  261. &gc);
  262. }
  263. /**
  264. * Is @a generation in the range of exclusions?
  265. *
  266. * @param generation generation to query
  267. * @param excluded array of generations where the element is excluded
  268. * @param excluded_size length of the @a excluded array
  269. * @return #GNUNET_YES if @a generation is in any of the ranges
  270. */
  271. static int
  272. is_excluded_generation (unsigned int generation,
  273. struct GenerationRange *excluded,
  274. unsigned int excluded_size)
  275. {
  276. for (unsigned int i = 0; i < excluded_size; i++)
  277. if ( (generation >= excluded[i].start) &&
  278. (generation < excluded[i].end) )
  279. return GNUNET_YES;
  280. return GNUNET_NO;
  281. }
  282. /**
  283. * Is element @a ee part of the set during @a query_generation?
  284. *
  285. * @param ee element to test
  286. * @param query_generation generation to query
  287. * @param excluded array of generations where the element is excluded
  288. * @param excluded_size length of the @a excluded array
  289. * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
  290. */
  291. static int
  292. is_element_of_generation (struct ElementEntry *ee,
  293. unsigned int query_generation,
  294. struct GenerationRange *excluded,
  295. unsigned int excluded_size)
  296. {
  297. struct MutationEvent *mut;
  298. int is_present;
  299. GNUNET_assert (NULL != ee->mutations);
  300. if (GNUNET_YES ==
  301. is_excluded_generation (query_generation,
  302. excluded,
  303. excluded_size))
  304. {
  305. GNUNET_break (0);
  306. return GNUNET_NO;
  307. }
  308. is_present = GNUNET_NO;
  309. /* Could be made faster with binary search, but lists
  310. are small, so why bother. */
  311. for (unsigned int i = 0; i < ee->mutations_size; i++)
  312. {
  313. mut = &ee->mutations[i];
  314. if (mut->generation > query_generation)
  315. {
  316. /* The mutation doesn't apply to our generation
  317. anymore. We can'b break here, since mutations aren't
  318. sorted by generation. */
  319. continue;
  320. }
  321. if (GNUNET_YES ==
  322. is_excluded_generation (mut->generation,
  323. excluded,
  324. excluded_size))
  325. {
  326. /* The generation is excluded (because it belongs to another
  327. fork via a lazy copy) and thus mutations aren't considered
  328. for membership testing. */
  329. continue;
  330. }
  331. /* This would be an inconsistency in how we manage mutations. */
  332. if ( (GNUNET_YES == is_present) &&
  333. (GNUNET_YES == mut->added) )
  334. GNUNET_assert (0);
  335. /* Likewise. */
  336. if ( (GNUNET_NO == is_present) &&
  337. (GNUNET_NO == mut->added) )
  338. GNUNET_assert (0);
  339. is_present = mut->added;
  340. }
  341. return is_present;
  342. }
  343. /**
  344. * Is element @a ee part of the set used by @a op?
  345. *
  346. * @param ee element to test
  347. * @param op operation the defines the set and its generation
  348. * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
  349. */
  350. int
  351. _GSS_is_element_of_operation (struct ElementEntry *ee,
  352. struct Operation *op)
  353. {
  354. return is_element_of_generation (ee,
  355. op->generation_created,
  356. op->set->excluded_generations,
  357. op->set->excluded_generations_size);
  358. }
  359. /**
  360. * Destroy the given operation. Used for any operation where both
  361. * peers were known and that thus actually had a vt and channel. Must
  362. * not be used for operations where 'listener' is still set and we do
  363. * not know the other peer.
  364. *
  365. * Call the implementation-specific cancel function of the operation.
  366. * Disconnects from the remote peer. Does not disconnect the client,
  367. * as there may be multiple operations per set.
  368. *
  369. * @param op operation to destroy
  370. * @param gc #GNUNET_YES to perform garbage collection on the set
  371. */
  372. void
  373. _GSS_operation_destroy (struct Operation *op,
  374. int gc)
  375. {
  376. struct Set *set = op->set;
  377. struct GNUNET_CADET_Channel *channel;
  378. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  379. "Destroying operation %p\n",
  380. op);
  381. GNUNET_assert (NULL == op->listener);
  382. if (NULL != op->state)
  383. {
  384. set->vt->cancel (op);
  385. op->state = NULL;
  386. }
  387. if (NULL != set)
  388. {
  389. GNUNET_CONTAINER_DLL_remove (set->ops_head,
  390. set->ops_tail,
  391. op);
  392. op->set = NULL;
  393. }
  394. if (NULL != op->context_msg)
  395. {
  396. GNUNET_free (op->context_msg);
  397. op->context_msg = NULL;
  398. }
  399. if (NULL != (channel = op->channel))
  400. {
  401. /* This will free op; called conditionally as this helper function
  402. is also called from within the channel disconnect handler. */
  403. op->channel = NULL;
  404. GNUNET_CADET_channel_destroy (channel);
  405. }
  406. if ( (NULL != set) &&
  407. (GNUNET_YES == gc) )
  408. collect_generation_garbage (set);
  409. /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
  410. * there was a channel end handler that will free 'op' on the call stack. */
  411. }
  412. /**
  413. * Callback called when a client connects to the service.
  414. *
  415. * @param cls closure for the service
  416. * @param c the new client that connected to the service
  417. * @param mq the message queue used to send messages to the client
  418. * @return @a `struct ClientState`
  419. */
  420. static void *
  421. client_connect_cb (void *cls,
  422. struct GNUNET_SERVICE_Client *c,
  423. struct GNUNET_MQ_Handle *mq)
  424. {
  425. struct ClientState *cs;
  426. num_clients++;
  427. cs = GNUNET_new (struct ClientState);
  428. cs->client = c;
  429. cs->mq = mq;
  430. return cs;
  431. }
  432. /**
  433. * Iterator over hash map entries to free element entries.
  434. *
  435. * @param cls closure
  436. * @param key current key code
  437. * @param value a `struct ElementEntry *` to be free'd
  438. * @return #GNUNET_YES (continue to iterate)
  439. */
  440. static int
  441. destroy_elements_iterator (void *cls,
  442. const struct GNUNET_HashCode *key,
  443. void *value)
  444. {
  445. struct ElementEntry *ee = value;
  446. GNUNET_free_non_null (ee->mutations);
  447. GNUNET_free (ee);
  448. return GNUNET_YES;
  449. }
  450. /**
  451. * Clean up after a client has disconnected
  452. *
  453. * @param cls closure, unused
  454. * @param client the client to clean up after
  455. * @param internal_cls the `struct ClientState`
  456. */
  457. static void
  458. client_disconnect_cb (void *cls,
  459. struct GNUNET_SERVICE_Client *client,
  460. void *internal_cls)
  461. {
  462. struct ClientState *cs = internal_cls;
  463. struct Operation *op;
  464. struct Listener *listener;
  465. struct Set *set;
  466. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  467. "Client disconnected, cleaning up\n");
  468. if (NULL != (set = cs->set))
  469. {
  470. struct SetContent *content = set->content;
  471. struct PendingMutation *pm;
  472. struct PendingMutation *pm_current;
  473. struct LazyCopyRequest *lcr;
  474. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  475. "Destroying client's set\n");
  476. /* Destroy pending set operations */
  477. while (NULL != set->ops_head)
  478. _GSS_operation_destroy (set->ops_head,
  479. GNUNET_NO);
  480. /* Destroy operation-specific state */
  481. GNUNET_assert (NULL != set->state);
  482. set->vt->destroy_set (set->state);
  483. set->state = NULL;
  484. /* Clean up ongoing iterations */
  485. if (NULL != set->iter)
  486. {
  487. GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
  488. set->iter = NULL;
  489. set->iteration_id++;
  490. }
  491. /* discard any pending mutations that reference this set */
  492. pm = content->pending_mutations_head;
  493. while (NULL != pm)
  494. {
  495. pm_current = pm;
  496. pm = pm->next;
  497. if (pm_current->set == set)
  498. {
  499. GNUNET_CONTAINER_DLL_remove (content->pending_mutations_head,
  500. content->pending_mutations_tail,
  501. pm_current);
  502. GNUNET_free (pm_current);
  503. }
  504. }
  505. /* free set content (or at least decrement RC) */
  506. set->content = NULL;
  507. GNUNET_assert (0 != content->refcount);
  508. content->refcount--;
  509. if (0 == content->refcount)
  510. {
  511. GNUNET_assert (NULL != content->elements);
  512. GNUNET_CONTAINER_multihashmap_iterate (content->elements,
  513. &destroy_elements_iterator,
  514. NULL);
  515. GNUNET_CONTAINER_multihashmap_destroy (content->elements);
  516. content->elements = NULL;
  517. GNUNET_free (content);
  518. }
  519. GNUNET_free_non_null (set->excluded_generations);
  520. set->excluded_generations = NULL;
  521. /* remove set from pending copy requests */
  522. lcr = lazy_copy_head;
  523. while (NULL != lcr)
  524. {
  525. struct LazyCopyRequest *lcr_current = lcr;
  526. lcr = lcr->next;
  527. if (lcr_current->source_set == set)
  528. {
  529. GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
  530. lazy_copy_tail,
  531. lcr_current);
  532. GNUNET_free (lcr_current);
  533. }
  534. }
  535. GNUNET_free (set);
  536. }
  537. if (NULL != (listener = cs->listener))
  538. {
  539. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  540. "Destroying client's listener\n");
  541. GNUNET_CADET_close_port (listener->open_port);
  542. listener->open_port = NULL;
  543. while (NULL != (op = listener->op_head))
  544. {
  545. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  546. "Destroying incoming operation `%u' from peer `%s'\n",
  547. (unsigned int) op->client_request_id,
  548. GNUNET_i2s (&op->peer));
  549. incoming_destroy (op);
  550. }
  551. GNUNET_CONTAINER_DLL_remove (listener_head,
  552. listener_tail,
  553. listener);
  554. GNUNET_free (listener);
  555. }
  556. GNUNET_free (cs);
  557. num_clients--;
  558. if ( (GNUNET_YES == in_shutdown) &&
  559. (0 == num_clients) )
  560. {
  561. if (NULL != cadet)
  562. {
  563. GNUNET_CADET_disconnect (cadet);
  564. cadet = NULL;
  565. }
  566. }
  567. }
  568. /**
  569. * Check a request for a set operation from another peer.
  570. *
  571. * @param cls the operation state
  572. * @param msg the received message
  573. * @return #GNUNET_OK if the channel should be kept alive,
  574. * #GNUNET_SYSERR to destroy the channel
  575. */
  576. static int
  577. check_incoming_msg (void *cls,
  578. const struct OperationRequestMessage *msg)
  579. {
  580. struct Operation *op = cls;
  581. struct Listener *listener = op->listener;
  582. const struct GNUNET_MessageHeader *nested_context;
  583. /* double operation request */
  584. if (0 != op->suggest_id)
  585. {
  586. GNUNET_break_op (0);
  587. return GNUNET_SYSERR;
  588. }
  589. /* This should be equivalent to the previous condition, but can't hurt to check twice */
  590. if (NULL == op->listener)
  591. {
  592. GNUNET_break (0);
  593. return GNUNET_SYSERR;
  594. }
  595. if (listener->operation != (enum GNUNET_SET_OperationType) ntohl (msg->operation))
  596. {
  597. GNUNET_break_op (0);
  598. return GNUNET_SYSERR;
  599. }
  600. nested_context = GNUNET_MQ_extract_nested_mh (msg);
  601. if ( (NULL != nested_context) &&
  602. (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
  603. {
  604. GNUNET_break_op (0);
  605. return GNUNET_SYSERR;
  606. }
  607. return GNUNET_OK;
  608. }
  609. /**
  610. * Handle a request for a set operation from another peer. Checks if we
  611. * have a listener waiting for such a request (and in that case initiates
  612. * asking the listener about accepting the connection). If no listener
  613. * is waiting, we queue the operation request in hope that a listener
  614. * shows up soon (before timeout).
  615. *
  616. * This msg is expected as the first and only msg handled through the
  617. * non-operation bound virtual table, acceptance of this operation replaces
  618. * our virtual table and subsequent msgs would be routed differently (as
  619. * we then know what type of operation this is).
  620. *
  621. * @param cls the operation state
  622. * @param msg the received message
  623. * @return #GNUNET_OK if the channel should be kept alive,
  624. * #GNUNET_SYSERR to destroy the channel
  625. */
  626. static void
  627. handle_incoming_msg (void *cls,
  628. const struct OperationRequestMessage *msg)
  629. {
  630. struct Operation *op = cls;
  631. struct Listener *listener = op->listener;
  632. const struct GNUNET_MessageHeader *nested_context;
  633. struct GNUNET_MQ_Envelope *env;
  634. struct GNUNET_SET_RequestMessage *cmsg;
  635. nested_context = GNUNET_MQ_extract_nested_mh (msg);
  636. /* Make a copy of the nested_context (application-specific context
  637. information that is opaque to set) so we can pass it to the
  638. listener later on */
  639. if (NULL != nested_context)
  640. op->context_msg = GNUNET_copy_message (nested_context);
  641. op->remote_element_count = ntohl (msg->element_count);
  642. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  643. "Received P2P operation request (op %u, port %s) for active listener\n",
  644. (uint32_t) ntohl (msg->operation),
  645. GNUNET_h2s (&op->listener->app_id));
  646. GNUNET_assert (0 == op->suggest_id);
  647. if (0 == suggest_id)
  648. suggest_id++;
  649. op->suggest_id = suggest_id++;
  650. GNUNET_assert (NULL != op->timeout_task);
  651. GNUNET_SCHEDULER_cancel (op->timeout_task);
  652. op->timeout_task = NULL;
  653. env = GNUNET_MQ_msg_nested_mh (cmsg,
  654. GNUNET_MESSAGE_TYPE_SET_REQUEST,
  655. op->context_msg);
  656. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  657. "Suggesting incoming request with accept id %u to listener %p of client %p\n",
  658. op->suggest_id,
  659. listener,
  660. listener->cs);
  661. cmsg->accept_id = htonl (op->suggest_id);
  662. cmsg->peer_id = op->peer;
  663. GNUNET_MQ_send (listener->cs->mq,
  664. env);
  665. /* NOTE: GNUNET_CADET_receive_done() will be called in
  666. #handle_client_accept() */
  667. }
  668. /**
  669. * Add an element to @a set as specified by @a msg
  670. *
  671. * @param set set to manipulate
  672. * @param msg message specifying the change
  673. */
  674. static void
  675. execute_add (struct Set *set,
  676. const struct GNUNET_SET_ElementMessage *msg)
  677. {
  678. struct GNUNET_SET_Element el;
  679. struct ElementEntry *ee;
  680. struct GNUNET_HashCode hash;
  681. GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
  682. el.size = ntohs (msg->header.size) - sizeof (*msg);
  683. el.data = &msg[1];
  684. el.element_type = ntohs (msg->element_type);
  685. GNUNET_SET_element_hash (&el,
  686. &hash);
  687. ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
  688. &hash);
  689. if (NULL == ee)
  690. {
  691. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  692. "Client inserts element %s of size %u\n",
  693. GNUNET_h2s (&hash),
  694. el.size);
  695. ee = GNUNET_malloc (el.size + sizeof (*ee));
  696. ee->element.size = el.size;
  697. GNUNET_memcpy (&ee[1],
  698. el.data,
  699. el.size);
  700. ee->element.data = &ee[1];
  701. ee->element.element_type = el.element_type;
  702. ee->remote = GNUNET_NO;
  703. ee->mutations = NULL;
  704. ee->mutations_size = 0;
  705. ee->element_hash = hash;
  706. GNUNET_break (GNUNET_YES ==
  707. GNUNET_CONTAINER_multihashmap_put (set->content->elements,
  708. &ee->element_hash,
  709. ee,
  710. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  711. }
  712. else if (GNUNET_YES ==
  713. is_element_of_generation (ee,
  714. set->current_generation,
  715. set->excluded_generations,
  716. set->excluded_generations_size))
  717. {
  718. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  719. "Client inserted element %s of size %u twice (ignored)\n",
  720. GNUNET_h2s (&hash),
  721. el.size);
  722. /* same element inserted twice */
  723. return;
  724. }
  725. {
  726. struct MutationEvent mut = {
  727. .generation = set->current_generation,
  728. .added = GNUNET_YES
  729. };
  730. GNUNET_array_append (ee->mutations,
  731. ee->mutations_size,
  732. mut);
  733. }
  734. set->vt->add (set->state,
  735. ee);
  736. }
  737. /**
  738. * Remove an element from @a set as specified by @a msg
  739. *
  740. * @param set set to manipulate
  741. * @param msg message specifying the change
  742. */
  743. static void
  744. execute_remove (struct Set *set,
  745. const struct GNUNET_SET_ElementMessage *msg)
  746. {
  747. struct GNUNET_SET_Element el;
  748. struct ElementEntry *ee;
  749. struct GNUNET_HashCode hash;
  750. GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (msg->header.type));
  751. el.size = ntohs (msg->header.size) - sizeof (*msg);
  752. el.data = &msg[1];
  753. el.element_type = ntohs (msg->element_type);
  754. GNUNET_SET_element_hash (&el, &hash);
  755. ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
  756. &hash);
  757. if (NULL == ee)
  758. {
  759. /* Client tried to remove non-existing element. */
  760. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  761. "Client removes non-existing element of size %u\n",
  762. el.size);
  763. return;
  764. }
  765. if (GNUNET_NO ==
  766. is_element_of_generation (ee,
  767. set->current_generation,
  768. set->excluded_generations,
  769. set->excluded_generations_size))
  770. {
  771. /* Client tried to remove element twice */
  772. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  773. "Client removed element of size %u twice (ignored)\n",
  774. el.size);
  775. return;
  776. }
  777. else
  778. {
  779. struct MutationEvent mut = {
  780. .generation = set->current_generation,
  781. .added = GNUNET_NO
  782. };
  783. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  784. "Client removes element of size %u\n",
  785. el.size);
  786. GNUNET_array_append (ee->mutations,
  787. ee->mutations_size,
  788. mut);
  789. }
  790. set->vt->remove (set->state,
  791. ee);
  792. }
  793. /**
  794. * Perform a mutation on a set as specified by the @a msg
  795. *
  796. * @param set the set to mutate
  797. * @param msg specification of what to change
  798. */
  799. static void
  800. execute_mutation (struct Set *set,
  801. const struct GNUNET_SET_ElementMessage *msg)
  802. {
  803. switch (ntohs (msg->header.type))
  804. {
  805. case GNUNET_MESSAGE_TYPE_SET_ADD:
  806. execute_add (set, msg);
  807. break;
  808. case GNUNET_MESSAGE_TYPE_SET_REMOVE:
  809. execute_remove (set, msg);
  810. break;
  811. default:
  812. GNUNET_break (0);
  813. }
  814. }
  815. /**
  816. * Execute mutations that were delayed on a set because of
  817. * pending operations.
  818. *
  819. * @param set the set to execute mutations on
  820. */
  821. static void
  822. execute_delayed_mutations (struct Set *set)
  823. {
  824. struct PendingMutation *pm;
  825. if (0 != set->content->iterator_count)
  826. return; /* still cannot do this */
  827. while (NULL != (pm = set->content->pending_mutations_head))
  828. {
  829. GNUNET_CONTAINER_DLL_remove (set->content->pending_mutations_head,
  830. set->content->pending_mutations_tail,
  831. pm);
  832. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  833. "Executing pending mutation on %p.\n",
  834. pm->set);
  835. execute_mutation (pm->set,
  836. pm->msg);
  837. GNUNET_free (pm->msg);
  838. GNUNET_free (pm);
  839. }
  840. }
  841. /**
  842. * Send the next element of a set to the set's client. The next element is given by
  843. * the set's current hashmap iterator. The set's iterator will be set to NULL if there
  844. * are no more elements in the set. The caller must ensure that the set's iterator is
  845. * valid.
  846. *
  847. * The client will acknowledge each received element with a
  848. * #GNUNET_MESSAGE_TYPE_SET_ITER_ACK message. Our
  849. * #handle_client_iter_ack() will then trigger the next transmission.
  850. * Note that the #GNUNET_MESSAGE_TYPE_SET_ITER_DONE is not acknowledged.
  851. *
  852. * @param set set that should send its next element to its client
  853. */
  854. static void
  855. send_client_element (struct Set *set)
  856. {
  857. int ret;
  858. struct ElementEntry *ee;
  859. struct GNUNET_MQ_Envelope *ev;
  860. struct GNUNET_SET_IterResponseMessage *msg;
  861. GNUNET_assert (NULL != set->iter);
  862. do {
  863. ret = GNUNET_CONTAINER_multihashmap_iterator_next (set->iter,
  864. NULL,
  865. (const void **) &ee);
  866. if (GNUNET_NO == ret)
  867. {
  868. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  869. "Iteration on %p done.\n",
  870. set);
  871. ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
  872. GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
  873. set->iter = NULL;
  874. set->iteration_id++;
  875. GNUNET_assert (set->content->iterator_count > 0);
  876. set->content->iterator_count--;
  877. execute_delayed_mutations (set);
  878. GNUNET_MQ_send (set->cs->mq,
  879. ev);
  880. return;
  881. }
  882. GNUNET_assert (NULL != ee);
  883. } while (GNUNET_NO ==
  884. is_element_of_generation (ee,
  885. set->iter_generation,
  886. set->excluded_generations,
  887. set->excluded_generations_size));
  888. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  889. "Sending iteration element on %p.\n",
  890. set);
  891. ev = GNUNET_MQ_msg_extra (msg,
  892. ee->element.size,
  893. GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT);
  894. GNUNET_memcpy (&msg[1],
  895. ee->element.data,
  896. ee->element.size);
  897. msg->element_type = htons (ee->element.element_type);
  898. msg->iteration_id = htons (set->iteration_id);
  899. GNUNET_MQ_send (set->cs->mq,
  900. ev);
  901. }
  902. /**
  903. * Called when a client wants to iterate the elements of a set.
  904. * Checks if we have a set associated with the client and if we
  905. * can right now start an iteration. If all checks out, starts
  906. * sending the elements of the set to the client.
  907. *
  908. * @param cls client that sent the message
  909. * @param m message sent by the client
  910. */
  911. static void
  912. handle_client_iterate (void *cls,
  913. const struct GNUNET_MessageHeader *m)
  914. {
  915. struct ClientState *cs = cls;
  916. struct Set *set;
  917. if (NULL == (set = cs->set))
  918. {
  919. /* attempt to iterate over a non existing set */
  920. GNUNET_break (0);
  921. GNUNET_SERVICE_client_drop (cs->client);
  922. return;
  923. }
  924. if (NULL != set->iter)
  925. {
  926. /* Only one concurrent iterate-action allowed per set */
  927. GNUNET_break (0);
  928. GNUNET_SERVICE_client_drop (cs->client);
  929. return;
  930. }
  931. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  932. "Iterating set %p in gen %u with %u content elements\n",
  933. (void *) set,
  934. set->current_generation,
  935. GNUNET_CONTAINER_multihashmap_size (set->content->elements));
  936. GNUNET_SERVICE_client_continue (cs->client);
  937. set->content->iterator_count++;
  938. set->iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->content->elements);
  939. set->iter_generation = set->current_generation;
  940. send_client_element (set);
  941. }
  942. /**
  943. * Called when a client wants to create a new set. This is typically
  944. * the first request from a client, and includes the type of set
  945. * operation to be performed.
  946. *
  947. * @param cls client that sent the message
  948. * @param m message sent by the client
  949. */
  950. static void
  951. handle_client_create_set (void *cls,
  952. const struct GNUNET_SET_CreateMessage *msg)
  953. {
  954. struct ClientState *cs = cls;
  955. struct Set *set;
  956. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  957. "Client created new set (operation %u)\n",
  958. (uint32_t) ntohl (msg->operation));
  959. if (NULL != cs->set)
  960. {
  961. /* There can only be one set per client */
  962. GNUNET_break (0);
  963. GNUNET_SERVICE_client_drop (cs->client);
  964. return;
  965. }
  966. set = GNUNET_new (struct Set);
  967. switch (ntohl (msg->operation))
  968. {
  969. case GNUNET_SET_OPERATION_INTERSECTION:
  970. set->vt = _GSS_intersection_vt ();
  971. break;
  972. case GNUNET_SET_OPERATION_UNION:
  973. set->vt = _GSS_union_vt ();
  974. break;
  975. default:
  976. GNUNET_free (set);
  977. GNUNET_break (0);
  978. GNUNET_SERVICE_client_drop (cs->client);
  979. return;
  980. }
  981. set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
  982. set->state = set->vt->create ();
  983. if (NULL == set->state)
  984. {
  985. /* initialization failed (i.e. out of memory) */
  986. GNUNET_free (set);
  987. GNUNET_SERVICE_client_drop (cs->client);
  988. return;
  989. }
  990. set->content = GNUNET_new (struct SetContent);
  991. set->content->refcount = 1;
  992. set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
  993. GNUNET_YES);
  994. set->cs = cs;
  995. cs->set = set;
  996. GNUNET_SERVICE_client_continue (cs->client);
  997. }
  998. /**
  999. * Timeout happens iff:
  1000. * - we suggested an operation to our listener,
  1001. * but did not receive a response in time
  1002. * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
  1003. *
  1004. * @param cls channel context
  1005. * @param tc context information (why was this task triggered now)
  1006. */
  1007. static void
  1008. incoming_timeout_cb (void *cls)
  1009. {
  1010. struct Operation *op = cls;
  1011. op->timeout_task = NULL;
  1012. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1013. "Remote peer's incoming request timed out\n");
  1014. incoming_destroy (op);
  1015. }
  1016. /**
  1017. * Method called whenever another peer has added us to a channel the
  1018. * other peer initiated. Only called (once) upon reception of data
  1019. * from a channel we listen on.
  1020. *
  1021. * The channel context represents the operation itself and gets added
  1022. * to a DLL, from where it gets looked up when our local listener
  1023. * client responds to a proposed/suggested operation or connects and
  1024. * associates with this operation.
  1025. *
  1026. * @param cls closure
  1027. * @param channel new handle to the channel
  1028. * @param source peer that started the channel
  1029. * @return initial channel context for the channel
  1030. * returns NULL on error
  1031. */
  1032. static void *
  1033. channel_new_cb (void *cls,
  1034. struct GNUNET_CADET_Channel *channel,
  1035. const struct GNUNET_PeerIdentity *source)
  1036. {
  1037. struct Listener *listener = cls;
  1038. struct Operation *op;
  1039. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1040. "New incoming channel\n");
  1041. op = GNUNET_new (struct Operation);
  1042. op->listener = listener;
  1043. op->peer = *source;
  1044. op->channel = channel;
  1045. op->mq = GNUNET_CADET_get_mq (op->channel);
  1046. op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
  1047. UINT32_MAX);
  1048. op->timeout_task
  1049. = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
  1050. &incoming_timeout_cb,
  1051. op);
  1052. GNUNET_CONTAINER_DLL_insert (listener->op_head,
  1053. listener->op_tail,
  1054. op);
  1055. return op;
  1056. }
  1057. /**
  1058. * Function called whenever a channel is destroyed. Should clean up
  1059. * any associated state. It must NOT call
  1060. * GNUNET_CADET_channel_destroy() on the channel.
  1061. *
  1062. * The peer_disconnect function is part of a a virtual table set initially either
  1063. * when a peer creates a new channel with us, or once we create
  1064. * a new channel ourselves (evaluate).
  1065. *
  1066. * Once we know the exact type of operation (union/intersection), the vt is
  1067. * replaced with an operation specific instance (_GSS_[op]_vt).
  1068. *
  1069. * @param channel_ctx place where local state associated
  1070. * with the channel is stored
  1071. * @param channel connection to the other end (henceforth invalid)
  1072. */
  1073. static void
  1074. channel_end_cb (void *channel_ctx,
  1075. const struct GNUNET_CADET_Channel *channel)
  1076. {
  1077. struct Operation *op = channel_ctx;
  1078. op->channel = NULL;
  1079. _GSS_operation_destroy2 (op);
  1080. }
  1081. /**
  1082. * This function probably should not exist
  1083. * and be replaced by inlining more specific
  1084. * logic in the various places where it is called.
  1085. */
  1086. void
  1087. _GSS_operation_destroy2 (struct Operation *op)
  1088. {
  1089. struct GNUNET_CADET_Channel *channel;
  1090. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1091. "channel_end_cb called\n");
  1092. if (NULL != (channel = op->channel))
  1093. {
  1094. /* This will free op; called conditionally as this helper function
  1095. is also called from within the channel disconnect handler. */
  1096. op->channel = NULL;
  1097. GNUNET_CADET_channel_destroy (channel);
  1098. }
  1099. if (NULL != op->listener)
  1100. {
  1101. incoming_destroy (op);
  1102. return;
  1103. }
  1104. if (NULL != op->set)
  1105. op->set->vt->channel_death (op);
  1106. else
  1107. _GSS_operation_destroy (op,
  1108. GNUNET_YES);
  1109. GNUNET_free (op);
  1110. }
  /**
   * Function called whenever an MQ-channel's transmission window size changes.
   *
   * The first callback in an outgoing channel will be with a non-zero value
   * and will mean the channel is connected to the destination.
   *
   * For an incoming channel it will be called immediately after the
   * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
   *
   * Currently this callback ignores all of its arguments.
   *
   * @param cls channel closure
   * @param channel connection to the other end
   * @param window_size new window size; if there are more messages than
   *        buffer size, this value will be negative
   */
  static void
  channel_window_cb (void *cls,
                     const struct GNUNET_CADET_Channel *channel,
                     int window_size)
  {
    (void) cls;
    (void) channel;
    (void) window_size;
    /* FIXME: not implemented, we could do flow control here... */
  }
  1132. /**
  1133. * Called when a client wants to create a new listener.
  1134. *
  1135. * @param cls client that sent the message
  1136. * @param msg message sent by the client
  1137. */
  1138. static void
  1139. handle_client_listen (void *cls,
  1140. const struct GNUNET_SET_ListenMessage *msg)
  1141. {
  1142. struct ClientState *cs = cls;
  1143. struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
  1144. GNUNET_MQ_hd_var_size (incoming_msg,
  1145. GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
  1146. struct OperationRequestMessage,
  1147. NULL),
  1148. GNUNET_MQ_hd_var_size (union_p2p_ibf,
  1149. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
  1150. struct IBFMessage,
  1151. NULL),
  1152. GNUNET_MQ_hd_var_size (union_p2p_elements,
  1153. GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
  1154. struct GNUNET_SET_ElementMessage,
  1155. NULL),
  1156. GNUNET_MQ_hd_var_size (union_p2p_offer,
  1157. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
  1158. struct GNUNET_MessageHeader,
  1159. NULL),
  1160. GNUNET_MQ_hd_var_size (union_p2p_inquiry,
  1161. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
  1162. struct InquiryMessage,
  1163. NULL),
  1164. GNUNET_MQ_hd_var_size (union_p2p_demand,
  1165. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
  1166. struct GNUNET_MessageHeader,
  1167. NULL),
  1168. GNUNET_MQ_hd_fixed_size (union_p2p_done,
  1169. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
  1170. struct GNUNET_MessageHeader,
  1171. NULL),
  1172. GNUNET_MQ_hd_fixed_size (union_p2p_over,
  1173. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
  1174. struct GNUNET_MessageHeader,
  1175. NULL),
  1176. GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
  1177. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
  1178. struct GNUNET_MessageHeader,
  1179. NULL),
  1180. GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
  1181. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
  1182. struct GNUNET_MessageHeader,
  1183. NULL),
  1184. GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
  1185. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
  1186. struct StrataEstimatorMessage,
  1187. NULL),
  1188. GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
  1189. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
  1190. struct StrataEstimatorMessage,
  1191. NULL),
  1192. GNUNET_MQ_hd_var_size (union_p2p_full_element,
  1193. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
  1194. struct GNUNET_SET_ElementMessage,
  1195. NULL),
  1196. GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
  1197. GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
  1198. struct IntersectionElementInfoMessage,
  1199. NULL),
  1200. GNUNET_MQ_hd_var_size (intersection_p2p_bf,
  1201. GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
  1202. struct BFMessage,
  1203. NULL),
  1204. GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
  1205. GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
  1206. struct IntersectionDoneMessage,
  1207. NULL),
  1208. GNUNET_MQ_handler_end ()
  1209. };
  1210. struct Listener *listener;
  1211. if (NULL != cs->listener)
  1212. {
  1213. /* max. one active listener per client! */
  1214. GNUNET_break (0);
  1215. GNUNET_SERVICE_client_drop (cs->client);
  1216. return;
  1217. }
  1218. listener = GNUNET_new (struct Listener);
  1219. listener->cs = cs;
  1220. cs->listener = listener;
  1221. listener->app_id = msg->app_id;
  1222. listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
  1223. GNUNET_CONTAINER_DLL_insert (listener_head,
  1224. listener_tail,
  1225. listener);
  1226. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1227. "New listener created (op %u, port %s)\n",
  1228. listener->operation,
  1229. GNUNET_h2s (&listener->app_id));
  1230. listener->open_port
  1231. = GNUNET_CADET_open_port (cadet,
  1232. &msg->app_id,
  1233. &channel_new_cb,
  1234. listener,
  1235. &channel_window_cb,
  1236. &channel_end_cb,
  1237. cadet_handlers);
  1238. GNUNET_SERVICE_client_continue (cs->client);
  1239. }
  1240. /**
  1241. * Called when the listening client rejects an operation
  1242. * request by another peer.
  1243. *
  1244. * @param cls client that sent the message
  1245. * @param msg message sent by the client
  1246. */
  1247. static void
  1248. handle_client_reject (void *cls,
  1249. const struct GNUNET_SET_RejectMessage *msg)
  1250. {
  1251. struct ClientState *cs = cls;
  1252. struct Operation *op;
  1253. op = get_incoming (ntohl (msg->accept_reject_id));
  1254. if (NULL == op)
  1255. {
  1256. /* no matching incoming operation for this reject;
  1257. could be that the other peer already disconnected... */
  1258. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  1259. "Client rejected unknown operation %u\n",
  1260. (unsigned int) ntohl (msg->accept_reject_id));
  1261. GNUNET_SERVICE_client_continue (cs->client);
  1262. return;
  1263. }
  1264. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1265. "Peer request (op %u, app %s) rejected by client\n",
  1266. op->listener->operation,
  1267. GNUNET_h2s (&cs->listener->app_id));
  1268. _GSS_operation_destroy2 (op);
  1269. GNUNET_SERVICE_client_continue (cs->client);
  1270. }
  1271. /**
  1272. * Called when a client wants to add or remove an element to a set it inhabits.
  1273. *
  1274. * @param cls client that sent the message
  1275. * @param msg message sent by the client
  1276. */
  1277. static int
  1278. check_client_mutation (void *cls,
  1279. const struct GNUNET_SET_ElementMessage *msg)
  1280. {
  1281. /* NOTE: Technically, we should probably check with the
  1282. block library whether the element we are given is well-formed */
  1283. return GNUNET_OK;
  1284. }
  1285. /**
  1286. * Called when a client wants to add or remove an element to a set it inhabits.
  1287. *
  1288. * @param cls client that sent the message
  1289. * @param msg message sent by the client
  1290. */
  1291. static void
  1292. handle_client_mutation (void *cls,
  1293. const struct GNUNET_SET_ElementMessage *msg)
  1294. {
  1295. struct ClientState *cs = cls;
  1296. struct Set *set;
  1297. if (NULL == (set = cs->set))
  1298. {
  1299. /* client without a set requested an operation */
  1300. GNUNET_break (0);
  1301. GNUNET_SERVICE_client_drop (cs->client);
  1302. return;
  1303. }
  1304. GNUNET_SERVICE_client_continue (cs->client);
  1305. if (0 != set->content->iterator_count)
  1306. {
  1307. struct PendingMutation *pm;
  1308. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1309. "Scheduling mutation on set\n");
  1310. pm = GNUNET_new (struct PendingMutation);
  1311. pm->msg = (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
  1312. pm->set = set;
  1313. GNUNET_CONTAINER_DLL_insert_tail (set->content->pending_mutations_head,
  1314. set->content->pending_mutations_tail,
  1315. pm);
  1316. return;
  1317. }
  1318. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1319. "Executing mutation on set\n");
  1320. execute_mutation (set,
  1321. msg);
  1322. }
  1323. /**
  1324. * Advance the current generation of a set,
  1325. * adding exclusion ranges if necessary.
  1326. *
  1327. * @param set the set where we want to advance the generation
  1328. */
  1329. static void
  1330. advance_generation (struct Set *set)
  1331. {
  1332. struct GenerationRange r;
  1333. if (set->current_generation == set->content->latest_generation)
  1334. {
  1335. set->content->latest_generation++;
  1336. set->current_generation++;
  1337. return;
  1338. }
  1339. GNUNET_assert (set->current_generation < set->content->latest_generation);
  1340. r.start = set->current_generation + 1;
  1341. r.end = set->content->latest_generation + 1;
  1342. set->content->latest_generation = r.end;
  1343. set->current_generation = r.end;
  1344. GNUNET_array_append (set->excluded_generations,
  1345. set->excluded_generations_size,
  1346. r);
  1347. }
  1348. /**
  1349. * Called when a client wants to initiate a set operation with another
  1350. * peer. Initiates the CADET connection to the listener and sends the
  1351. * request.
  1352. *
  1353. * @param cls client that sent the message
  1354. * @param msg message sent by the client
  1355. * @return #GNUNET_OK if the message is well-formed
  1356. */
  1357. static int
  1358. check_client_evaluate (void *cls,
  1359. const struct GNUNET_SET_EvaluateMessage *msg)
  1360. {
  1361. /* FIXME: suboptimal, even if the context below could be NULL,
  1362. there are malformed messages this does not check for... */
  1363. return GNUNET_OK;
  1364. }
  1365. /**
  1366. * Called when a client wants to initiate a set operation with another
  1367. * peer. Initiates the CADET connection to the listener and sends the
  1368. * request.
  1369. *
  1370. * @param cls client that sent the message
  1371. * @param msg message sent by the client
  1372. */
  1373. static void
  1374. handle_client_evaluate (void *cls,
  1375. const struct GNUNET_SET_EvaluateMessage *msg)
  1376. {
  1377. struct ClientState *cs = cls;
  1378. struct Operation *op = GNUNET_new (struct Operation);
  1379. const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
  1380. GNUNET_MQ_hd_var_size (incoming_msg,
  1381. GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
  1382. struct OperationRequestMessage,
  1383. op),
  1384. GNUNET_MQ_hd_var_size (union_p2p_ibf,
  1385. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
  1386. struct IBFMessage,
  1387. op),
  1388. GNUNET_MQ_hd_var_size (union_p2p_elements,
  1389. GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
  1390. struct GNUNET_SET_ElementMessage,
  1391. op),
  1392. GNUNET_MQ_hd_var_size (union_p2p_offer,
  1393. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
  1394. struct GNUNET_MessageHeader,
  1395. op),
  1396. GNUNET_MQ_hd_var_size (union_p2p_inquiry,
  1397. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
  1398. struct InquiryMessage,
  1399. op),
  1400. GNUNET_MQ_hd_var_size (union_p2p_demand,
  1401. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
  1402. struct GNUNET_MessageHeader,
  1403. op),
  1404. GNUNET_MQ_hd_fixed_size (union_p2p_done,
  1405. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
  1406. struct GNUNET_MessageHeader,
  1407. op),
  1408. GNUNET_MQ_hd_fixed_size (union_p2p_over,
  1409. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER,
  1410. struct GNUNET_MessageHeader,
  1411. op),
  1412. GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
  1413. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
  1414. struct GNUNET_MessageHeader,
  1415. op),
  1416. GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
  1417. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
  1418. struct GNUNET_MessageHeader,
  1419. op),
  1420. GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
  1421. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
  1422. struct StrataEstimatorMessage,
  1423. op),
  1424. GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
  1425. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
  1426. struct StrataEstimatorMessage,
  1427. op),
  1428. GNUNET_MQ_hd_var_size (union_p2p_full_element,
  1429. GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
  1430. struct GNUNET_SET_ElementMessage,
  1431. op),
  1432. GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
  1433. GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
  1434. struct IntersectionElementInfoMessage,
  1435. op),
  1436. GNUNET_MQ_hd_var_size (intersection_p2p_bf,
  1437. GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
  1438. struct BFMessage,
  1439. op),
  1440. GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
  1441. GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
  1442. struct IntersectionDoneMessage,
  1443. op),
  1444. GNUNET_MQ_handler_end ()
  1445. };
  1446. struct Set *set;
  1447. const struct GNUNET_MessageHeader *context;
  1448. if (NULL == (set = cs->set))
  1449. {
  1450. GNUNET_break (0);
  1451. GNUNET_free (op);
  1452. GNUNET_SERVICE_client_drop (cs->client);
  1453. return;
  1454. }
  1455. op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
  1456. UINT32_MAX);
  1457. op->peer = msg->target_peer;
  1458. op->result_mode = ntohl (msg->result_mode);
  1459. op->client_request_id = ntohl (msg->request_id);
  1460. op->byzantine = msg->byzantine;
  1461. op->byzantine_lower_bound = msg->byzantine_lower_bound;
  1462. op->force_full = msg->force_full;
  1463. op->force_delta = msg->force_delta;
  1464. context = GNUNET_MQ_extract_nested_mh (msg);
  1465. /* Advance generation values, so that
  1466. mutations won't interfer with the running operation. */
  1467. op->set = set;
  1468. op->generation_created = set->current_generation;
  1469. advance_generation (set);
  1470. GNUNET_CONTAINER_DLL_insert (set->ops_head,
  1471. set->ops_tail,
  1472. op);
  1473. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1474. "Creating new CADET channel to port %s for set operation type %u\n",
  1475. GNUNET_h2s (&msg->app_id),
  1476. set->operation);
  1477. op->channel = GNUNET_CADET_channel_create (cadet,
  1478. op,
  1479. &msg->target_peer,
  1480. &msg->app_id,
  1481. GNUNET_CADET_OPTION_RELIABLE,
  1482. &channel_window_cb,
  1483. &channel_end_cb,
  1484. cadet_handlers);
  1485. op->mq = GNUNET_CADET_get_mq (op->channel);
  1486. op->state = set->vt->evaluate (op,
  1487. context);
  1488. if (NULL == op->state)
  1489. {
  1490. GNUNET_break (0);
  1491. GNUNET_SERVICE_client_drop (cs->client);
  1492. return;
  1493. }
  1494. GNUNET_SERVICE_client_continue (cs->client);
  1495. }
  1496. /**
  1497. * Handle an ack from a client, and send the next element. Note
  1498. * that we only expect acks for set elements, not after the
  1499. * #GNUNET_MESSAGE_TYPE_SET_ITER_DONE message.
  1500. *
  1501. * @param cls client the client
  1502. * @param ack the message
  1503. */
  1504. static void
  1505. handle_client_iter_ack (void *cls,
  1506. const struct GNUNET_SET_IterAckMessage *ack)
  1507. {
  1508. struct ClientState *cs = cls;
  1509. struct Set *set;
  1510. if (NULL == (set = cs->set))
  1511. {
  1512. /* client without a set acknowledged receiving a value */
  1513. GNUNET_break (0);
  1514. GNUNET_SERVICE_client_drop (cs->client);
  1515. return;
  1516. }
  1517. if (NULL == set->iter)
  1518. {
  1519. /* client sent an ack, but we were not expecting one (as
  1520. set iteration has finished) */
  1521. GNUNET_break (0);
  1522. GNUNET_SERVICE_client_drop (cs->client);
  1523. return;
  1524. }
  1525. GNUNET_SERVICE_client_continue (cs->client);
  1526. if (ntohl (ack->send_more))
  1527. {
  1528. send_client_element (set);
  1529. }
  1530. else
  1531. {
  1532. GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
  1533. set->iter = NULL;
  1534. set->iteration_id++;
  1535. }
  1536. }
  1537. /**
  1538. * Handle a request from the client to copy a set.
  1539. *
  1540. * @param cls the client
  1541. * @param mh the message
  1542. */
  1543. static void
  1544. handle_client_copy_lazy_prepare (void *cls,
  1545. const struct GNUNET_MessageHeader *mh)
  1546. {
  1547. struct ClientState *cs = cls;
  1548. struct Set *set;
  1549. struct LazyCopyRequest *cr;
  1550. struct GNUNET_MQ_Envelope *ev;
  1551. struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
  1552. if (NULL == (set = cs->set))
  1553. {
  1554. /* client without a set requested an operation */
  1555. GNUNET_break (0);
  1556. GNUNET_SERVICE_client_drop (cs->client);
  1557. return;
  1558. }
  1559. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1560. "Client requested creation of lazy copy\n");
  1561. cr = GNUNET_new (struct LazyCopyRequest);
  1562. cr->cookie = ++lazy_copy_cookie;
  1563. cr->source_set = set;
  1564. GNUNET_CONTAINER_DLL_insert (lazy_copy_head,
  1565. lazy_copy_tail,
  1566. cr);
  1567. ev = GNUNET_MQ_msg (resp_msg,
  1568. GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE);
  1569. resp_msg->cookie = cr->cookie;
  1570. GNUNET_MQ_send (set->cs->mq,
  1571. ev);
  1572. GNUNET_SERVICE_client_continue (cs->client);
  1573. }
  1574. /**
  1575. * Handle a request from the client to connect to a copy of a set.
  1576. *
  1577. * @param cls the client
  1578. * @param msg the message
  1579. */
  1580. static void
  1581. handle_client_copy_lazy_connect (void *cls,
  1582. const struct GNUNET_SET_CopyLazyConnectMessage *msg)
  1583. {
  1584. struct ClientState *cs = cls;
  1585. struct LazyCopyRequest *cr;
  1586. struct Set *set;
  1587. int found;
  1588. if (NULL != cs->set)
  1589. {
  1590. /* There can only be one set per client */
  1591. GNUNET_break (0);
  1592. GNUNET_SERVICE_client_drop (cs->client);
  1593. return;
  1594. }
  1595. found = GNUNET_NO;
  1596. for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
  1597. {
  1598. if (cr->cookie == msg->cookie)
  1599. {
  1600. found = GNUNET_YES;
  1601. break;
  1602. }
  1603. }
  1604. if (GNUNET_NO == found)
  1605. {
  1606. /* client asked for copy with cookie we don't know */
  1607. GNUNET_break (0);
  1608. GNUNET_SERVICE_client_drop (cs->client);
  1609. return;
  1610. }
  1611. GNUNET_CONTAINER_DLL_remove (lazy_copy_head,
  1612. lazy_copy_tail,
  1613. cr);
  1614. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1615. "Client %p requested use of lazy copy\n",
  1616. cs);
  1617. set = GNUNET_new (struct Set);
  1618. switch (cr->source_set->operation)
  1619. {
  1620. case GNUNET_SET_OPERATION_INTERSECTION:
  1621. set->vt = _GSS_intersection_vt ();
  1622. break;
  1623. case GNUNET_SET_OPERATION_UNION:
  1624. set->vt = _GSS_union_vt ();
  1625. break;
  1626. default:
  1627. GNUNET_assert (0);
  1628. return;
  1629. }
  1630. if (NULL == set->vt->copy_state)
  1631. {
  1632. /* Lazy copy not supported for this set operation */
  1633. GNUNET_break (0);
  1634. GNUNET_free (set);
  1635. GNUNET_free (cr);
  1636. GNUNET_SERVICE_client_drop (cs->client);
  1637. return;
  1638. }
  1639. set->operation = cr->source_set->operation;
  1640. set->state = set->vt->copy_state (cr->source_set->state);
  1641. set->content = cr->source_set->content;
  1642. set->content->refcount++;
  1643. set->current_generation = cr->source_set->current_generation;
  1644. set->excluded_generations_size = cr->source_set->excluded_generations_size;
  1645. set->excluded_generations
  1646. = GNUNET_memdup (cr->source_set->excluded_generations,
  1647. set->excluded_generations_size * sizeof (struct GenerationRange));
  1648. /* Advance the generation of the new set, so that mutations to the
  1649. of the cloned set and the source set are independent. */
  1650. advance_generation (set);
  1651. set->cs = cs;
  1652. cs->set = set;
  1653. GNUNET_free (cr);
  1654. GNUNET_SERVICE_client_continue (cs->client);
  1655. }
  1656. /**
  1657. * Handle a request from the client to cancel a running set operation.
  1658. *
  1659. * @param cls the client
  1660. * @param msg the message
  1661. */
  1662. static void
  1663. handle_client_cancel (void *cls,
  1664. const struct GNUNET_SET_CancelMessage *msg)
  1665. {
  1666. struct ClientState *cs = cls;
  1667. struct Set *set;
  1668. struct Operation *op;
  1669. int found;
  1670. if (NULL == (set = cs->set))
  1671. {
  1672. /* client without a set requested an operation */
  1673. GNUNET_break (0);
  1674. GNUNET_SERVICE_client_drop (cs->client);
  1675. return;
  1676. }
  1677. found = GNUNET_NO;
  1678. for (op = set->ops_head; NULL != op; op = op->next)
  1679. {
  1680. if (op->client_request_id == ntohl (msg->request_id))
  1681. {
  1682. found = GNUNET_YES;
  1683. break;
  1684. }
  1685. }
  1686. if (GNUNET_NO == found)
  1687. {
  1688. /* It may happen that the operation was already destroyed due to
  1689. * the other peer disconnecting. The client may not know about this
  1690. * yet and try to cancel the (just barely non-existent) operation.
  1691. * So this is not a hard error.
  1692. */
  1693. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  1694. "Client canceled non-existent op %u\n",
  1695. (uint32_t) ntohl (msg->request_id));
  1696. }
  1697. else
  1698. {
  1699. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1700. "Client requested cancel for op %u\n",
  1701. (uint32_t) ntohl (msg->request_id));
  1702. _GSS_operation_destroy (op,
  1703. GNUNET_YES);
  1704. }
  1705. GNUNET_SERVICE_client_continue (cs->client);
  1706. }
  1707. /**
  1708. * Handle a request from the client to accept a set operation that
  1709. * came from a remote peer. We forward the accept to the associated
  1710. * operation for handling
  1711. *
  1712. * @param cls the client
  1713. * @param msg the message
  1714. */
  1715. static void
  1716. handle_client_accept (void *cls,
  1717. const struct GNUNET_SET_AcceptMessage *msg)
  1718. {
  1719. struct ClientState *cs = cls;
  1720. struct Set *set;
  1721. struct Operation *op;
  1722. struct GNUNET_SET_ResultMessage *result_message;
  1723. struct GNUNET_MQ_Envelope *ev;
  1724. struct Listener *listener;
  1725. if (NULL == (set = cs->set))
  1726. {
  1727. /* client without a set requested to accept */
  1728. GNUNET_break (0);
  1729. GNUNET_SERVICE_client_drop (cs->client);
  1730. return;
  1731. }
  1732. op = get_incoming (ntohl (msg->accept_reject_id));
  1733. if (NULL == op)
  1734. {
  1735. /* It is not an error if the set op does not exist -- it may
  1736. * have been destroyed when the partner peer disconnected. */
  1737. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  1738. "Client %p accepted request %u of listener %p that is no longer active\n",
  1739. cs,
  1740. ntohl (msg->accept_reject_id),
  1741. cs->listener);
  1742. ev = GNUNET_MQ_msg (result_message,
  1743. GNUNET_MESSAGE_TYPE_SET_RESULT);
  1744. result_message->request_id = msg->request_id;
  1745. result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
  1746. GNUNET_MQ_send (set->cs->mq,
  1747. ev);
  1748. GNUNET_SERVICE_client_continue (cs->client);
  1749. return;
  1750. }
  1751. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1752. "Client accepting request %u\n",
  1753. (uint32_t) ntohl (msg->accept_reject_id));
  1754. listener = op->listener;
  1755. op->listener = NULL;
  1756. GNUNET_CONTAINER_DLL_remove (listener->op_head,
  1757. listener->op_tail,
  1758. op);
  1759. op->set = set;
  1760. GNUNET_CONTAINER_DLL_insert (set->ops_head,
  1761. set->ops_tail,
  1762. op);
  1763. op->client_request_id = ntohl (msg->request_id);
  1764. op->result_mode = ntohl (msg->result_mode);
  1765. op->byzantine = msg->byzantine;
  1766. op->byzantine_lower_bound = msg->byzantine_lower_bound;
  1767. op->force_full = msg->force_full;
  1768. op->force_delta = msg->force_delta;
  1769. /* Advance generation values, so that future mutations do not
  1770. interfer with the running operation. */
  1771. op->generation_created = set->current_generation;
  1772. advance_generation (set);
  1773. GNUNET_assert (NULL == op->state);
  1774. op->state = set->vt->accept (op);
  1775. if (NULL == op->state)
  1776. {
  1777. GNUNET_break (0);
  1778. GNUNET_SERVICE_client_drop (cs->client);
  1779. return;
  1780. }
  1781. /* Now allow CADET to continue, as we did not do this in
  1782. #handle_incoming_msg (as we wanted to first see if the
  1783. local client would accept the request). */
  1784. GNUNET_CADET_receive_done (op->channel);
  1785. GNUNET_SERVICE_client_continue (cs->client);
  1786. }
  1787. /**
  1788. * Called to clean up, after a shutdown has been requested.
  1789. *
  1790. * @param cls closure, NULL
  1791. */
  1792. static void
  1793. shutdown_task (void *cls)
  1794. {
  1795. /* Delay actual shutdown to allow service to disconnect clients */
  1796. in_shutdown = GNUNET_YES;
  1797. if (0 == num_clients)
  1798. {
  1799. if (NULL != cadet)
  1800. {
  1801. GNUNET_CADET_disconnect (cadet);
  1802. cadet = NULL;
  1803. }
  1804. }
  1805. GNUNET_STATISTICS_destroy (_GSS_statistics,
  1806. GNUNET_YES);
  1807. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1808. "handled shutdown request\n");
  1809. }
  1810. /**
  1811. * Function called by the service's run
  1812. * method to run service-specific setup code.
  1813. *
  1814. * @param cls closure
  1815. * @param cfg configuration to use
  1816. * @param service the initialized service
  1817. */
  1818. static void
  1819. run (void *cls,
  1820. const struct GNUNET_CONFIGURATION_Handle *cfg,
  1821. struct GNUNET_SERVICE_Handle *service)
  1822. {
  1823. /* FIXME: need to modify SERVICE (!) API to allow
  1824. us to run a shutdown task *after* clients were
  1825. forcefully disconnected! */
  1826. GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
  1827. NULL);
  1828. _GSS_statistics = GNUNET_STATISTICS_create ("set",
  1829. cfg);
  1830. cadet = GNUNET_CADET_connect (cfg);
  1831. if (NULL == cadet)
  1832. {
  1833. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1834. _("Could not connect to CADET service\n"));
  1835. GNUNET_SCHEDULER_shutdown ();
  1836. return;
  1837. }
  1838. }
  1839. /**
  1840. * Define "main" method using service macro.
  1841. */
  1842. GNUNET_SERVICE_MAIN
  1843. ("set",
  1844. GNUNET_SERVICE_OPTION_NONE,
  1845. &run,
  1846. &client_connect_cb,
  1847. &client_disconnect_cb,
  1848. NULL,
  1849. GNUNET_MQ_hd_fixed_size (client_accept,
  1850. GNUNET_MESSAGE_TYPE_SET_ACCEPT,
  1851. struct GNUNET_SET_AcceptMessage,
  1852. NULL),
  1853. GNUNET_MQ_hd_fixed_size (client_iter_ack,
  1854. GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
  1855. struct GNUNET_SET_IterAckMessage,
  1856. NULL),
  1857. GNUNET_MQ_hd_var_size (client_mutation,
  1858. GNUNET_MESSAGE_TYPE_SET_ADD,
  1859. struct GNUNET_SET_ElementMessage,
  1860. NULL),
  1861. GNUNET_MQ_hd_fixed_size (client_create_set,
  1862. GNUNET_MESSAGE_TYPE_SET_CREATE,
  1863. struct GNUNET_SET_CreateMessage,
  1864. NULL),
  1865. GNUNET_MQ_hd_fixed_size (client_iterate,
  1866. GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
  1867. struct GNUNET_MessageHeader,
  1868. NULL),
  1869. GNUNET_MQ_hd_var_size (client_evaluate,
  1870. GNUNET_MESSAGE_TYPE_SET_EVALUATE,
  1871. struct GNUNET_SET_EvaluateMessage,
  1872. NULL),
  1873. GNUNET_MQ_hd_fixed_size (client_listen,
  1874. GNUNET_MESSAGE_TYPE_SET_LISTEN,
  1875. struct GNUNET_SET_ListenMessage,
  1876. NULL),
  1877. GNUNET_MQ_hd_fixed_size (client_reject,
  1878. GNUNET_MESSAGE_TYPE_SET_REJECT,
  1879. struct GNUNET_SET_RejectMessage,
  1880. NULL),
  1881. GNUNET_MQ_hd_var_size (client_mutation,
  1882. GNUNET_MESSAGE_TYPE_SET_REMOVE,
  1883. struct GNUNET_SET_ElementMessage,
  1884. NULL),
  1885. GNUNET_MQ_hd_fixed_size (client_cancel,
  1886. GNUNET_MESSAGE_TYPE_SET_CANCEL,
  1887. struct GNUNET_SET_CancelMessage,
  1888. NULL),
  1889. GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
  1890. GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE,
  1891. struct GNUNET_MessageHeader,
  1892. NULL),
  1893. GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
  1894. GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT,
  1895. struct GNUNET_SET_CopyLazyConnectMessage,
  1896. NULL),
  1897. GNUNET_MQ_handler_end ());
  1898. /* end of gnunet-service-set.c */