setu_api.c 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2012-2016, 2020 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL-3.0-or-later
  15. */
  16. /**
  17. * @file set/setu_api.c
  18. * @brief api for the set union service
  19. * @author Florian Dold
  20. * @author Christian Grothoff
  21. */
  22. #include "platform.h"
  23. #include "gnunet_util_lib.h"
  24. #include "gnunet_protocols.h"
  25. #include "gnunet_setu_service.h"
  26. #include "setu.h"
  /* Logging shorthand: every message emitted by this file is tagged with
     the "set-api" component. */
  #define LOG(level, ...) \
    GNUNET_log_from (level, "set-api", __VA_ARGS__)
  /**
   * Client-side state of one set.  The type is opaque to users of the API.
   */
  struct GNUNET_SETU_Handle
  {
    /**
     * Message queue used to talk to the set service about this set.
     */
    struct GNUNET_MQ_Handle *mq;

    /**
     * Head of the doubly-linked list of operations committed to this set.
     */
    struct GNUNET_SETU_OperationHandle *ops_head;

    /**
     * Tail of the doubly-linked list of operations committed to this set.
     */
    struct GNUNET_SETU_OperationHandle *ops_tail;

    /**
     * Deferred-destruction state.
     * #GNUNET_YES: #GNUNET_SETU_destroy() was called but the set could not
     * be released yet; it is released once no operation is left.
     * #GNUNET_SYSERR: the next #GNUNET_SETU_destroy() call must only raise
     * this flag instead of releasing the set.
     */
    int destroy_requested;

    /**
     * #GNUNET_YES once communication with the service failed; adding
     * elements or committing operations is refused from then on.
     */
    int invalid;
  };
  /**
   * An incoming request from another peer to run a set operation with us.
   */
  struct GNUNET_SETU_Request
  {
    /**
     * Identifier of the request (host byte order); echoed back to the
     * service when the request is accepted or rejected.
     */
    uint32_t accept_id;

    /**
     * #GNUNET_YES once the request has been accepted,
     * #GNUNET_NO as long as it has not.
     */
    int accepted;
  };
  72. /**
  73. * Handle to an operation. Only known to the service after committing
  74. * the handle with a set.
  75. */
  76. struct GNUNET_SETU_OperationHandle
  77. {
  78. /**
  79. * Function to be called when we have a result,
  80. * or an error.
  81. */
  82. GNUNET_SETU_ResultIterator result_cb;
  83. /**
  84. * Closure for @e result_cb.
  85. */
  86. void *result_cls;
  87. /**
  88. * Local set used for the operation,
  89. * NULL if no set has been provided by conclude yet.
  90. */
  91. struct GNUNET_SETU_Handle *set;
  92. /**
  93. * Message sent to the server on calling conclude,
  94. * NULL if conclude has been called.
  95. */
  96. struct GNUNET_MQ_Envelope *conclude_mqm;
  97. /**
  98. * Address of the request if in the conclude message,
  99. * used to patch the request id into the message when the set is known.
  100. */
  101. uint32_t *request_id_addr;
  102. /**
  103. * Handles are kept in a linked list.
  104. */
  105. struct GNUNET_SETU_OperationHandle *prev;
  106. /**
  107. * Handles are kept in a linked list.
  108. */
  109. struct GNUNET_SETU_OperationHandle *next;
  110. /**
  111. * Request ID to identify the operation within the set.
  112. */
  113. uint32_t request_id;
  114. };
  115. /**
  116. * Opaque handle to a listen operation.
  117. */
  118. struct GNUNET_SETU_ListenHandle
  119. {
  120. /**
  121. * Message queue for the client.
  122. */
  123. struct GNUNET_MQ_Handle*mq;
  124. /**
  125. * Configuration handle for the listener, stored
  126. * here to be able to reconnect transparently on
  127. * connection failure.
  128. */
  129. const struct GNUNET_CONFIGURATION_Handle *cfg;
  130. /**
  131. * Function to call on a new incoming request,
  132. * or on error.
  133. */
  134. GNUNET_SETU_ListenCallback listen_cb;
  135. /**
  136. * Closure for @e listen_cb.
  137. */
  138. void *listen_cls;
  139. /**
  140. * Application ID we listen for.
  141. */
  142. struct GNUNET_HashCode app_id;
  143. /**
  144. * Time to wait until we try to reconnect on failure.
  145. */
  146. struct GNUNET_TIME_Relative reconnect_backoff;
  147. /**
  148. * Task for reconnecting when the listener fails.
  149. */
  150. struct GNUNET_SCHEDULER_Task *reconnect_task;
  151. };
  152. /**
  153. * Check that the given @a msg is well-formed.
  154. *
  155. * @param cls closure
  156. * @param msg message to check
  157. * @return #GNUNET_OK if message is well-formed
  158. */
  159. static int
  160. check_result (void *cls,
  161. const struct GNUNET_SETU_ResultMessage *msg)
  162. {
  163. /* minimum size was already checked, everything else is OK! */
  164. return GNUNET_OK;
  165. }
  166. /**
  167. * Handle result message for a set operation.
  168. *
  169. * @param cls the set
  170. * @param mh the message
  171. */
  172. static void
  173. handle_result (void *cls,
  174. const struct GNUNET_SETU_ResultMessage *msg)
  175. {
  176. struct GNUNET_SETU_Handle *set = cls;
  177. struct GNUNET_SETU_OperationHandle *oh;
  178. struct GNUNET_SETU_Element e;
  179. enum GNUNET_SETU_Status result_status;
  180. int destroy_set;
  181. GNUNET_assert (NULL != set->mq);
  182. result_status = (enum GNUNET_SETU_Status) ntohs (msg->result_status);
  183. LOG (GNUNET_ERROR_TYPE_DEBUG,
  184. "Got result message with status %d\n",
  185. result_status);
  186. oh = GNUNET_MQ_assoc_get (set->mq,
  187. ntohl (msg->request_id));
  188. if (NULL == oh)
  189. {
  190. /* 'oh' can be NULL if we canceled the operation, but the service
  191. did not get the cancel message yet. */
  192. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  193. "Ignoring result from canceled operation\n");
  194. return;
  195. }
  196. switch (result_status)
  197. {
  198. case GNUNET_SETU_STATUS_ADD_LOCAL:
  199. case GNUNET_SETU_STATUS_ADD_REMOTE:
  200. e.data = &msg[1];
  201. e.size = ntohs (msg->header.size)
  202. - sizeof(struct GNUNET_SETU_ResultMessage);
  203. e.element_type = ntohs (msg->element_type);
  204. if (NULL != oh->result_cb)
  205. oh->result_cb (oh->result_cls,
  206. &e,
  207. GNUNET_ntohll (msg->current_size),
  208. result_status);
  209. return;
  210. case GNUNET_SETU_STATUS_FAILURE:
  211. case GNUNET_SETU_STATUS_DONE:
  212. LOG (GNUNET_ERROR_TYPE_DEBUG,
  213. "Treating result as final status\n");
  214. GNUNET_MQ_assoc_remove (set->mq,
  215. ntohl (msg->request_id));
  216. GNUNET_CONTAINER_DLL_remove (set->ops_head,
  217. set->ops_tail,
  218. oh);
  219. /* Need to do this calculation _before_ the result callback,
  220. as IF the application still has a valid set handle, it
  221. may trigger destruction of the set during the callback. */
  222. destroy_set = (GNUNET_YES == set->destroy_requested) &&
  223. (NULL == set->ops_head);
  224. if (NULL != oh->result_cb)
  225. {
  226. oh->result_cb (oh->result_cls,
  227. NULL,
  228. GNUNET_ntohll (msg->current_size),
  229. result_status);
  230. }
  231. else
  232. {
  233. LOG (GNUNET_ERROR_TYPE_DEBUG,
  234. "No callback for final status\n");
  235. }
  236. if (destroy_set)
  237. GNUNET_SETU_destroy (set);
  238. GNUNET_free (oh);
  239. return;
  240. }
  241. }
  242. /**
  243. * Destroy the given set operation.
  244. *
  245. * @param oh set operation to destroy
  246. */
  247. static void
  248. set_operation_destroy (struct GNUNET_SETU_OperationHandle *oh)
  249. {
  250. struct GNUNET_SETU_Handle *set = oh->set;
  251. struct GNUNET_SETU_OperationHandle *h_assoc;
  252. if (NULL != oh->conclude_mqm)
  253. GNUNET_MQ_discard (oh->conclude_mqm);
  254. /* is the operation already commited? */
  255. if (NULL != set)
  256. {
  257. GNUNET_CONTAINER_DLL_remove (set->ops_head,
  258. set->ops_tail,
  259. oh);
  260. h_assoc = GNUNET_MQ_assoc_remove (set->mq,
  261. oh->request_id);
  262. GNUNET_assert ((NULL == h_assoc) ||
  263. (h_assoc == oh));
  264. }
  265. GNUNET_free (oh);
  266. }
  267. /**
  268. * Cancel the given set operation. We need to send an explicit cancel
  269. * message, as all operations one one set communicate using one
  270. * handle.
  271. *
  272. * @param oh set operation to cancel
  273. */
  274. void
  275. GNUNET_SETU_operation_cancel (struct GNUNET_SETU_OperationHandle *oh)
  276. {
  277. struct GNUNET_SETU_Handle *set = oh->set;
  278. struct GNUNET_SETU_CancelMessage *m;
  279. struct GNUNET_MQ_Envelope *mqm;
  280. LOG (GNUNET_ERROR_TYPE_DEBUG,
  281. "Cancelling SET operation\n");
  282. if (NULL != set)
  283. {
  284. mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SETU_CANCEL);
  285. m->request_id = htonl (oh->request_id);
  286. GNUNET_MQ_send (set->mq, mqm);
  287. }
  288. set_operation_destroy (oh);
  289. if ((NULL != set) &&
  290. (GNUNET_YES == set->destroy_requested) &&
  291. (NULL == set->ops_head))
  292. {
  293. LOG (GNUNET_ERROR_TYPE_DEBUG,
  294. "Destroying set after operation cancel\n");
  295. GNUNET_SETU_destroy (set);
  296. }
  297. }
  298. /**
  299. * We encountered an error communicating with the set service while
  300. * performing a set operation. Report to the application.
  301. *
  302. * @param cls the `struct GNUNET_SETU_Handle`
  303. * @param error error code
  304. */
  305. static void
  306. handle_client_set_error (void *cls,
  307. enum GNUNET_MQ_Error error)
  308. {
  309. struct GNUNET_SETU_Handle *set = cls;
  310. LOG (GNUNET_ERROR_TYPE_ERROR,
  311. "Handling client set error %d\n",
  312. error);
  313. while (NULL != set->ops_head)
  314. {
  315. if ((NULL != set->ops_head->result_cb) &&
  316. (GNUNET_NO == set->destroy_requested))
  317. set->ops_head->result_cb (set->ops_head->result_cls,
  318. NULL,
  319. 0,
  320. GNUNET_SETU_STATUS_FAILURE);
  321. set_operation_destroy (set->ops_head);
  322. }
  323. set->invalid = GNUNET_YES;
  324. }
  325. /**
  326. * Create an empty set, supporting the specified operation.
  327. *
  328. * @param cfg configuration to use for connecting to the
  329. * set service
  330. * @return a handle to the set
  331. */
  332. struct GNUNET_SETU_Handle *
  333. GNUNET_SETU_create (const struct GNUNET_CONFIGURATION_Handle *cfg)
  334. {
  335. struct GNUNET_SETU_Handle *set = GNUNET_new (struct GNUNET_SETU_Handle);
  336. struct GNUNET_MQ_MessageHandler mq_handlers[] = {
  337. GNUNET_MQ_hd_var_size (result,
  338. GNUNET_MESSAGE_TYPE_SETU_RESULT,
  339. struct GNUNET_SETU_ResultMessage,
  340. set),
  341. GNUNET_MQ_handler_end ()
  342. };
  343. struct GNUNET_MQ_Envelope *mqm;
  344. struct GNUNET_SETU_CreateMessage *create_msg;
  345. set->mq = GNUNET_CLIENT_connect (cfg,
  346. "setu",
  347. mq_handlers,
  348. &handle_client_set_error,
  349. set);
  350. if (NULL == set->mq)
  351. {
  352. GNUNET_free (set);
  353. return NULL;
  354. }
  355. mqm = GNUNET_MQ_msg (create_msg,
  356. GNUNET_MESSAGE_TYPE_SETU_CREATE);
  357. GNUNET_MQ_send (set->mq,
  358. mqm);
  359. return set;
  360. }
  361. /**
  362. * Add an element to the given set. After the element has been added
  363. * (in the sense of being transmitted to the set service), @a cont
  364. * will be called. Multiple calls to GNUNET_SETU_add_element() can be
  365. * queued.
  366. *
  367. * @param set set to add element to
  368. * @param element element to add to the set
  369. * @param cb continuation called after the element has been added
  370. * @param cb_cls closure for @a cb
  371. * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
  372. * set is invalid (e.g. the set service crashed)
  373. */
  374. int
  375. GNUNET_SETU_add_element (struct GNUNET_SETU_Handle *set,
  376. const struct GNUNET_SETU_Element *element,
  377. GNUNET_SCHEDULER_TaskCallback cb,
  378. void *cb_cls)
  379. {
  380. struct GNUNET_MQ_Envelope *mqm;
  381. struct GNUNET_SETU_ElementMessage *msg;
  382. LOG (GNUNET_ERROR_TYPE_DEBUG,
  383. "adding element of type %u to set %p\n",
  384. (unsigned int) element->element_type,
  385. set);
  386. GNUNET_assert (NULL != set);
  387. if (GNUNET_YES == set->invalid)
  388. {
  389. if (NULL != cb)
  390. cb (cb_cls);
  391. return GNUNET_SYSERR;
  392. }
  393. mqm = GNUNET_MQ_msg_extra (msg,
  394. element->size,
  395. GNUNET_MESSAGE_TYPE_SETU_ADD);
  396. msg->element_type = htons (element->element_type);
  397. GNUNET_memcpy (&msg[1],
  398. element->data,
  399. element->size);
  400. GNUNET_MQ_notify_sent (mqm,
  401. cb,
  402. cb_cls);
  403. GNUNET_MQ_send (set->mq,
  404. mqm);
  405. return GNUNET_OK;
  406. }
  407. /**
  408. * Destroy the set handle if no operations are left, mark the set
  409. * for destruction otherwise.
  410. *
  411. * @param set set handle to destroy
  412. */
  413. void
  414. GNUNET_SETU_destroy (struct GNUNET_SETU_Handle *set)
  415. {
  416. /* destroying set while iterator is active is currently
  417. not supported; we should expand the API to allow
  418. clients to explicitly cancel the iteration! */
  419. GNUNET_assert (NULL != set);
  420. if ((NULL != set->ops_head) ||
  421. (GNUNET_SYSERR == set->destroy_requested))
  422. {
  423. LOG (GNUNET_ERROR_TYPE_DEBUG,
  424. "Set operations are pending, delaying set destruction\n");
  425. set->destroy_requested = GNUNET_YES;
  426. return;
  427. }
  428. LOG (GNUNET_ERROR_TYPE_DEBUG,
  429. "Really destroying set\n");
  430. if (NULL != set->mq)
  431. {
  432. GNUNET_MQ_destroy (set->mq);
  433. set->mq = NULL;
  434. }
  435. GNUNET_free (set);
  436. }
  437. /**
  438. * Prepare a set operation to be evaluated with another peer.
  439. * The evaluation will not start until the client provides
  440. * a local set with #GNUNET_SETU_commit().
  441. *
  442. * @param other_peer peer with the other set
  443. * @param app_id hash for the application using the set
  444. * @param context_msg additional information for the request
  445. * @param result_cb called on error or success
  446. * @param result_cls closure for @e result_cb
  447. * @return a handle to cancel the operation
  448. */
  449. struct GNUNET_SETU_OperationHandle *
  450. GNUNET_SETU_prepare (const struct GNUNET_PeerIdentity *other_peer,
  451. const struct GNUNET_HashCode *app_id,
  452. const struct GNUNET_MessageHeader *context_msg,
  453. const struct GNUNET_SETU_Option options[],
  454. GNUNET_SETU_ResultIterator result_cb,
  455. void *result_cls)
  456. {
  457. struct GNUNET_MQ_Envelope *mqm;
  458. struct GNUNET_SETU_OperationHandle *oh;
  459. struct GNUNET_SETU_EvaluateMessage *msg;
  460. LOG (GNUNET_ERROR_TYPE_DEBUG,
  461. "Client prepares set union operation\n");
  462. oh = GNUNET_new (struct GNUNET_SETU_OperationHandle);
  463. oh->result_cb = result_cb;
  464. oh->result_cls = result_cls;
  465. mqm = GNUNET_MQ_msg_nested_mh (msg,
  466. GNUNET_MESSAGE_TYPE_SETU_EVALUATE,
  467. context_msg);
  468. msg->app_id = *app_id;
  469. msg->target_peer = *other_peer;
  470. for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++)
  471. {
  472. switch (opt->type)
  473. {
  474. case GNUNET_SETU_OPTION_BYZANTINE:
  475. msg->byzantine = GNUNET_YES;
  476. msg->byzantine_lower_bound = htonl (opt->v.num);
  477. break;
  478. case GNUNET_SETU_OPTION_FORCE_FULL:
  479. msg->force_full = GNUNET_YES;
  480. break;
  481. case GNUNET_SETU_OPTION_FORCE_DELTA:
  482. msg->force_delta = GNUNET_YES;
  483. break;
  484. case GNUNET_SETU_OPTION_SYMMETRIC:
  485. msg->symmetric = GNUNET_YES;
  486. break;
  487. default:
  488. LOG (GNUNET_ERROR_TYPE_ERROR,
  489. "Option with type %d not recognized\n",
  490. (int) opt->type);
  491. }
  492. }
  493. oh->conclude_mqm = mqm;
  494. oh->request_id_addr = &msg->request_id;
  495. return oh;
  496. }
  /**
   * Forward declaration: (re)connect a listener to the set service.
   * Needed by the listener's error handler, which is defined earlier.
   *
   * @param cls the `struct GNUNET_SETU_ListenHandle *` to connect
   */
  static void
  listen_connect (void *cls);
  504. /**
  505. * Check validity of request message for a listen operation
  506. *
  507. * @param cls the listen handle
  508. * @param msg the message
  509. * @return #GNUNET_OK if the message is well-formed
  510. */
  511. static int
  512. check_request (void *cls,
  513. const struct GNUNET_SETU_RequestMessage *msg)
  514. {
  515. const struct GNUNET_MessageHeader *context_msg;
  516. if (ntohs (msg->header.size) == sizeof(*msg))
  517. return GNUNET_OK; /* no context message is OK */
  518. context_msg = GNUNET_MQ_extract_nested_mh (msg);
  519. if (NULL == context_msg)
  520. {
  521. /* malformed context message is NOT ok */
  522. GNUNET_break_op (0);
  523. return GNUNET_SYSERR;
  524. }
  525. return GNUNET_OK;
  526. }
  527. /**
  528. * Handle request message for a listen operation
  529. *
  530. * @param cls the listen handle
  531. * @param msg the message
  532. */
  533. static void
  534. handle_request (void *cls,
  535. const struct GNUNET_SETU_RequestMessage *msg)
  536. {
  537. struct GNUNET_SETU_ListenHandle *lh = cls;
  538. struct GNUNET_SETU_Request req;
  539. const struct GNUNET_MessageHeader *context_msg;
  540. struct GNUNET_MQ_Envelope *mqm;
  541. struct GNUNET_SETU_RejectMessage *rmsg;
  542. LOG (GNUNET_ERROR_TYPE_DEBUG,
  543. "Processing incoming operation request with id %u\n",
  544. ntohl (msg->accept_id));
  545. /* we got another valid request => reset the backoff */
  546. lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
  547. req.accept_id = ntohl (msg->accept_id);
  548. req.accepted = GNUNET_NO;
  549. context_msg = GNUNET_MQ_extract_nested_mh (msg);
  550. /* calling #GNUNET_SETU_accept() in the listen cb will set req->accepted */
  551. lh->listen_cb (lh->listen_cls,
  552. &msg->peer_id,
  553. context_msg,
  554. &req);
  555. if (GNUNET_YES == req.accepted)
  556. return; /* the accept-case is handled in #GNUNET_SETU_accept() */
  557. LOG (GNUNET_ERROR_TYPE_DEBUG,
  558. "Rejected request %u\n",
  559. ntohl (msg->accept_id));
  560. mqm = GNUNET_MQ_msg (rmsg,
  561. GNUNET_MESSAGE_TYPE_SETU_REJECT);
  562. rmsg->accept_reject_id = msg->accept_id;
  563. GNUNET_MQ_send (lh->mq,
  564. mqm);
  565. }
  566. /**
  567. * Our connection with the set service encountered an error,
  568. * re-initialize with exponential back-off.
  569. *
  570. * @param cls the `struct GNUNET_SETU_ListenHandle *`
  571. * @param error reason for the disconnect
  572. */
  573. static void
  574. handle_client_listener_error (void *cls,
  575. enum GNUNET_MQ_Error error)
  576. {
  577. struct GNUNET_SETU_ListenHandle *lh = cls;
  578. LOG (GNUNET_ERROR_TYPE_DEBUG,
  579. "Listener broke down (%d), re-connecting\n",
  580. (int) error);
  581. GNUNET_MQ_destroy (lh->mq);
  582. lh->mq = NULL;
  583. lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
  584. &listen_connect,
  585. lh);
  586. lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
  587. }
  588. /**
  589. * Connect to the set service in order to listen for requests.
  590. *
  591. * @param cls the `struct GNUNET_SETU_ListenHandle *` to connect
  592. */
  593. static void
  594. listen_connect (void *cls)
  595. {
  596. struct GNUNET_SETU_ListenHandle *lh = cls;
  597. struct GNUNET_MQ_MessageHandler mq_handlers[] = {
  598. GNUNET_MQ_hd_var_size (request,
  599. GNUNET_MESSAGE_TYPE_SETU_REQUEST,
  600. struct GNUNET_SETU_RequestMessage,
  601. lh),
  602. GNUNET_MQ_handler_end ()
  603. };
  604. struct GNUNET_MQ_Envelope *mqm;
  605. struct GNUNET_SETU_ListenMessage *msg;
  606. lh->reconnect_task = NULL;
  607. GNUNET_assert (NULL == lh->mq);
  608. lh->mq = GNUNET_CLIENT_connect (lh->cfg,
  609. "setu",
  610. mq_handlers,
  611. &handle_client_listener_error,
  612. lh);
  613. if (NULL == lh->mq)
  614. return;
  615. mqm = GNUNET_MQ_msg (msg,
  616. GNUNET_MESSAGE_TYPE_SETU_LISTEN);
  617. msg->app_id = lh->app_id;
  618. GNUNET_MQ_send (lh->mq,
  619. mqm);
  620. }
  621. /**
  622. * Wait for set operation requests for the given application id
  623. *
  624. * @param cfg configuration to use for connecting to
  625. * the set service, needs to be valid for the lifetime of the listen handle
  626. * @param app_id id of the application that handles set operation requests
  627. * @param listen_cb called for each incoming request matching the operation
  628. * and application id
  629. * @param listen_cls handle for @a listen_cb
  630. * @return a handle that can be used to cancel the listen operation
  631. */
  632. struct GNUNET_SETU_ListenHandle *
  633. GNUNET_SETU_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
  634. const struct GNUNET_HashCode *app_id,
  635. GNUNET_SETU_ListenCallback listen_cb,
  636. void *listen_cls)
  637. {
  638. struct GNUNET_SETU_ListenHandle *lh;
  639. LOG (GNUNET_ERROR_TYPE_DEBUG,
  640. "Starting listener for app %s\n",
  641. GNUNET_h2s (app_id));
  642. lh = GNUNET_new (struct GNUNET_SETU_ListenHandle);
  643. lh->listen_cb = listen_cb;
  644. lh->listen_cls = listen_cls;
  645. lh->cfg = cfg;
  646. lh->app_id = *app_id;
  647. lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
  648. listen_connect (lh);
  649. if (NULL == lh->mq)
  650. {
  651. GNUNET_free (lh);
  652. return NULL;
  653. }
  654. return lh;
  655. }
  656. /**
  657. * Cancel the given listen operation.
  658. *
  659. * @param lh handle for the listen operation
  660. */
  661. void
  662. GNUNET_SETU_listen_cancel (struct GNUNET_SETU_ListenHandle *lh)
  663. {
  664. LOG (GNUNET_ERROR_TYPE_DEBUG,
  665. "Canceling listener %s\n",
  666. GNUNET_h2s (&lh->app_id));
  667. if (NULL != lh->mq)
  668. {
  669. GNUNET_MQ_destroy (lh->mq);
  670. lh->mq = NULL;
  671. }
  672. if (NULL != lh->reconnect_task)
  673. {
  674. GNUNET_SCHEDULER_cancel (lh->reconnect_task);
  675. lh->reconnect_task = NULL;
  676. }
  677. GNUNET_free (lh);
  678. }
  679. /**
  680. * Accept a request we got via #GNUNET_SETU_listen. Must be called during
  681. * #GNUNET_SETU_listen, as the 'struct GNUNET_SETU_Request' becomes invalid
  682. * afterwards.
  683. * Call #GNUNET_SETU_commit to provide the local set to use for the operation,
  684. * and to begin the exchange with the remote peer.
  685. *
  686. * @param request request to accept
  687. * @param result_mode specified how results will be returned,
  688. * see `enum GNUNET_SETU_ResultMode`.
  689. * @param result_cb callback for the results
  690. * @param result_cls closure for @a result_cb
  691. * @return a handle to cancel the operation
  692. */
  693. struct GNUNET_SETU_OperationHandle *
  694. GNUNET_SETU_accept (struct GNUNET_SETU_Request *request,
  695. const struct GNUNET_SETU_Option options[],
  696. GNUNET_SETU_ResultIterator result_cb,
  697. void *result_cls)
  698. {
  699. struct GNUNET_MQ_Envelope *mqm;
  700. struct GNUNET_SETU_OperationHandle *oh;
  701. struct GNUNET_SETU_AcceptMessage *msg;
  702. GNUNET_assert (GNUNET_NO == request->accepted);
  703. LOG (GNUNET_ERROR_TYPE_DEBUG,
  704. "Client accepts set union operation with id %u\n",
  705. request->accept_id);
  706. request->accepted = GNUNET_YES;
  707. mqm = GNUNET_MQ_msg (msg,
  708. GNUNET_MESSAGE_TYPE_SETU_ACCEPT);
  709. msg->accept_reject_id = htonl (request->accept_id);
  710. for (const struct GNUNET_SETU_Option *opt = options; opt->type != 0; opt++)
  711. {
  712. switch (opt->type)
  713. {
  714. case GNUNET_SETU_OPTION_BYZANTINE:
  715. msg->byzantine = GNUNET_YES;
  716. msg->byzantine_lower_bound = htonl (opt->v.num);
  717. break;
  718. case GNUNET_SETU_OPTION_FORCE_FULL:
  719. msg->force_full = GNUNET_YES;
  720. break;
  721. case GNUNET_SETU_OPTION_FORCE_DELTA:
  722. msg->force_delta = GNUNET_YES;
  723. break;
  724. case GNUNET_SETU_OPTION_SYMMETRIC:
  725. msg->symmetric = GNUNET_YES;
  726. break;
  727. default:
  728. LOG (GNUNET_ERROR_TYPE_ERROR,
  729. "Option with type %d not recognized\n",
  730. (int) opt->type);
  731. }
  732. }
  733. oh = GNUNET_new (struct GNUNET_SETU_OperationHandle);
  734. oh->result_cb = result_cb;
  735. oh->result_cls = result_cls;
  736. oh->conclude_mqm = mqm;
  737. oh->request_id_addr = &msg->request_id;
  738. return oh;
  739. }
  740. /**
  741. * Commit a set to be used with a set operation.
  742. * This function is called once we have fully constructed
  743. * the set that we want to use for the operation. At this
  744. * time, the P2P protocol can then begin to exchange the
  745. * set information and call the result callback with the
  746. * result information.
  747. *
  748. * @param oh handle to the set operation
  749. * @param set the set to use for the operation
  750. * @return #GNUNET_OK on success, #GNUNET_SYSERR if the
  751. * set is invalid (e.g. the set service crashed)
  752. */
  753. int
  754. GNUNET_SETU_commit (struct GNUNET_SETU_OperationHandle *oh,
  755. struct GNUNET_SETU_Handle *set)
  756. {
  757. if (NULL != oh->set)
  758. {
  759. /* Some other set was already committed for this
  760. * operation, there is a logic bug in the client of this API */
  761. GNUNET_break (0);
  762. return GNUNET_OK;
  763. }
  764. GNUNET_assert (NULL != set);
  765. if (GNUNET_YES == set->invalid)
  766. return GNUNET_SYSERR;
  767. LOG (GNUNET_ERROR_TYPE_DEBUG,
  768. "Client commits to SET\n");
  769. GNUNET_assert (NULL != oh->conclude_mqm);
  770. oh->set = set;
  771. GNUNET_CONTAINER_DLL_insert (set->ops_head,
  772. set->ops_tail,
  773. oh);
  774. oh->request_id = GNUNET_MQ_assoc_add (set->mq,
  775. oh);
  776. *oh->request_id_addr = htonl (oh->request_id);
  777. GNUNET_MQ_send (set->mq,
  778. oh->conclude_mqm);
  779. oh->conclude_mqm = NULL;
  780. oh->request_id_addr = NULL;
  781. return GNUNET_OK;
  782. }
  783. /**
  784. * Hash a set element.
  785. *
  786. * @param element the element that should be hashed
  787. * @param[out] ret_hash a pointer to where the hash of @a element
  788. * should be stored
  789. */
  790. void
  791. GNUNET_SETU_element_hash (const struct GNUNET_SETU_Element *element,
  792. struct GNUNET_HashCode *ret_hash)
  793. {
  794. struct GNUNET_HashContext *ctx = GNUNET_CRYPTO_hash_context_start ();
  795. /* It's not guaranteed that the element data is always after the element header,
  796. so we need to hash the chunks separately. */
  797. GNUNET_CRYPTO_hash_context_read (ctx,
  798. &element->size,
  799. sizeof(uint16_t));
  800. GNUNET_CRYPTO_hash_context_read (ctx,
  801. &element->element_type,
  802. sizeof(uint16_t));
  803. GNUNET_CRYPTO_hash_context_read (ctx,
  804. element->data,
  805. element->size);
  806. GNUNET_CRYPTO_hash_context_finish (ctx,
  807. ret_hash);
  808. }
  809. /* end of setu_api.c */