peerstore_api.c 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2013-2016 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. */
  15. /**
  16. * @file peerstore/peerstore_api.c
  17. * @brief API for peerstore
  18. * @author Omar Tarabai
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet_util_lib.h"
  23. #include "peerstore.h"
  24. #include "peerstore_common.h"
  25. #define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__)
  26. /******************************************************************************/
  27. /************************ DATA STRUCTURES ****************************/
  28. /******************************************************************************/
  29. /**
  30. * Handle to the PEERSTORE service.
  31. */
  32. struct GNUNET_PEERSTORE_Handle
  33. {
  34. /**
  35. * Our configuration.
  36. */
  37. const struct GNUNET_CONFIGURATION_Handle *cfg;
  38. /**
  39. * Message queue
  40. */
  41. struct GNUNET_MQ_Handle *mq;
  42. /**
  43. * Head of active STORE requests.
  44. */
  45. struct GNUNET_PEERSTORE_StoreContext *store_head;
  46. /**
  47. * Tail of active STORE requests.
  48. */
  49. struct GNUNET_PEERSTORE_StoreContext *store_tail;
  50. /**
  51. * Head of active ITERATE requests.
  52. */
  53. struct GNUNET_PEERSTORE_IterateContext *iterate_head;
  54. /**
  55. * Tail of active ITERATE requests.
  56. */
  57. struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
  58. /**
  59. * Hashmap of watch requests
  60. */
  61. struct GNUNET_CONTAINER_MultiHashMap *watches;
  62. /**
  63. * Are we in the process of disconnecting but need to sync first?
  64. */
  65. int disconnecting;
  66. };
  67. /**
  68. * Context for a store request
  69. */
  70. struct GNUNET_PEERSTORE_StoreContext
  71. {
  72. /**
  73. * Kept in a DLL.
  74. */
  75. struct GNUNET_PEERSTORE_StoreContext *next;
  76. /**
  77. * Kept in a DLL.
  78. */
  79. struct GNUNET_PEERSTORE_StoreContext *prev;
  80. /**
  81. * Handle to the PEERSTORE service.
  82. */
  83. struct GNUNET_PEERSTORE_Handle *h;
  84. /**
  85. * Continuation called with service response
  86. */
  87. GNUNET_PEERSTORE_Continuation cont;
  88. /**
  89. * Closure for @e cont
  90. */
  91. void *cont_cls;
  92. /**
  93. * Which subsystem does the store?
  94. */
  95. char *sub_system;
  96. /**
  97. * Key for the store operation.
  98. */
  99. char *key;
  100. /**
  101. * Contains @e size bytes.
  102. */
  103. void *value;
  104. /**
  105. * Peer the store is for.
  106. */
  107. struct GNUNET_PeerIdentity peer;
  108. /**
  109. * Number of bytes in @e value.
  110. */
  111. size_t size;
  112. /**
  113. * When does the value expire?
  114. */
  115. struct GNUNET_TIME_Absolute expiry;
  116. /**
  117. * Options for the store operation.
  118. */
  119. enum GNUNET_PEERSTORE_StoreOption options;
  120. };
  121. /**
  122. * Context for a iterate request
  123. */
  124. struct GNUNET_PEERSTORE_IterateContext
  125. {
  126. /**
  127. * Kept in a DLL.
  128. */
  129. struct GNUNET_PEERSTORE_IterateContext *next;
  130. /**
  131. * Kept in a DLL.
  132. */
  133. struct GNUNET_PEERSTORE_IterateContext *prev;
  134. /**
  135. * Handle to the PEERSTORE service.
  136. */
  137. struct GNUNET_PEERSTORE_Handle *h;
  138. /**
  139. * Which subsystem does the store?
  140. */
  141. char *sub_system;
  142. /**
  143. * Peer the store is for.
  144. */
  145. struct GNUNET_PeerIdentity peer;
  146. /**
  147. * Key for the store operation.
  148. */
  149. char *key;
  150. /**
  151. * Callback with each matching record
  152. */
  153. GNUNET_PEERSTORE_Processor callback;
  154. /**
  155. * Closure for @e callback
  156. */
  157. void *callback_cls;
  158. /**
  159. * #GNUNET_YES if we are currently processing records.
  160. */
  161. int iterating;
  162. };
  163. /**
  164. * Context for a watch request
  165. */
  166. struct GNUNET_PEERSTORE_WatchContext
  167. {
  168. /**
  169. * Kept in a DLL.
  170. */
  171. struct GNUNET_PEERSTORE_WatchContext *next;
  172. /**
  173. * Kept in a DLL.
  174. */
  175. struct GNUNET_PEERSTORE_WatchContext *prev;
  176. /**
  177. * Handle to the PEERSTORE service.
  178. */
  179. struct GNUNET_PEERSTORE_Handle *h;
  180. /**
  181. * Callback with each record received
  182. */
  183. GNUNET_PEERSTORE_Processor callback;
  184. /**
  185. * Closure for @e callback
  186. */
  187. void *callback_cls;
  188. /**
  189. * Hash of the combined key
  190. */
  191. struct GNUNET_HashCode keyhash;
  192. };
  193. /******************************************************************************/
  194. /******************* DECLARATIONS *********************/
  195. /******************************************************************************/
  196. /**
  197. * Close the existing connection to PEERSTORE and reconnect.
  198. *
  199. * @param h handle to the service
  200. */
  201. static void
  202. reconnect (struct GNUNET_PEERSTORE_Handle *h);
  203. /**
  204. * Callback after MQ envelope is sent
  205. *
  206. * @param cls a `struct GNUNET_PEERSTORE_StoreContext *`
  207. */
  208. static void
  209. store_request_sent (void *cls)
  210. {
  211. struct GNUNET_PEERSTORE_StoreContext *sc = cls;
  212. GNUNET_PEERSTORE_Continuation cont;
  213. void *cont_cls;
  214. cont = sc->cont;
  215. cont_cls = sc->cont_cls;
  216. GNUNET_PEERSTORE_store_cancel (sc);
  217. if (NULL != cont)
  218. cont (cont_cls, GNUNET_OK);
  219. }
  220. /******************************************************************************/
  221. /******************* CONNECTION FUNCTIONS *********************/
  222. /******************************************************************************/
  223. /**
  224. * Function called when we had trouble talking to the service.
  225. */
  226. static void
  227. handle_client_error (void *cls,
  228. enum GNUNET_MQ_Error error)
  229. {
  230. struct GNUNET_PEERSTORE_Handle *h = cls;
  231. LOG (GNUNET_ERROR_TYPE_ERROR,
  232. "Received an error notification from MQ of type: %d\n",
  233. error);
  234. reconnect (h);
  235. }
  236. /**
  237. * Iterator over previous watches to resend them
  238. *
  239. * @param cls the `struct GNUNET_PEERSTORE_Handle`
  240. * @param key key for the watch
  241. * @param value the `struct GNUNET_PEERSTORE_WatchContext *`
  242. * @return #GNUNET_YES (continue to iterate)
  243. */
  244. static int
  245. rewatch_it (void *cls,
  246. const struct GNUNET_HashCode *key,
  247. void *value)
  248. {
  249. struct GNUNET_PEERSTORE_Handle *h = cls;
  250. struct GNUNET_PEERSTORE_WatchContext *wc = value;
  251. struct StoreKeyHashMessage *hm;
  252. struct GNUNET_MQ_Envelope *ev;
  253. ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
  254. hm->keyhash = wc->keyhash;
  255. GNUNET_MQ_send (h->mq, ev);
  256. return GNUNET_YES;
  257. }
  258. /**
  259. * Iterator over watch requests to cancel them.
  260. *
  261. * @param cls unsused
  262. * @param key key to the watch request
  263. * @param value watch context
  264. * @return #GNUNET_YES to continue iteration
  265. */
  266. static int
  267. destroy_watch (void *cls,
  268. const struct GNUNET_HashCode *key,
  269. void *value)
  270. {
  271. struct GNUNET_PEERSTORE_WatchContext *wc = value;
  272. GNUNET_PEERSTORE_watch_cancel (wc);
  273. return GNUNET_YES;
  274. }
  275. /**
  276. * Kill the connection to the service. This can be delayed in case of pending
  277. * STORE requests and the user explicitly asked to sync first. Otherwise it is
  278. * performed instantly.
  279. *
  280. * @param h Handle to the service.
  281. */
  282. static void
  283. do_disconnect (struct GNUNET_PEERSTORE_Handle *h)
  284. {
  285. if (NULL != h->mq)
  286. {
  287. GNUNET_MQ_destroy (h->mq);
  288. h->mq = NULL;
  289. }
  290. GNUNET_free (h);
  291. }
  292. /**
  293. * Connect to the PEERSTORE service.
  294. *
  295. * @param cfg configuration to use
  296. * @return NULL on error
  297. */
  298. struct GNUNET_PEERSTORE_Handle *
  299. GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
  300. {
  301. struct GNUNET_PEERSTORE_Handle *h;
  302. h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
  303. h->cfg = cfg;
  304. h->disconnecting = GNUNET_NO;
  305. reconnect (h);
  306. if (NULL == h->mq)
  307. {
  308. GNUNET_free (h);
  309. return NULL;
  310. }
  311. return h;
  312. }
  313. /**
  314. * Disconnect from the PEERSTORE service. Any pending ITERATE and WATCH requests
  315. * will be canceled.
  316. * Any pending STORE requests will depend on @e snyc_first flag.
  317. *
  318. * @param h handle to disconnect
  319. * @param sync_first send any pending STORE requests before disconnecting
  320. */
  321. void
  322. GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h,
  323. int sync_first)
  324. {
  325. struct GNUNET_PEERSTORE_IterateContext *ic;
  326. struct GNUNET_PEERSTORE_StoreContext *sc;
  327. LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
  328. if (NULL != h->watches)
  329. {
  330. GNUNET_CONTAINER_multihashmap_iterate (h->watches, &destroy_watch, NULL);
  331. GNUNET_CONTAINER_multihashmap_destroy (h->watches);
  332. h->watches = NULL;
  333. }
  334. while (NULL != (ic = h->iterate_head))
  335. {
  336. GNUNET_break (0);
  337. GNUNET_PEERSTORE_iterate_cancel (ic);
  338. }
  339. if (NULL != h->store_head)
  340. {
  341. if (GNUNET_YES == sync_first)
  342. {
  343. LOG (GNUNET_ERROR_TYPE_DEBUG,
  344. "Delaying disconnection due to pending store requests.\n");
  345. h->disconnecting = GNUNET_YES;
  346. return;
  347. }
  348. while (NULL != (sc = h->store_head))
  349. GNUNET_PEERSTORE_store_cancel (sc);
  350. }
  351. do_disconnect (h);
  352. }
  353. /******************************************************************************/
  354. /******************* STORE FUNCTIONS *********************/
  355. /******************************************************************************/
  356. /**
  357. * Cancel a store request
  358. *
  359. * @param sc Store request context
  360. */
  361. void
  362. GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
  363. {
  364. struct GNUNET_PEERSTORE_Handle *h = sc->h;
  365. GNUNET_CONTAINER_DLL_remove (sc->h->store_head, sc->h->store_tail, sc);
  366. GNUNET_free (sc->sub_system);
  367. GNUNET_free (sc->value);
  368. GNUNET_free (sc->key);
  369. GNUNET_free (sc);
  370. if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
  371. do_disconnect (h);
  372. }
  373. /**
  374. * Store a new entry in the PEERSTORE.
  375. * Note that stored entries can be lost in some cases
  376. * such as power failure.
  377. *
  378. * @param h Handle to the PEERSTORE service
  379. * @param sub_system name of the sub system
  380. * @param peer Peer Identity
  381. * @param key entry key
  382. * @param value entry value BLOB
  383. * @param size size of @e value
  384. * @param expiry absolute time after which the entry is (possibly) deleted
  385. * @param options options specific to the storage operation
  386. * @param cont Continuation function after the store request is sent
  387. * @param cont_cls Closure for @a cont
  388. */
  389. struct GNUNET_PEERSTORE_StoreContext *
  390. GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
  391. const char *sub_system,
  392. const struct GNUNET_PeerIdentity *peer,
  393. const char *key,
  394. const void *value, size_t size,
  395. struct GNUNET_TIME_Absolute expiry,
  396. enum GNUNET_PEERSTORE_StoreOption options,
  397. GNUNET_PEERSTORE_Continuation cont,
  398. void *cont_cls)
  399. {
  400. struct GNUNET_MQ_Envelope *ev;
  401. struct GNUNET_PEERSTORE_StoreContext *sc;
  402. LOG (GNUNET_ERROR_TYPE_DEBUG,
  403. "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
  404. size, sub_system, GNUNET_i2s (peer), key);
  405. ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size,
  406. expiry, options,
  407. GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
  408. sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
  409. sc->sub_system = GNUNET_strdup (sub_system);
  410. sc->peer = *peer;
  411. sc->key = GNUNET_strdup (key);
  412. sc->value = GNUNET_memdup (value, size);
  413. sc->size = size;
  414. sc->expiry = expiry;
  415. sc->options = options;
  416. sc->cont = cont;
  417. sc->cont_cls = cont_cls;
  418. sc->h = h;
  419. GNUNET_CONTAINER_DLL_insert_tail (h->store_head, h->store_tail, sc);
  420. GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
  421. GNUNET_MQ_send (h->mq, ev);
  422. return sc;
  423. }
  424. /******************************************************************************/
  425. /******************* ITERATE FUNCTIONS *********************/
  426. /******************************************************************************/
  427. /**
  428. * When a response for iterate request is received
  429. *
  430. * @param cls a `struct GNUNET_PEERSTORE_Handle *`
  431. * @param msg message received
  432. */
  433. static void
  434. handle_iterate_end (void *cls,
  435. const struct GNUNET_MessageHeader *msg)
  436. {
  437. struct GNUNET_PEERSTORE_Handle *h = cls;
  438. struct GNUNET_PEERSTORE_IterateContext *ic;
  439. GNUNET_PEERSTORE_Processor callback;
  440. void *callback_cls;
  441. ic = h->iterate_head;
  442. if (NULL == ic)
  443. {
  444. LOG (GNUNET_ERROR_TYPE_ERROR,
  445. _("Unexpected iteration response, this should not happen.\n"));
  446. reconnect (h);
  447. return;
  448. }
  449. callback = ic->callback;
  450. callback_cls = ic->callback_cls;
  451. ic->iterating = GNUNET_NO;
  452. GNUNET_PEERSTORE_iterate_cancel (ic);
  453. if (NULL != callback)
  454. callback (callback_cls, NULL, NULL);
  455. }
  456. /**
  457. * When a response for iterate request is received, check the
  458. * message is well-formed.
  459. *
  460. * @param cls a `struct GNUNET_PEERSTORE_Handle *`
  461. * @param msg message received
  462. */
  463. static int
  464. check_iterate_result (void *cls,
  465. const struct StoreRecordMessage *msg)
  466. {
  467. /* we defer validation to #handle_iterate_result */
  468. return GNUNET_OK;
  469. }
  470. /**
  471. * When a response for iterate request is received
  472. *
  473. * @param cls a `struct GNUNET_PEERSTORE_Handle *`
  474. * @param msg message received
  475. */
  476. static void
  477. handle_iterate_result (void *cls,
  478. const struct StoreRecordMessage *msg)
  479. {
  480. struct GNUNET_PEERSTORE_Handle *h = cls;
  481. struct GNUNET_PEERSTORE_IterateContext *ic;
  482. GNUNET_PEERSTORE_Processor callback;
  483. void *callback_cls;
  484. struct GNUNET_PEERSTORE_Record *record;
  485. ic = h->iterate_head;
  486. if (NULL == ic)
  487. {
  488. LOG (GNUNET_ERROR_TYPE_ERROR,
  489. _("Unexpected iteration response, this should not happen.\n"));
  490. reconnect (h);
  491. return;
  492. }
  493. ic->iterating = GNUNET_YES;
  494. callback = ic->callback;
  495. callback_cls = ic->callback_cls;
  496. if (NULL == callback)
  497. return;
  498. record = PEERSTORE_parse_record_message (msg);
  499. if (NULL == record)
  500. {
  501. callback (callback_cls,
  502. NULL,
  503. _("Received a malformed response from service."));
  504. }
  505. else
  506. {
  507. callback (callback_cls,
  508. record,
  509. NULL);
  510. PEERSTORE_destroy_record (record);
  511. }
  512. }
  513. /**
  514. * Cancel an iterate request
  515. * Please do not call after the iterate request is done
  516. *
  517. * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
  518. */
  519. void
  520. GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
  521. {
  522. if (GNUNET_NO == ic->iterating)
  523. {
  524. GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head,
  525. ic->h->iterate_tail,
  526. ic);
  527. GNUNET_free (ic->sub_system);
  528. GNUNET_free_non_null (ic->key);
  529. GNUNET_free (ic);
  530. }
  531. else
  532. ic->callback = NULL;
  533. }
  534. /**
  535. * Iterate over records matching supplied key information
  536. *
  537. * @param h handle to the PEERSTORE service
  538. * @param sub_system name of sub system
  539. * @param peer Peer identity (can be NULL)
  540. * @param key entry key string (can be NULL)
  541. * @param callback function called with each matching record, all NULL's on end
  542. * @param callback_cls closure for @a callback
  543. * @return Handle to iteration request
  544. */
  545. struct GNUNET_PEERSTORE_IterateContext *
  546. GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
  547. const char *sub_system,
  548. const struct GNUNET_PeerIdentity *peer,
  549. const char *key,
  550. GNUNET_PEERSTORE_Processor callback,
  551. void *callback_cls)
  552. {
  553. struct GNUNET_MQ_Envelope *ev;
  554. struct GNUNET_PEERSTORE_IterateContext *ic;
  555. ev = PEERSTORE_create_record_mq_envelope (sub_system,
  556. peer,
  557. key,
  558. NULL, 0,
  559. GNUNET_TIME_UNIT_FOREVER_ABS,
  560. 0,
  561. GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
  562. ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
  563. ic->callback = callback;
  564. ic->callback_cls = callback_cls;
  565. ic->h = h;
  566. ic->sub_system = GNUNET_strdup (sub_system);
  567. if (NULL != peer)
  568. ic->peer = *peer;
  569. if (NULL != key)
  570. ic->key = GNUNET_strdup (key);
  571. GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head,
  572. h->iterate_tail,
  573. ic);
  574. LOG (GNUNET_ERROR_TYPE_DEBUG,
  575. "Sending an iterate request for sub system `%s'\n",
  576. sub_system);
  577. GNUNET_MQ_send (h->mq, ev);
  578. return ic;
  579. }
  580. /******************************************************************************/
  581. /******************* WATCH FUNCTIONS *********************/
  582. /******************************************************************************/
  583. /**
  584. * When a watch record is received, validate it is well-formed.
  585. *
  586. * @param cls a `struct GNUNET_PEERSTORE_Handle *`
  587. * @param msg message received
  588. */
  589. static int
  590. check_watch_record (void *cls,
  591. const struct StoreRecordMessage *msg)
  592. {
  593. /* we defer validation to #handle_watch_result */
  594. return GNUNET_OK;
  595. }
  596. /**
  597. * When a watch record is received, process it.
  598. *
  599. * @param cls a `struct GNUNET_PEERSTORE_Handle *`
  600. * @param msg message received
  601. */
  602. static void
  603. handle_watch_record (void *cls,
  604. const struct StoreRecordMessage *msg)
  605. {
  606. struct GNUNET_PEERSTORE_Handle *h = cls;
  607. struct GNUNET_PEERSTORE_Record *record;
  608. struct GNUNET_HashCode keyhash;
  609. struct GNUNET_PEERSTORE_WatchContext *wc;
  610. LOG (GNUNET_ERROR_TYPE_DEBUG,
  611. "Received a watch record from service.\n");
  612. record = PEERSTORE_parse_record_message (msg);
  613. if (NULL == record)
  614. {
  615. reconnect (h);
  616. return;
  617. }
  618. PEERSTORE_hash_key (record->sub_system,
  619. &record->peer,
  620. record->key,
  621. &keyhash);
  622. // FIXME: what if there are multiple watches for the same key?
  623. wc = GNUNET_CONTAINER_multihashmap_get (h->watches,
  624. &keyhash);
  625. if (NULL == wc)
  626. {
  627. LOG (GNUNET_ERROR_TYPE_ERROR,
  628. _("Received a watch result for a non existing watch.\n"));
  629. PEERSTORE_destroy_record (record);
  630. reconnect (h);
  631. return;
  632. }
  633. if (NULL != wc->callback)
  634. wc->callback (wc->callback_cls,
  635. record,
  636. NULL);
  637. PEERSTORE_destroy_record (record);
  638. }
  639. /**
  640. * Close the existing connection to PEERSTORE and reconnect.
  641. *
  642. * @param h handle to the service
  643. */
  644. static void
  645. reconnect (struct GNUNET_PEERSTORE_Handle *h)
  646. {
  647. struct GNUNET_MQ_MessageHandler mq_handlers[] = {
  648. GNUNET_MQ_hd_fixed_size (iterate_end,
  649. GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
  650. struct GNUNET_MessageHeader,
  651. h),
  652. GNUNET_MQ_hd_var_size (iterate_result,
  653. GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
  654. struct StoreRecordMessage,
  655. h),
  656. GNUNET_MQ_hd_var_size (watch_record,
  657. GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
  658. struct StoreRecordMessage,
  659. h),
  660. GNUNET_MQ_handler_end ()
  661. };
  662. struct GNUNET_PEERSTORE_IterateContext *ic;
  663. struct GNUNET_PEERSTORE_IterateContext *next;
  664. GNUNET_PEERSTORE_Processor icb;
  665. void *icb_cls;
  666. struct GNUNET_PEERSTORE_StoreContext *sc;
  667. struct GNUNET_MQ_Envelope *ev;
  668. LOG (GNUNET_ERROR_TYPE_DEBUG,
  669. "Reconnecting...\n");
  670. for (ic = h->iterate_head; NULL != ic; ic = next)
  671. {
  672. next = ic->next;
  673. if (GNUNET_YES == ic->iterating)
  674. {
  675. icb = ic->callback;
  676. icb_cls = ic->callback_cls;
  677. GNUNET_PEERSTORE_iterate_cancel (ic);
  678. if (NULL != icb)
  679. icb (icb_cls,
  680. NULL,
  681. "Iteration canceled due to reconnection");
  682. }
  683. }
  684. if (NULL != h->mq)
  685. {
  686. GNUNET_MQ_destroy (h->mq);
  687. h->mq = NULL;
  688. }
  689. h->mq = GNUNET_CLIENT_connect (h->cfg,
  690. "peerstore",
  691. mq_handlers,
  692. &handle_client_error,
  693. h);
  694. if (NULL == h->mq)
  695. return;
  696. LOG (GNUNET_ERROR_TYPE_DEBUG,
  697. "Resending pending requests after reconnect.\n");
  698. if (NULL != h->watches)
  699. GNUNET_CONTAINER_multihashmap_iterate (h->watches,
  700. &rewatch_it,
  701. h);
  702. for (ic = h->iterate_head; NULL != ic; ic = ic->next)
  703. {
  704. ev = PEERSTORE_create_record_mq_envelope (ic->sub_system,
  705. &ic->peer,
  706. ic->key,
  707. NULL, 0,
  708. GNUNET_TIME_UNIT_FOREVER_ABS,
  709. 0,
  710. GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
  711. GNUNET_MQ_send (h->mq, ev);
  712. }
  713. for (sc = h->store_head; NULL != sc; sc = sc->next)
  714. {
  715. ev = PEERSTORE_create_record_mq_envelope (sc->sub_system,
  716. &sc->peer,
  717. sc->key,
  718. sc->value,
  719. sc->size,
  720. sc->expiry,
  721. sc->options,
  722. GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
  723. GNUNET_MQ_notify_sent (ev,
  724. &store_request_sent,
  725. sc);
  726. GNUNET_MQ_send (h->mq,
  727. ev);
  728. }
  729. }
  730. /**
  731. * Cancel a watch request
  732. *
  733. * @param wc handle to the watch request
  734. */
  735. void
  736. GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
  737. {
  738. struct GNUNET_PEERSTORE_Handle *h = wc->h;
  739. struct GNUNET_MQ_Envelope *ev;
  740. struct StoreKeyHashMessage *hm;
  741. LOG (GNUNET_ERROR_TYPE_DEBUG,
  742. "Canceling watch.\n");
  743. ev = GNUNET_MQ_msg (hm,
  744. GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
  745. hm->keyhash = wc->keyhash;
  746. GNUNET_MQ_send (h->mq, ev);
  747. GNUNET_CONTAINER_multihashmap_remove (h->watches,
  748. &wc->keyhash,
  749. wc);
  750. GNUNET_free (wc);
  751. }
  752. /**
  753. * Request watching a given key
  754. * User will be notified with any new values added to key
  755. *
  756. * @param h handle to the PEERSTORE service
  757. * @param sub_system name of sub system
  758. * @param peer Peer identity
  759. * @param key entry key string
  760. * @param callback function called with each new value
  761. * @param callback_cls closure for @a callback
  762. * @return Handle to watch request
  763. */
  764. struct GNUNET_PEERSTORE_WatchContext *
  765. GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
  766. const char *sub_system,
  767. const struct GNUNET_PeerIdentity *peer,
  768. const char *key,
  769. GNUNET_PEERSTORE_Processor callback,
  770. void *callback_cls)
  771. {
  772. struct GNUNET_MQ_Envelope *ev;
  773. struct StoreKeyHashMessage *hm;
  774. struct GNUNET_PEERSTORE_WatchContext *wc;
  775. ev = GNUNET_MQ_msg (hm,
  776. GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
  777. PEERSTORE_hash_key (sub_system,
  778. peer,
  779. key,
  780. &hm->keyhash);
  781. wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
  782. wc->callback = callback;
  783. wc->callback_cls = callback_cls;
  784. wc->h = h;
  785. wc->keyhash = hm->keyhash;
  786. if (NULL == h->watches)
  787. h->watches = GNUNET_CONTAINER_multihashmap_create (5,
  788. GNUNET_NO);
  789. GNUNET_assert (GNUNET_OK ==
  790. GNUNET_CONTAINER_multihashmap_put (h->watches,
  791. &wc->keyhash,
  792. wc,
  793. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
  794. LOG (GNUNET_ERROR_TYPE_DEBUG,
  795. "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
  796. sub_system,
  797. GNUNET_i2s (peer),
  798. key);
  799. GNUNET_MQ_send (h->mq,
  800. ev);
  801. return wc;
  802. }
  803. /* end of peerstore_api.c */