peerstore_api.c 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2013-2016, 2019 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file peerstore/peerstore_api.c
  18. * @brief API for peerstore
  19. * @author Omar Tarabai
  20. * @author Christian Grothoff
  21. */
  22. #include "platform.h"
  23. #include "gnunet_util_lib.h"
  24. #include "peerstore.h"
  25. #include "peerstore_common.h"
  26. #define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-api", __VA_ARGS__)
  27. /******************************************************************************/
  28. /************************ DATA STRUCTURES ****************************/
  29. /******************************************************************************/
  30. /**
  31. * Handle to the PEERSTORE service.
  32. */
  33. struct GNUNET_PEERSTORE_Handle
  34. {
  35. /**
  36. * Our configuration.
  37. */
  38. const struct GNUNET_CONFIGURATION_Handle *cfg;
  39. /**
  40. * Message queue
  41. */
  42. struct GNUNET_MQ_Handle *mq;
  43. /**
  44. * Head of active STORE requests.
  45. */
  46. struct GNUNET_PEERSTORE_StoreContext *store_head;
  47. /**
  48. * Tail of active STORE requests.
  49. */
  50. struct GNUNET_PEERSTORE_StoreContext *store_tail;
  51. /**
  52. * Head of active ITERATE requests.
  53. */
  54. struct GNUNET_PEERSTORE_IterateContext *iterate_head;
  55. /**
  56. * Tail of active ITERATE requests.
  57. */
  58. struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
  59. /**
  60. * Hashmap of watch requests
  61. */
  62. struct GNUNET_CONTAINER_MultiHashMap *watches;
  63. /**
  64. * ID of the task trying to reconnect to the service.
  65. */
  66. struct GNUNET_SCHEDULER_Task *reconnect_task;
  67. /**
  68. * Delay until we try to reconnect.
  69. */
  70. struct GNUNET_TIME_Relative reconnect_delay;
  71. /**
  72. * Are we in the process of disconnecting but need to sync first?
  73. */
  74. int disconnecting;
  75. };
  76. /**
  77. * Context for a store request
  78. */
  79. struct GNUNET_PEERSTORE_StoreContext
  80. {
  81. /**
  82. * Kept in a DLL.
  83. */
  84. struct GNUNET_PEERSTORE_StoreContext *next;
  85. /**
  86. * Kept in a DLL.
  87. */
  88. struct GNUNET_PEERSTORE_StoreContext *prev;
  89. /**
  90. * Handle to the PEERSTORE service.
  91. */
  92. struct GNUNET_PEERSTORE_Handle *h;
  93. /**
  94. * Continuation called with service response
  95. */
  96. GNUNET_PEERSTORE_Continuation cont;
  97. /**
  98. * Closure for @e cont
  99. */
  100. void *cont_cls;
  101. /**
  102. * Which subsystem does the store?
  103. */
  104. char *sub_system;
  105. /**
  106. * Key for the store operation.
  107. */
  108. char *key;
  109. /**
  110. * Contains @e size bytes.
  111. */
  112. void *value;
  113. /**
  114. * Peer the store is for.
  115. */
  116. struct GNUNET_PeerIdentity peer;
  117. /**
  118. * Number of bytes in @e value.
  119. */
  120. size_t size;
  121. /**
  122. * When does the value expire?
  123. */
  124. struct GNUNET_TIME_Absolute expiry;
  125. /**
  126. * Options for the store operation.
  127. */
  128. enum GNUNET_PEERSTORE_StoreOption options;
  129. };
  130. /**
  131. * Context for a iterate request
  132. */
  133. struct GNUNET_PEERSTORE_IterateContext
  134. {
  135. /**
  136. * Kept in a DLL.
  137. */
  138. struct GNUNET_PEERSTORE_IterateContext *next;
  139. /**
  140. * Kept in a DLL.
  141. */
  142. struct GNUNET_PEERSTORE_IterateContext *prev;
  143. /**
  144. * Handle to the PEERSTORE service.
  145. */
  146. struct GNUNET_PEERSTORE_Handle *h;
  147. /**
  148. * Which subsystem does the store?
  149. */
  150. char *sub_system;
  151. /**
  152. * Peer the store is for.
  153. */
  154. struct GNUNET_PeerIdentity peer;
  155. /**
  156. * Key for the store operation.
  157. */
  158. char *key;
  159. /**
  160. * Callback with each matching record
  161. */
  162. GNUNET_PEERSTORE_Processor callback;
  163. /**
  164. * Closure for @e callback
  165. */
  166. void *callback_cls;
  167. /**
  168. * #GNUNET_YES if we are currently processing records.
  169. */
  170. int iterating;
  171. };
  172. /**
  173. * Context for a watch request
  174. */
  175. struct GNUNET_PEERSTORE_WatchContext
  176. {
  177. /**
  178. * Kept in a DLL.
  179. */
  180. struct GNUNET_PEERSTORE_WatchContext *next;
  181. /**
  182. * Kept in a DLL.
  183. */
  184. struct GNUNET_PEERSTORE_WatchContext *prev;
  185. /**
  186. * Handle to the PEERSTORE service.
  187. */
  188. struct GNUNET_PEERSTORE_Handle *h;
  189. /**
  190. * Callback with each record received
  191. */
  192. GNUNET_PEERSTORE_Processor callback;
  193. /**
  194. * Closure for @e callback
  195. */
  196. void *callback_cls;
  197. /**
  198. * Hash of the combined key
  199. */
  200. struct GNUNET_HashCode keyhash;
  201. };
  202. /******************************************************************************/
  203. /******************* DECLARATIONS *********************/
  204. /******************************************************************************/
  205. /**
  206. * Close the existing connection to PEERSTORE and reconnect.
  207. *
  208. * @param cls a `struct GNUNET_PEERSTORE_Handle *h`
  209. */
  210. static void
  211. reconnect (void *cls);
  212. /**
  213. * Disconnect from the peerstore service.
  214. *
  215. * @param h peerstore handle to disconnect
  216. */
  217. static void
  218. disconnect (struct GNUNET_PEERSTORE_Handle *h)
  219. {
  220. struct GNUNET_PEERSTORE_IterateContext *next;
  221. for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
  222. ic = next)
  223. {
  224. next = ic->next;
  225. if (GNUNET_YES == ic->iterating)
  226. {
  227. GNUNET_PEERSTORE_Processor icb;
  228. void *icb_cls;
  229. icb = ic->callback;
  230. icb_cls = ic->callback_cls;
  231. GNUNET_PEERSTORE_iterate_cancel (ic);
  232. if (NULL != icb)
  233. icb (icb_cls, NULL, "Iteration canceled due to reconnection");
  234. }
  235. }
  236. if (NULL != h->mq)
  237. {
  238. GNUNET_MQ_destroy (h->mq);
  239. h->mq = NULL;
  240. }
  241. }
  242. /**
  243. * Function that will schedule the job that will try
  244. * to connect us again to the client.
  245. *
  246. * @param h peerstore to reconnect
  247. */
  248. static void
  249. disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h)
  250. {
  251. GNUNET_assert (NULL == h->reconnect_task);
  252. disconnect (h);
  253. LOG (GNUNET_ERROR_TYPE_DEBUG,
  254. "Scheduling task to reconnect to PEERSTORE service in %s.\n",
  255. GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
  256. h->reconnect_task =
  257. GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
  258. h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
  259. }
  260. /**
  261. * Callback after MQ envelope is sent
  262. *
  263. * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
  264. */
  265. static void
  266. store_request_sent (void *cls)
  267. {
  268. struct GNUNET_PEERSTORE_StoreContext *sc = cls;
  269. GNUNET_PEERSTORE_Continuation cont;
  270. void *cont_cls;
  271. cont = sc->cont;
  272. cont_cls = sc->cont_cls;
  273. GNUNET_PEERSTORE_store_cancel (sc);
  274. if (NULL != cont)
  275. cont (cont_cls, GNUNET_OK);
  276. }
  277. /******************************************************************************/
  278. /******************* CONNECTION FUNCTIONS *********************/
  279. /******************************************************************************/
  280. /**
  281. * Function called when we had trouble talking to the service.
  282. */
  283. static void
  284. handle_client_error (void *cls, enum GNUNET_MQ_Error error)
  285. {
  286. struct GNUNET_PEERSTORE_Handle *h = cls;
  287. LOG (GNUNET_ERROR_TYPE_ERROR,
  288. "Received an error notification from MQ of type: %d\n",
  289. error);
  290. disconnect_and_schedule_reconnect (h);
  291. }
  292. /**
  293. * Iterator over previous watches to resend them
  294. *
  295. * @param cls the `struct GNUNET_PEERSTORE_Handle`
  296. * @param key key for the watch
  297. * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
  298. * @return #GNUNET_YES (continue to iterate)
  299. */
  300. static int
  301. rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
  302. {
  303. struct GNUNET_PEERSTORE_Handle *h = cls;
  304. struct GNUNET_PEERSTORE_WatchContext *wc = value;
  305. struct StoreKeyHashMessage *hm;
  306. struct GNUNET_MQ_Envelope *ev;
  307. ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
  308. hm->keyhash = wc->keyhash;
  309. GNUNET_MQ_send (h->mq, ev);
  310. return GNUNET_YES;
  311. }
  312. /**
  313. * Iterator over watch requests to cancel them.
  314. *
  315. * @param cls unused
  316. * @param key key to the watch request
  317. * @param value watch context
  318. * @return #GNUNET_YES to continue iteration
  319. */
  320. static int
  321. destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
  322. {
  323. struct GNUNET_PEERSTORE_WatchContext *wc = value;
  324. GNUNET_PEERSTORE_watch_cancel (wc);
  325. return GNUNET_YES;
  326. }
  327. /**
  328. * Kill the connection to the service. This can be delayed in case of pending
  329. * STORE requests and the user explicitly asked to sync first. Otherwise it is
  330. * performed instantly.
  331. *
  332. * @param h Handle to the service.
  333. */
  334. static void
  335. final_disconnect (struct GNUNET_PEERSTORE_Handle *h)
  336. {
  337. if (NULL != h->mq)
  338. {
  339. GNUNET_MQ_destroy (h->mq);
  340. h->mq = NULL;
  341. }
  342. GNUNET_free (h);
  343. }
  344. /**
  345. * Connect to the PEERSTORE service.
  346. *
  347. * @param cfg configuration to use
  348. * @return NULL on error
  349. */
  350. struct GNUNET_PEERSTORE_Handle *
  351. GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
  352. {
  353. struct GNUNET_PEERSTORE_Handle *h;
  354. h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
  355. h->cfg = cfg;
  356. h->disconnecting = GNUNET_NO;
  357. reconnect (h);
  358. if (NULL == h->mq)
  359. {
  360. GNUNET_free (h);
  361. return NULL;
  362. }
  363. return h;
  364. }
  365. /**
  366. * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
  367. * will be canceled.
  368. * Any pending STORE requests will depend on @e snyc_first flag.
  369. *
  370. * @param h handle to disconnect
  371. * @param sync_first send any pending STORE requests before disconnecting
  372. */
  373. void
  374. GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
  375. {
  376. struct GNUNET_PEERSTORE_IterateContext *ic;
  377. struct GNUNET_PEERSTORE_StoreContext *sc;
  378. LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
  379. if (NULL != h->watches)
  380. {
  381. GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
  382. GNUNET_CONTAINER_multihashmap_destroy (h->watches);
  383. h->watches = NULL;
  384. }
  385. while (NULL != (ic = h->iterate_head))
  386. {
  387. GNUNET_break (0);
  388. GNUNET_PEERSTORE_iterate_cancel (ic);
  389. }
  390. if (NULL != h->store_head)
  391. {
  392. if (GNUNET_YES == sync_first)
  393. {
  394. LOG (GNUNET_ERROR_TYPE_DEBUG,
  395. "Delaying disconnection due to pending store requests.\n");
  396. h->disconnecting = GNUNET_YES;
  397. return;
  398. }
  399. while (NULL != (sc = h->store_head))
  400. GNUNET_PEERSTORE_store_cancel (sc);
  401. }
  402. final_disconnect (h);
  403. }
  404. /******************************************************************************/
  405. /******************* STORE FUNCTIONS *********************/
  406. /******************************************************************************/
  407. /**
  408. * Cancel a store request
  409. *
  410. * @param sc Store request context
  411. */
  412. void
  413. GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
  414. {
  415. struct GNUNET_PEERSTORE_Handle *h = sc->h;
  416. GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
  417. GNUNET_free (sc->sub_system);
  418. GNUNET_free (sc->value);
  419. GNUNET_free (sc->key);
  420. GNUNET_free (sc);
  421. if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
  422. final_disconnect (h);
  423. }
  424. /**
  425. * Store a new entry in the PEERSTORE.
  426. * Note that stored entries can be lost in some cases
  427. * such as power failure.
  428. *
  429. * @param h Handle to the PEERSTORE service
  430. * @param sub_system name of the sub system
  431. * @param peer Peer Identity
  432. * @param key entry key
  433. * @param value entry value BLOB
  434. * @param size size of @e value
  435. * @param expiry absolute time after which the entry is (possibly) deleted
  436. * @param options options specific to the storage operation
  437. * @param cont Continuation function after the store request is sent
  438. * @param cont_cls Closure for @a cont
  439. */
  440. struct GNUNET_PEERSTORE_StoreContext *
  441. GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
  442. const char *sub_system,
  443. const struct GNUNET_PeerIdentity *peer,
  444. const char *key,
  445. const void *value,
  446. size_t size,
  447. struct GNUNET_TIME_Absolute expiry,
  448. enum GNUNET_PEERSTORE_StoreOption options,
  449. GNUNET_PEERSTORE_Continuation cont,
  450. void *cont_cls)
  451. {
  452. struct GNUNET_MQ_Envelope *ev;
  453. struct GNUNET_PEERSTORE_StoreContext *sc;
  454. LOG (GNUNET_ERROR_TYPE_DEBUG,
  455. "Storing value (size: %lu) for subsystem `%s', peer `%s', key `%s'\n",
  456. size,
  457. sub_system,
  458. GNUNET_i2s (peer),
  459. key);
  460. ev =
  461. PEERSTORE_create_record_mq_envelope (sub_system,
  462. peer,
  463. key,
  464. value,
  465. size,
  466. expiry,
  467. options,
  468. GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
  469. sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
  470. sc->sub_system = GNUNET_strdup (sub_system);
  471. sc->peer = *peer;
  472. sc->key = GNUNET_strdup (key);
  473. sc->value = GNUNET_memdup (value, size);
  474. sc->size = size;
  475. sc->expiry = expiry;
  476. sc->options = options;
  477. sc->cont = cont;
  478. sc->cont_cls = cont_cls;
  479. sc->h = h;
  480. GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
  481. GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
  482. GNUNET_MQ_send (h->mq, ev);
  483. return sc;
  484. }
  485. /******************************************************************************/
  486. /******************* ITERATE FUNCTIONS *********************/
  487. /******************************************************************************/
  488. /**
  489. * When a response for iterate request is received
  490. *
  491. * @param cls a `struct GNUNET_PEERSTORE_Handle *`
  492. * @param msg message received
  493. */
  494. static void
  495. handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg)
  496. {
  497. struct GNUNET_PEERSTORE_Handle *h = cls;
  498. struct GNUNET_PEERSTORE_IterateContext *ic;
  499. GNUNET_PEERSTORE_Processor callback;
  500. void *callback_cls;
  501. ic = h->iterate_head;
  502. if (NULL == ic)
  503. {
  504. LOG (GNUNET_ERROR_TYPE_ERROR,
  505. _ ("Unexpected iteration response, this should not happen.\n"));
  506. disconnect_and_schedule_reconnect (h);
  507. return;
  508. }
  509. callback = ic->callback;
  510. callback_cls = ic->callback_cls;
  511. ic->iterating = GNUNET_NO;
  512. GNUNET_PEERSTORE_iterate_cancel (ic);
  513. if (NULL != callback)
  514. callback (callback_cls, NULL, NULL);
  515. h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
  516. }
  517. /**
  518. * When a response for iterate request is received, check the
  519. * message is well-formed.
  520. *
  521. * @param cls a `struct GNUNET_PEERSTORE_Handle *`
  522. * @param msg message received
  523. */
  524. static int
  525. check_iterate_result (void *cls, const struct StoreRecordMessage *msg)
  526. {
  527. /* we defer validation to #handle_iterate_result */
  528. return GNUNET_OK;
  529. }
  530. /**
  531. * When a response for iterate request is received
  532. *
  533. * @param cls a `struct GNUNET_PEERSTORE_Handle *`
  534. * @param msg message received
  535. */
  536. static void
  537. handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
  538. {
  539. struct GNUNET_PEERSTORE_Handle *h = cls;
  540. struct GNUNET_PEERSTORE_IterateContext *ic;
  541. GNUNET_PEERSTORE_Processor callback;
  542. void *callback_cls;
  543. struct GNUNET_PEERSTORE_Record *record;
  544. ic = h->iterate_head;
  545. if (NULL == ic)
  546. {
  547. LOG (GNUNET_ERROR_TYPE_ERROR,
  548. _ ("Unexpected iteration response, this should not happen.\n"));
  549. disconnect_and_schedule_reconnect (h);
  550. return;
  551. }
  552. ic->iterating = GNUNET_YES;
  553. callback = ic->callback;
  554. callback_cls = ic->callback_cls;
  555. if (NULL == callback)
  556. return;
  557. record = PEERSTORE_parse_record_message (msg);
  558. if (NULL == record)
  559. {
  560. callback (callback_cls,
  561. NULL,
  562. _ ("Received a malformed response from service."));
  563. }
  564. else
  565. {
  566. callback (callback_cls, record, NULL);
  567. PEERSTORE_destroy_record (record);
  568. }
  569. }
  570. /**
  571. * Cancel an iterate request
  572. * Please do not call after the iterate request is done
  573. *
  574. * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
  575. */
  576. void
  577. GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
  578. {
  579. if (GNUNET_NO == ic->iterating)
  580. {
  581. GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
  582. GNUNET_free (ic->sub_system);
  583. GNUNET_free (ic->key);
  584. GNUNET_free (ic);
  585. }
  586. else
  587. ic->callback = NULL;
  588. }
  589. /**
  590. * Iterate over records matching supplied key information
  591. *
  592. * @param h handle to the PEERSTORE service
  593. * @param sub_system name of sub system
  594. * @param peer Peer identity (can be NULL)
  595. * @param key entry key string (can be NULL)
  596. * @param callback function called with each matching record, all NULL's on end
  597. * @param callback_cls closure for @a callback
  598. * @return Handle to iteration request
  599. */
  600. struct GNUNET_PEERSTORE_IterateContext *
  601. GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
  602. const char *sub_system,
  603. const struct GNUNET_PeerIdentity *peer,
  604. const char *key,
  605. GNUNET_PEERSTORE_Processor callback,
  606. void *callback_cls)
  607. {
  608. struct GNUNET_MQ_Envelope *ev;
  609. struct GNUNET_PEERSTORE_IterateContext *ic;
  610. ev =
  611. PEERSTORE_create_record_mq_envelope (sub_system,
  612. peer,
  613. key,
  614. NULL,
  615. 0,
  616. GNUNET_TIME_UNIT_FOREVER_ABS,
  617. 0,
  618. GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
  619. ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
  620. ic->callback = callback;
  621. ic->callback_cls = callback_cls;
  622. ic->h = h;
  623. ic->sub_system = GNUNET_strdup (sub_system);
  624. if (NULL != peer)
  625. ic->peer = *peer;
  626. if (NULL != key)
  627. ic->key = GNUNET_strdup (key);
  628. GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
  629. LOG (GNUNET_ERROR_TYPE_DEBUG,
  630. "Sending an iterate request for sub system `%s'\n",
  631. sub_system);
  632. GNUNET_MQ_send (h->mq, ev);
  633. return ic;
  634. }
  635. /******************************************************************************/
  636. /******************* WATCH FUNCTIONS *********************/
  637. /******************************************************************************/
  638. /**
  639. * When a watch record is received, validate it is well-formed.
  640. *
  641. * @param cls a `struct GNUNET_PEERSTORE_Handle *`
  642. * @param msg message received
  643. */
  644. static int
  645. check_watch_record (void *cls, const struct StoreRecordMessage *msg)
  646. {
  647. /* we defer validation to #handle_watch_result */
  648. return GNUNET_OK;
  649. }
  650. /**
  651. * When a watch record is received, process it.
  652. *
  653. * @param cls a `struct GNUNET_PEERSTORE_Handle *`
  654. * @param msg message received
  655. */
  656. static void
  657. handle_watch_record (void *cls, const struct StoreRecordMessage *msg)
  658. {
  659. struct GNUNET_PEERSTORE_Handle *h = cls;
  660. struct GNUNET_PEERSTORE_Record *record;
  661. struct GNUNET_HashCode keyhash;
  662. struct GNUNET_PEERSTORE_WatchContext *wc;
  663. LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
  664. record = PEERSTORE_parse_record_message (msg);
  665. if (NULL == record)
  666. {
  667. disconnect_and_schedule_reconnect (h);
  668. return;
  669. }
  670. PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash);
  671. // FIXME: what if there are multiple watches for the same key?
  672. wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
  673. if (NULL == wc)
  674. {
  675. LOG (GNUNET_ERROR_TYPE_ERROR,
  676. _ ("Received a watch result for a non existing watch.\n"));
  677. PEERSTORE_destroy_record (record);
  678. disconnect_and_schedule_reconnect (h);
  679. return;
  680. }
  681. if (NULL != wc->callback)
  682. wc->callback (wc->callback_cls, record, NULL);
  683. h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
  684. PEERSTORE_destroy_record (record);
  685. }
  686. /**
  687. * Close the existing connection to PEERSTORE and reconnect.
  688. *
  689. * @param cls a `struct GNUNET_PEERSTORE_Handle *`
  690. */
  691. static void
  692. reconnect (void *cls)
  693. {
  694. struct GNUNET_PEERSTORE_Handle *h = cls;
  695. struct GNUNET_MQ_MessageHandler mq_handlers[] =
  696. { GNUNET_MQ_hd_fixed_size (iterate_end,
  697. GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
  698. struct GNUNET_MessageHeader,
  699. h),
  700. GNUNET_MQ_hd_var_size (iterate_result,
  701. GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
  702. struct StoreRecordMessage,
  703. h),
  704. GNUNET_MQ_hd_var_size (watch_record,
  705. GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
  706. struct StoreRecordMessage,
  707. h),
  708. GNUNET_MQ_handler_end () };
  709. struct GNUNET_MQ_Envelope *ev;
  710. h->reconnect_task = NULL;
  711. LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
  712. h->mq = GNUNET_CLIENT_connect (h->cfg,
  713. "peerstore",
  714. mq_handlers,
  715. &handle_client_error,
  716. h);
  717. if (NULL == h->mq)
  718. {
  719. h->reconnect_task =
  720. GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
  721. h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
  722. return;
  723. }
  724. LOG (GNUNET_ERROR_TYPE_DEBUG,
  725. "Resending pending requests after reconnect.\n");
  726. if (NULL != h->watches)
  727. GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
  728. for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
  729. ic = ic->next)
  730. {
  731. ev =
  732. PEERSTORE_create_record_mq_envelope (ic->sub_system,
  733. &ic->peer,
  734. ic->key,
  735. NULL,
  736. 0,
  737. GNUNET_TIME_UNIT_FOREVER_ABS,
  738. 0,
  739. GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
  740. GNUNET_MQ_send (h->mq, ev);
  741. }
  742. for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
  743. sc = sc->next)
  744. {
  745. ev =
  746. PEERSTORE_create_record_mq_envelope (sc->sub_system,
  747. &sc->peer,
  748. sc->key,
  749. sc->value,
  750. sc->size,
  751. sc->expiry,
  752. sc->options,
  753. GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
  754. GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
  755. GNUNET_MQ_send (h->mq, ev);
  756. }
  757. }
  758. /**
  759. * Cancel a watch request
  760. *
  761. * @param wc handle to the watch request
  762. */
  763. void
  764. GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
  765. {
  766. struct GNUNET_PEERSTORE_Handle *h = wc->h;
  767. struct GNUNET_MQ_Envelope *ev;
  768. struct StoreKeyHashMessage *hm;
  769. LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
  770. ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
  771. hm->keyhash = wc->keyhash;
  772. GNUNET_MQ_send (h->mq, ev);
  773. GNUNET_assert (
  774. GNUNET_YES ==
  775. GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc));
  776. GNUNET_free (wc);
  777. }
  778. /**
  779. * Request watching a given key
  780. * User will be notified with any new values added to key
  781. *
  782. * @param h handle to the PEERSTORE service
  783. * @param sub_system name of sub system
  784. * @param peer Peer identity
  785. * @param key entry key string
  786. * @param callback function called with each new value
  787. * @param callback_cls closure for @a callback
  788. * @return Handle to watch request
  789. */
  790. struct GNUNET_PEERSTORE_WatchContext *
  791. GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
  792. const char *sub_system,
  793. const struct GNUNET_PeerIdentity *peer,
  794. const char *key,
  795. GNUNET_PEERSTORE_Processor callback,
  796. void *callback_cls)
  797. {
  798. struct GNUNET_MQ_Envelope *ev;
  799. struct StoreKeyHashMessage *hm;
  800. struct GNUNET_PEERSTORE_WatchContext *wc;
  801. ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
  802. PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
  803. wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
  804. wc->callback = callback;
  805. wc->callback_cls = callback_cls;
  806. wc->h = h;
  807. wc->keyhash = hm->keyhash;
  808. if (NULL == h->watches)
  809. h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
  810. GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
  811. h->watches,
  812. &wc->keyhash,
  813. wc,
  814. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
  815. LOG (GNUNET_ERROR_TYPE_DEBUG,
  816. "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
  817. sub_system,
  818. GNUNET_i2s (peer),
  819. key);
  820. GNUNET_MQ_send (h->mq, ev);
  821. return wc;
  822. }
  823. /* end of peerstore_api.c */