scalarproduct_api.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2013, 2014, 2016 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file scalarproduct/scalarproduct_api.c
  18. * @brief API for the scalarproduct
  19. * @author Christian Fuchs
  20. * @author Gaurav Kukreja
  21. * @author Christian Grothoff
  22. */
  23. #include "platform.h"
  24. #include "gnunet_util_lib.h"
  25. #include "gnunet_statistics_service.h"
  26. #include "gnunet_scalarproduct_service.h"
  27. #include "gnunet_protocols.h"
  28. #include "scalarproduct.h"
  29. #define LOG(kind, ...) GNUNET_log_from (kind, "scalarproduct-api", __VA_ARGS__)
  /**
   * Signature of the API-internal callback that converts a response
   * from the service (or a failure) into a notification for the
   * application.
   *
   * @param h computation handle the response belongs to
   * @param msg response received from the service, NULL on errors
   * @param status processing status code
   */
  typedef void
  (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (
    struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
    const struct ClientResponseMessage *msg,
    enum GNUNET_SCALARPRODUCT_ResponseStatus status);
  42. /**
  43. * A handle returned for each computation
  44. */
  45. struct GNUNET_SCALARPRODUCT_ComputationHandle
  46. {
  47. /**
  48. * Our configuration.
  49. */
  50. const struct GNUNET_CONFIGURATION_Handle *cfg;
  51. /**
  52. * Current connection to the scalarproduct service.
  53. */
  54. struct GNUNET_MQ_Handle *mq;
  55. /**
  56. * Function to call after transmission of the request (Bob).
  57. */
  58. GNUNET_SCALARPRODUCT_ContinuationWithStatus cont_status;
  59. /**
  60. * Function to call after transmission of the request (Alice).
  61. */
  62. GNUNET_SCALARPRODUCT_DatumProcessor cont_datum;
  63. /**
  64. * Closure for @e cont_status or @e cont_datum.
  65. */
  66. void *cont_cls;
  67. /**
  68. * API internal callback for results and failures to be forwarded to
  69. * the client.
  70. */
  71. GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc;
  72. /**
  73. * The shared session key identifying this computation
  74. */
  75. struct GNUNET_HashCode key;
  76. };
  77. /**
  78. * Called when a response is received from the service. Perform basic
  79. * check that the message is well-formed.
  80. *
  81. * @param cls Pointer to the Master Context
  82. * @param message Pointer to the data received in response
  83. * @return #GNUNET_OK if @a message is well-formed
  84. */
  85. static int
  86. check_response (void *cls,
  87. const struct ClientResponseMessage *message)
  88. {
  89. if (ntohs (message->header.size) !=
  90. ntohl (message->product_length) + sizeof(struct ClientResponseMessage))
  91. {
  92. GNUNET_break (0);
  93. return GNUNET_SYSERR;
  94. }
  95. return GNUNET_OK;
  96. }
  97. /**
  98. * Handles the STATUS received from the service for a response, does
  99. * not contain a payload. Called when we participate as "Bob" via
  100. * #GNUNET_SCALARPRODUCT_accept_computation().
  101. *
  102. * @param h our Handle
  103. * @param msg the response received
  104. * @param status the condition the request was terminated with (eg: disconnect)
  105. */
  106. static void
  107. process_status_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
  108. const struct ClientResponseMessage *msg,
  109. enum GNUNET_SCALARPRODUCT_ResponseStatus status)
  110. {
  111. if (NULL != h->cont_status)
  112. h->cont_status (h->cont_cls,
  113. status);
  114. GNUNET_SCALARPRODUCT_cancel (h);
  115. }
  116. /**
  117. * Called when a response is received from the service. After basic
  118. * check, the handler in `h->response_proc` is called. This functions
  119. * handles the response to the client which used the API.
  120. *
  121. * @param cls Pointer to the Master Context
  122. * @param msg Pointer to the data received in response
  123. */
  124. static void
  125. handle_response (void *cls,
  126. const struct ClientResponseMessage *message)
  127. {
  128. struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
  129. enum GNUNET_SCALARPRODUCT_ResponseStatus status;
  130. status = (enum GNUNET_SCALARPRODUCT_ResponseStatus) ntohl (message->status);
  131. h->response_proc (h,
  132. message,
  133. status);
  134. }
  135. /**
  136. * Check if the keys for all given elements are unique.
  137. *
  138. * @param elements elements to check
  139. * @param element_count size of the @a elements array
  140. * @return #GNUNET_OK if all keys are unique
  141. */
  142. static int
  143. check_unique (const struct GNUNET_SCALARPRODUCT_Element *elements,
  144. uint32_t element_count)
  145. {
  146. struct GNUNET_CONTAINER_MultiHashMap *map;
  147. int ok;
  148. ok = GNUNET_OK;
  149. map = GNUNET_CONTAINER_multihashmap_create (2 * element_count,
  150. GNUNET_YES);
  151. for (uint32_t i = 0; i < element_count; i++)
  152. if (GNUNET_OK !=
  153. GNUNET_CONTAINER_multihashmap_put (map,
  154. &elements[i].key,
  155. map,
  156. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
  157. {
  158. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  159. _ ("Keys given to SCALARPRODUCT not unique!\n"));
  160. ok = GNUNET_SYSERR;
  161. }
  162. GNUNET_CONTAINER_multihashmap_destroy (map);
  163. return ok;
  164. }
  165. /**
  166. * We encountered an error communicating with the set service while
  167. * performing a set operation. Report to the application.
  168. *
  169. * @param cls the `struct GNUNET_SCALARPRODUCT_ComputationHandle`
  170. * @param error error code
  171. */
  172. static void
  173. mq_error_handler (void *cls,
  174. enum GNUNET_MQ_Error error)
  175. {
  176. struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
  177. LOG (GNUNET_ERROR_TYPE_INFO,
  178. "Disconnected from SCALARPRODUCT service.\n");
  179. h->response_proc (h,
  180. NULL,
  181. GNUNET_SCALARPRODUCT_STATUS_DISCONNECTED);
  182. }
  183. /**
  184. * Used by Bob's client to cooperate with Alice,
  185. *
  186. * @param cfg the gnunet configuration handle
  187. * @param key Session key unique to the requesting client
  188. * @param elements Array of elements of the vector
  189. * @param element_count Number of elements in the @a elements vector
  190. * @param cont Callback function
  191. * @param cont_cls Closure for @a cont
  192. * @return a new handle for this computation
  193. */
  194. struct GNUNET_SCALARPRODUCT_ComputationHandle *
  195. GNUNET_SCALARPRODUCT_accept_computation (
  196. const struct GNUNET_CONFIGURATION_Handle *cfg,
  197. const struct GNUNET_HashCode *session_key,
  198. const struct GNUNET_SCALARPRODUCT_Element *elements,
  199. uint32_t element_count,
  200. GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
  201. void *cont_cls)
  202. {
  203. struct GNUNET_SCALARPRODUCT_ComputationHandle *h
  204. = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
  205. struct GNUNET_MQ_MessageHandler handlers[] = {
  206. GNUNET_MQ_hd_var_size (response,
  207. GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT,
  208. struct ClientResponseMessage,
  209. h),
  210. GNUNET_MQ_handler_end ()
  211. };
  212. struct GNUNET_MQ_Envelope *env;
  213. struct BobComputationMessage *msg;
  214. struct ComputationBobCryptodataMultipartMessage *mmsg;
  215. uint32_t size;
  216. uint16_t possible;
  217. uint16_t todo;
  218. uint32_t element_count_transfered;
  219. if (GNUNET_SYSERR == check_unique (elements,
  220. element_count))
  221. return NULL;
  222. h->cont_status = cont;
  223. h->cont_cls = cont_cls;
  224. h->response_proc = &process_status_message;
  225. h->cfg = cfg;
  226. h->key = *session_key;
  227. h->mq = GNUNET_CLIENT_connect (cfg,
  228. "scalarproduct-bob",
  229. handlers,
  230. &mq_error_handler,
  231. h);
  232. if (NULL == h->mq)
  233. {
  234. /* scalarproduct configuration error */
  235. GNUNET_break (0);
  236. GNUNET_free (h);
  237. return NULL;
  238. }
  239. possible = (GNUNET_MAX_MESSAGE_SIZE - 1 - sizeof(struct
  240. BobComputationMessage))
  241. / sizeof(struct GNUNET_SCALARPRODUCT_Element);
  242. todo = GNUNET_MIN (possible,
  243. element_count);
  244. size = todo * sizeof(struct GNUNET_SCALARPRODUCT_Element);
  245. env = GNUNET_MQ_msg_extra (msg,
  246. size,
  247. GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB);
  248. msg->element_count_total = htonl (element_count);
  249. msg->element_count_contained = htonl (todo);
  250. msg->session_key = *session_key;
  251. GNUNET_memcpy (&msg[1],
  252. elements,
  253. size);
  254. element_count_transfered = todo;
  255. GNUNET_MQ_send (h->mq,
  256. env);
  257. possible = (GNUNET_MAX_MESSAGE_SIZE - 1 - sizeof(*mmsg))
  258. / sizeof(struct GNUNET_SCALARPRODUCT_Element);
  259. while (element_count_transfered < element_count)
  260. {
  261. todo = GNUNET_MIN (possible,
  262. element_count - element_count_transfered);
  263. size = todo * sizeof(struct GNUNET_SCALARPRODUCT_Element);
  264. env = GNUNET_MQ_msg_extra (mmsg,
  265. size,
  266. GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MULTIPART_BOB);
  267. mmsg->element_count_contained = htonl (todo);
  268. GNUNET_memcpy (&mmsg[1],
  269. &elements[element_count_transfered],
  270. size);
  271. element_count_transfered += todo;
  272. GNUNET_MQ_send (h->mq,
  273. env);
  274. }
  275. return h;
  276. }
  277. /**
  278. * Handles the RESULT received from the service for a request, should
  279. * contain a result MPI value. Called when we participate as "Alice" via
  280. * #GNUNET_SCALARPRODUCT_start_computation().
  281. *
  282. * @param h our Handle
  283. * @param msg Pointer to the response received
  284. * @param status the condition the request was terminated with (eg: disconnect)
  285. */
  286. static void
  287. process_result_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h,
  288. const struct ClientResponseMessage *msg,
  289. enum GNUNET_SCALARPRODUCT_ResponseStatus status)
  290. {
  291. uint32_t product_len;
  292. gcry_mpi_t result = NULL;
  293. gcry_error_t rc;
  294. gcry_mpi_t num;
  295. size_t rsize;
  296. if (GNUNET_SCALARPRODUCT_STATUS_SUCCESS == status)
  297. {
  298. result = gcry_mpi_new (0);
  299. product_len = ntohl (msg->product_length);
  300. if (0 < product_len)
  301. {
  302. rsize = 0;
  303. if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD,
  304. &msg[1],
  305. product_len,
  306. &rsize)))
  307. {
  308. LOG_GCRY (GNUNET_ERROR_TYPE_ERROR,
  309. "gcry_mpi_scan",
  310. rc);
  311. gcry_mpi_release (result);
  312. result = NULL;
  313. status = GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE;
  314. }
  315. else
  316. {
  317. if (0 < (int32_t) ntohl (msg->range))
  318. gcry_mpi_add (result, result, num);
  319. else
  320. gcry_mpi_sub (result, result, num);
  321. gcry_mpi_release (num);
  322. }
  323. }
  324. }
  325. if (NULL != h->cont_datum)
  326. h->cont_datum (h->cont_cls,
  327. status,
  328. result);
  329. if (NULL != result)
  330. gcry_mpi_release (result);
  331. GNUNET_SCALARPRODUCT_cancel (h);
  332. }
  333. /**
  334. * Request by Alice's client for computing a scalar product
  335. *
  336. * @param cfg the gnunet configuration handle
  337. * @param session_key Session key should be unique to the requesting client
  338. * @param peer PeerID of the other peer
  339. * @param elements Array of elements of the vector
  340. * @param element_count Number of elements in the @a elements vector
  341. * @param cont Callback function
  342. * @param cont_cls Closure for @a cont
  343. * @return a new handle for this computation
  344. */
  345. struct GNUNET_SCALARPRODUCT_ComputationHandle *
  346. GNUNET_SCALARPRODUCT_start_computation (
  347. const struct GNUNET_CONFIGURATION_Handle *cfg,
  348. const struct GNUNET_HashCode *session_key,
  349. const struct GNUNET_PeerIdentity *peer,
  350. const struct GNUNET_SCALARPRODUCT_Element *elements,
  351. uint32_t element_count,
  352. GNUNET_SCALARPRODUCT_DatumProcessor cont,
  353. void *cont_cls)
  354. {
  355. struct GNUNET_SCALARPRODUCT_ComputationHandle *h
  356. = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
  357. struct GNUNET_MQ_MessageHandler handlers[] = {
  358. GNUNET_MQ_hd_var_size (response,
  359. GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT,
  360. struct ClientResponseMessage,
  361. h),
  362. GNUNET_MQ_handler_end ()
  363. };
  364. struct GNUNET_MQ_Envelope *env;
  365. struct AliceComputationMessage *msg;
  366. struct ComputationBobCryptodataMultipartMessage *mmsg;
  367. uint32_t size;
  368. uint16_t possible;
  369. uint16_t todo;
  370. uint32_t element_count_transfered;
  371. if (GNUNET_SYSERR == check_unique (elements,
  372. element_count))
  373. return NULL;
  374. h->mq = GNUNET_CLIENT_connect (cfg,
  375. "scalarproduct-alice",
  376. handlers,
  377. &mq_error_handler,
  378. h);
  379. if (NULL == h->mq)
  380. {
  381. /* missconfigured scalarproduct service */
  382. GNUNET_break (0);
  383. GNUNET_free (h);
  384. return NULL;
  385. }
  386. h->cont_datum = cont;
  387. h->cont_cls = cont_cls;
  388. h->response_proc = &process_result_message;
  389. h->cfg = cfg;
  390. h->key = *session_key;
  391. possible = (GNUNET_MAX_MESSAGE_SIZE - 1 - sizeof(struct
  392. AliceComputationMessage))
  393. / sizeof(struct GNUNET_SCALARPRODUCT_Element);
  394. todo = GNUNET_MIN (possible,
  395. element_count);
  396. size = todo * sizeof(struct GNUNET_SCALARPRODUCT_Element);
  397. env = GNUNET_MQ_msg_extra (msg,
  398. size,
  399. GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
  400. msg->element_count_total = htonl (element_count);
  401. msg->element_count_contained = htonl (todo);
  402. msg->reserved = htonl (0);
  403. msg->peer = *peer;
  404. msg->session_key = *session_key;
  405. GNUNET_memcpy (&msg[1],
  406. elements,
  407. size);
  408. GNUNET_MQ_send (h->mq,
  409. env);
  410. element_count_transfered = todo;
  411. possible = (GNUNET_MAX_MESSAGE_SIZE - 1 - sizeof(*mmsg))
  412. / sizeof(struct GNUNET_SCALARPRODUCT_Element);
  413. while (element_count_transfered < element_count)
  414. {
  415. todo = GNUNET_MIN (possible,
  416. element_count - element_count_transfered);
  417. size = todo * sizeof(struct GNUNET_SCALARPRODUCT_Element);
  418. env = GNUNET_MQ_msg_extra (mmsg,
  419. size,
  420. GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MULTIPART_ALICE);
  421. mmsg->element_count_contained = htonl (todo);
  422. GNUNET_memcpy (&mmsg[1],
  423. &elements[element_count_transfered],
  424. size);
  425. element_count_transfered += todo;
  426. GNUNET_MQ_send (h->mq,
  427. env);
  428. }
  429. return h;
  430. }
  431. /**
  432. * Cancel an ongoing computation or revoke our collaboration offer.
  433. * Closes the connection to the service
  434. *
  435. * @param h computation handle to terminate
  436. */
  437. void
  438. GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle *h)
  439. {
  440. if (NULL != h->mq)
  441. {
  442. GNUNET_MQ_destroy (h->mq);
  443. h->mq = NULL;
  444. }
  445. GNUNET_free (h);
  446. }
  447. /* end of scalarproduct_api.c */