datastore_api.c 46 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488
  1. /*
  2. This file is part of GNUnet
  3. (C) 2004-2013 Christian Grothoff (and other contributing authors)
  4. GNUnet is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published
  6. by the Free Software Foundation; either version 3, or (at your
  7. option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with GNUnet; see the file COPYING. If not, write to the
  14. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  15. Boston, MA 02111-1307, USA.
  16. */
  17. /**
  18. * @file datastore/datastore_api.c
  19. * @brief Management for the datastore for files stored on a GNUnet node. Implements
  20. * a priority queue for requests (with timeouts).
  21. * @author Christian Grothoff
  22. */
  23. #include "platform.h"
  24. #include "gnunet_arm_service.h"
  25. #include "gnunet_constants.h"
  26. #include "gnunet_datastore_service.h"
  27. #include "gnunet_statistics_service.h"
  28. #include "datastore.h"
  29. #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
  30. /**
  31. * Collect an instane number of statistics? May cause excessive IPC.
  32. */
  33. #define INSANE_STATISTICS GNUNET_NO
  34. /**
  35. * If a client stopped asking for more results, how many more do
  36. * we receive from the DB before killing the connection? Trade-off
  37. * between re-doing TCP handshakes and (needlessly) receiving
  38. * useless results.
  39. */
  40. #define MAX_EXCESS_RESULTS 8
  41. /**
  42. * Context for processing status messages.
  43. */
  44. struct StatusContext
  45. {
  46. /**
  47. * Continuation to call with the status.
  48. */
  49. GNUNET_DATASTORE_ContinuationWithStatus cont;
  50. /**
  51. * Closure for cont.
  52. */
  53. void *cont_cls;
  54. };
  55. /**
  56. * Context for processing result messages.
  57. */
  58. struct ResultContext
  59. {
  60. /**
  61. * Function to call with the result.
  62. */
  63. GNUNET_DATASTORE_DatumProcessor proc;
  64. /**
  65. * Closure for proc.
  66. */
  67. void *proc_cls;
  68. };
  69. /**
  70. * Context for a queue operation.
  71. */
  72. union QueueContext
  73. {
  74. struct StatusContext sc;
  75. struct ResultContext rc;
  76. };
  77. /**
  78. * Entry in our priority queue.
  79. */
  80. struct GNUNET_DATASTORE_QueueEntry
  81. {
  82. /**
  83. * This is a linked list.
  84. */
  85. struct GNUNET_DATASTORE_QueueEntry *next;
  86. /**
  87. * This is a linked list.
  88. */
  89. struct GNUNET_DATASTORE_QueueEntry *prev;
  90. /**
  91. * Handle to the master context.
  92. */
  93. struct GNUNET_DATASTORE_Handle *h;
  94. /**
  95. * Response processor (NULL if we are not waiting for a response).
  96. * This struct should be used for the closure, function-specific
  97. * arguments can be passed via 'qc'.
  98. */
  99. GNUNET_CLIENT_MessageHandler response_proc;
  100. /**
  101. * Function to call after transmission of the request.
  102. */
  103. GNUNET_DATASTORE_ContinuationWithStatus cont;
  104. /**
  105. * Closure for 'cont'.
  106. */
  107. void *cont_cls;
  108. /**
  109. * Context for the operation.
  110. */
  111. union QueueContext qc;
  112. /**
  113. * Task for timeout signalling.
  114. */
  115. struct GNUNET_SCHEDULER_Task * task;
  116. /**
  117. * Timeout for the current operation.
  118. */
  119. struct GNUNET_TIME_Absolute timeout;
  120. /**
  121. * Priority in the queue.
  122. */
  123. unsigned int priority;
  124. /**
  125. * Maximum allowed length of queue (otherwise
  126. * this request should be discarded).
  127. */
  128. unsigned int max_queue;
  129. /**
  130. * Number of bytes in the request message following
  131. * this struct. 32-bit value for nicer memory
  132. * access (and overall struct alignment).
  133. */
  134. uint32_t message_size;
  135. /**
  136. * Has this message been transmitted to the service?
  137. * Only ever GNUNET_YES for the head of the queue.
  138. * Note that the overall struct should end at a
  139. * multiple of 64 bits.
  140. */
  141. int was_transmitted;
  142. };
  143. /**
  144. * Handle to the datastore service.
  145. */
  146. struct GNUNET_DATASTORE_Handle
  147. {
  148. /**
  149. * Our configuration.
  150. */
  151. const struct GNUNET_CONFIGURATION_Handle *cfg;
  152. /**
  153. * Current connection to the datastore service.
  154. */
  155. struct GNUNET_CLIENT_Connection *client;
  156. /**
  157. * Handle for statistics.
  158. */
  159. struct GNUNET_STATISTICS_Handle *stats;
  160. /**
  161. * Current transmit handle.
  162. */
  163. struct GNUNET_CLIENT_TransmitHandle *th;
  164. /**
  165. * Current head of priority queue.
  166. */
  167. struct GNUNET_DATASTORE_QueueEntry *queue_head;
  168. /**
  169. * Current tail of priority queue.
  170. */
  171. struct GNUNET_DATASTORE_QueueEntry *queue_tail;
  172. /**
  173. * Task for trying to reconnect.
  174. */
  175. struct GNUNET_SCHEDULER_Task * reconnect_task;
  176. /**
  177. * How quickly should we retry? Used for exponential back-off on
  178. * connect-errors.
  179. */
  180. struct GNUNET_TIME_Relative retry_time;
  181. /**
  182. * Number of entries in the queue.
  183. */
  184. unsigned int queue_size;
  185. /**
  186. * Number of results we're receiving for the current query
  187. * after application stopped to care. Used to determine when
  188. * to reset the connection.
  189. */
  190. unsigned int result_count;
  191. /**
  192. * Are we currently trying to receive from the service?
  193. */
  194. int in_receive;
  195. /**
  196. * We should ignore the next message(s) from the service.
  197. */
  198. unsigned int skip_next_messages;
  199. };
  200. /**
  201. * Connect to the datastore service.
  202. *
  203. * @param cfg configuration to use
  204. * @return handle to use to access the service
  205. */
  206. struct GNUNET_DATASTORE_Handle *
  207. GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
  208. {
  209. struct GNUNET_CLIENT_Connection *c;
  210. struct GNUNET_DATASTORE_Handle *h;
  211. c = GNUNET_CLIENT_connect ("datastore", cfg);
  212. if (c == NULL)
  213. return NULL; /* oops */
  214. h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
  215. GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
  216. h->client = c;
  217. h->cfg = cfg;
  218. h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
  219. return h;
  220. }
  221. /**
  222. * Task used by 'transmit_drop' to disconnect the datastore.
  223. *
  224. * @param cls the datastore handle
  225. * @param tc scheduler context
  226. */
  227. static void
  228. disconnect_after_drop (void *cls,
  229. const struct GNUNET_SCHEDULER_TaskContext *tc)
  230. {
  231. struct GNUNET_DATASTORE_Handle *h = cls;
  232. GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
  233. }
  234. /**
  235. * Transmit DROP message to datastore service.
  236. *
  237. * @param cls the `struct GNUNET_DATASTORE_Handle`
  238. * @param size number of bytes that can be copied to @a buf
  239. * @param buf where to copy the drop message
  240. * @return number of bytes written to @a buf
  241. */
  242. static size_t
  243. transmit_drop (void *cls, size_t size, void *buf)
  244. {
  245. struct GNUNET_DATASTORE_Handle *h = cls;
  246. struct GNUNET_MessageHeader *hdr;
  247. if (buf == NULL)
  248. {
  249. LOG (GNUNET_ERROR_TYPE_WARNING,
  250. _("Failed to transmit request to drop database.\n"));
  251. GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
  252. GNUNET_SCHEDULER_REASON_PREREQ_DONE);
  253. return 0;
  254. }
  255. GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
  256. hdr = buf;
  257. hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
  258. hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
  259. GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
  260. GNUNET_SCHEDULER_REASON_PREREQ_DONE);
  261. return sizeof (struct GNUNET_MessageHeader);
  262. }
  263. /**
  264. * Disconnect from the datastore service (and free
  265. * associated resources).
  266. *
  267. * @param h handle to the datastore
  268. * @param drop set to #GNUNET_YES to delete all data in datastore (!)
  269. */
  270. void
  271. GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
  272. {
  273. struct GNUNET_DATASTORE_QueueEntry *qe;
  274. LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
  275. if (NULL != h->th)
  276. {
  277. GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
  278. h->th = NULL;
  279. }
  280. if (h->client != NULL)
  281. {
  282. GNUNET_CLIENT_disconnect (h->client);
  283. h->client = NULL;
  284. }
  285. if (h->reconnect_task != NULL)
  286. {
  287. GNUNET_SCHEDULER_cancel (h->reconnect_task);
  288. h->reconnect_task = NULL;
  289. }
  290. while (NULL != (qe = h->queue_head))
  291. {
  292. GNUNET_assert (NULL != qe->response_proc);
  293. qe->response_proc (h, NULL);
  294. }
  295. if (GNUNET_YES == drop)
  296. {
  297. h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
  298. if (h->client != NULL)
  299. {
  300. if (NULL !=
  301. GNUNET_CLIENT_notify_transmit_ready (h->client,
  302. sizeof (struct
  303. GNUNET_MessageHeader),
  304. GNUNET_TIME_UNIT_MINUTES,
  305. GNUNET_YES, &transmit_drop, h))
  306. return;
  307. GNUNET_CLIENT_disconnect (h->client);
  308. h->client = NULL;
  309. }
  310. GNUNET_break (0);
  311. }
  312. GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
  313. h->stats = NULL;
  314. GNUNET_free (h);
  315. }
  316. /**
  317. * A request has timed out (before being transmitted to the service).
  318. *
  319. * @param cls the `struct GNUNET_DATASTORE_QueueEntry`
  320. * @param tc scheduler context
  321. */
  322. static void
  323. timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  324. {
  325. struct GNUNET_DATASTORE_QueueEntry *qe = cls;
  326. struct GNUNET_DATASTORE_Handle *h = qe->h;
  327. GNUNET_STATISTICS_update (h->stats,
  328. gettext_noop ("# queue entry timeouts"), 1,
  329. GNUNET_NO);
  330. qe->task = NULL;
  331. GNUNET_assert (GNUNET_NO == qe->was_transmitted);
  332. LOG (GNUNET_ERROR_TYPE_DEBUG,
  333. "Timeout of request in datastore queue\n");
  334. /* response_proc's expect request at the head of the queue! */
  335. GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
  336. GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, qe);
  337. GNUNET_assert (h->queue_head == qe);
  338. qe->response_proc (qe->h, NULL);
  339. }
  340. /**
  341. * Create a new entry for our priority queue (and possibly discard other entires if
  342. * the queue is getting too long).
  343. *
  344. * @param h handle to the datastore
  345. * @param msize size of the message to queue
  346. * @param queue_priority priority of the entry
  347. * @param max_queue_size at what queue size should this request be dropped
  348. * (if other requests of higher priority are in the queue)
  349. * @param timeout timeout for the operation
  350. * @param response_proc function to call with replies (can be NULL)
  351. * @param qc client context (NOT a closure for @a response_proc)
  352. * @return NULL if the queue is full
  353. */
  354. static struct GNUNET_DATASTORE_QueueEntry *
  355. make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
  356. unsigned int queue_priority, unsigned int max_queue_size,
  357. struct GNUNET_TIME_Relative timeout,
  358. GNUNET_CLIENT_MessageHandler response_proc,
  359. const union QueueContext *qc)
  360. {
  361. struct GNUNET_DATASTORE_QueueEntry *ret;
  362. struct GNUNET_DATASTORE_QueueEntry *pos;
  363. unsigned int c;
  364. c = 0;
  365. pos = h->queue_head;
  366. while ((pos != NULL) && (c < max_queue_size) &&
  367. (pos->priority >= queue_priority))
  368. {
  369. c++;
  370. pos = pos->next;
  371. }
  372. if (c >= max_queue_size)
  373. {
  374. GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
  375. GNUNET_NO);
  376. return NULL;
  377. }
  378. ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
  379. ret->h = h;
  380. ret->response_proc = response_proc;
  381. ret->qc = *qc;
  382. ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
  383. ret->priority = queue_priority;
  384. ret->max_queue = max_queue_size;
  385. ret->message_size = msize;
  386. ret->was_transmitted = GNUNET_NO;
  387. if (pos == NULL)
  388. {
  389. /* append at the tail */
  390. pos = h->queue_tail;
  391. }
  392. else
  393. {
  394. pos = pos->prev;
  395. /* do not insert at HEAD if HEAD query was already
  396. * transmitted and we are still receiving replies! */
  397. if ((pos == NULL) && (h->queue_head->was_transmitted))
  398. pos = h->queue_head;
  399. }
  400. c++;
  401. #if INSANE_STATISTICS
  402. GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
  403. 1, GNUNET_NO);
  404. #endif
  405. GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
  406. h->queue_size++;
  407. ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
  408. for (pos = ret->next; NULL != pos; pos = pos->next)
  409. {
  410. if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
  411. {
  412. GNUNET_assert (NULL != pos->response_proc);
  413. /* move 'pos' element to head so that it will be
  414. * killed on 'NULL' call below */
  415. LOG (GNUNET_ERROR_TYPE_DEBUG,
  416. "Dropping request from datastore queue\n");
  417. /* response_proc's expect request at the head of the queue! */
  418. GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
  419. GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
  420. GNUNET_STATISTICS_update (h->stats,
  421. gettext_noop
  422. ("# Requests dropped from datastore queue"), 1,
  423. GNUNET_NO);
  424. GNUNET_assert (h->queue_head == pos);
  425. pos->response_proc (h, NULL);
  426. break;
  427. }
  428. }
  429. return ret;
  430. }
  431. /**
  432. * Process entries in the queue (or do nothing if we are already
  433. * doing so).
  434. *
  435. * @param h handle to the datastore
  436. */
  437. static void
  438. process_queue (struct GNUNET_DATASTORE_Handle *h);
  439. /**
  440. * Try reconnecting to the datastore service.
  441. *
  442. * @param cls the `struct GNUNET_DATASTORE_Handle`
  443. * @param tc scheduler context
  444. */
  445. static void
  446. try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  447. {
  448. struct GNUNET_DATASTORE_Handle *h = cls;
  449. h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
  450. h->reconnect_task = NULL;
  451. h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
  452. if (h->client == NULL)
  453. {
  454. LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n");
  455. return;
  456. }
  457. GNUNET_STATISTICS_update (h->stats,
  458. gettext_noop
  459. ("# datastore connections (re)created"), 1,
  460. GNUNET_NO);
  461. LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
  462. process_queue (h);
  463. }
  464. /**
  465. * Disconnect from the service and then try reconnecting to the datastore service
  466. * after some delay.
  467. *
  468. * @param h handle to datastore to disconnect and reconnect
  469. */
  470. static void
  471. do_disconnect (struct GNUNET_DATASTORE_Handle *h)
  472. {
  473. if (NULL == h->client)
  474. {
  475. LOG (GNUNET_ERROR_TYPE_DEBUG,
  476. "Client NULL in disconnect, will not try to reconnect\n");
  477. return;
  478. }
  479. GNUNET_CLIENT_disconnect (h->client);
  480. h->skip_next_messages = 0;
  481. h->client = NULL;
  482. h->reconnect_task =
  483. GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
  484. }
  485. /**
  486. * Function called whenever we receive a message from
  487. * the service. Calls the appropriate handler.
  488. *
  489. * @param cls the `struct GNUNET_DATASTORE_Handle`
  490. * @param msg the received message
  491. */
  492. static void
  493. receive_cb (void *cls,
  494. const struct GNUNET_MessageHeader *msg)
  495. {
  496. struct GNUNET_DATASTORE_Handle *h = cls;
  497. struct GNUNET_DATASTORE_QueueEntry *qe;
  498. h->in_receive = GNUNET_NO;
  499. LOG (GNUNET_ERROR_TYPE_DEBUG,
  500. "Receiving reply from datastore\n");
  501. if (h->skip_next_messages > 0)
  502. {
  503. h->skip_next_messages--;
  504. process_queue (h);
  505. return;
  506. }
  507. if (NULL == (qe = h->queue_head))
  508. {
  509. GNUNET_break (0);
  510. process_queue (h);
  511. return;
  512. }
  513. qe->response_proc (h, msg);
  514. }
  515. /**
  516. * Transmit request from queue to datastore service.
  517. *
  518. * @param cls the `struct GNUNET_DATASTORE_Handle`
  519. * @param size number of bytes that can be copied to @a buf
  520. * @param buf where to copy the drop message
  521. * @return number of bytes written to @a buf
  522. */
  523. static size_t
  524. transmit_request (void *cls,
  525. size_t size,
  526. void *buf)
  527. {
  528. struct GNUNET_DATASTORE_Handle *h = cls;
  529. struct GNUNET_DATASTORE_QueueEntry *qe;
  530. size_t msize;
  531. h->th = NULL;
  532. if (NULL == (qe = h->queue_head))
  533. return 0; /* no entry in queue */
  534. if (NULL == buf)
  535. {
  536. LOG (GNUNET_ERROR_TYPE_DEBUG,
  537. "Failed to transmit request to DATASTORE.\n");
  538. GNUNET_STATISTICS_update (h->stats,
  539. gettext_noop ("# transmission request failures"),
  540. 1, GNUNET_NO);
  541. do_disconnect (h);
  542. return 0;
  543. }
  544. if (size < (msize = qe->message_size))
  545. {
  546. process_queue (h);
  547. return 0;
  548. }
  549. LOG (GNUNET_ERROR_TYPE_DEBUG,
  550. "Transmitting %u byte request to DATASTORE\n",
  551. msize);
  552. memcpy (buf, &qe[1], msize);
  553. qe->was_transmitted = GNUNET_YES;
  554. GNUNET_SCHEDULER_cancel (qe->task);
  555. qe->task = NULL;
  556. GNUNET_assert (GNUNET_NO == h->in_receive);
  557. h->in_receive = GNUNET_YES;
  558. GNUNET_CLIENT_receive (h->client,
  559. &receive_cb, h,
  560. GNUNET_TIME_absolute_get_remaining (qe->timeout));
  561. #if INSANE_STATISTICS
  562. GNUNET_STATISTICS_update (h->stats,
  563. gettext_noop ("# bytes sent to datastore"), msize,
  564. GNUNET_NO);
  565. #endif
  566. return msize;
  567. }
  568. /**
  569. * Process entries in the queue (or do nothing if we are already
  570. * doing so).
  571. *
  572. * @param h handle to the datastore
  573. */
  574. static void
  575. process_queue (struct GNUNET_DATASTORE_Handle *h)
  576. {
  577. struct GNUNET_DATASTORE_QueueEntry *qe;
  578. if (NULL == (qe = h->queue_head))
  579. {
  580. /* no entry in queue */
  581. LOG (GNUNET_ERROR_TYPE_DEBUG,
  582. "Queue empty\n");
  583. return;
  584. }
  585. if (GNUNET_YES == qe->was_transmitted)
  586. {
  587. /* waiting for replies */
  588. LOG (GNUNET_ERROR_TYPE_DEBUG,
  589. "Head request already transmitted\n");
  590. return;
  591. }
  592. if (NULL != h->th)
  593. {
  594. /* request pending */
  595. LOG (GNUNET_ERROR_TYPE_DEBUG,
  596. "Pending transmission request\n");
  597. return;
  598. }
  599. if (NULL == h->client)
  600. {
  601. /* waiting for reconnect */
  602. LOG (GNUNET_ERROR_TYPE_DEBUG,
  603. "Not connected\n");
  604. return;
  605. }
  606. if (GNUNET_YES == h->in_receive)
  607. {
  608. /* wait for response to previous query */
  609. return;
  610. }
  611. LOG (GNUNET_ERROR_TYPE_DEBUG,
  612. "Queueing %u byte request to DATASTORE\n",
  613. qe->message_size);
  614. h->th
  615. = GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
  616. GNUNET_TIME_absolute_get_remaining (qe->timeout),
  617. GNUNET_YES,
  618. &transmit_request, h);
  619. GNUNET_assert (GNUNET_NO == h->in_receive);
  620. GNUNET_break (NULL != h->th);
  621. }
  622. /**
  623. * Dummy continuation used to do nothing (but be non-zero).
  624. *
  625. * @param cls closure
  626. * @param result result
  627. * @param min_expiration expiration time
  628. * @param emsg error message
  629. */
  630. static void
  631. drop_status_cont (void *cls, int32_t result,
  632. struct GNUNET_TIME_Absolute min_expiration,
  633. const char *emsg)
  634. {
  635. /* do nothing */
  636. }
  637. /**
  638. * Free a queue entry. Removes the given entry from the
  639. * queue and releases associated resources. Does NOT
  640. * call the callback.
  641. *
  642. * @param qe entry to free.
  643. */
  644. static void
  645. free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
  646. {
  647. struct GNUNET_DATASTORE_Handle *h = qe->h;
  648. GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
  649. if (qe->task != NULL)
  650. {
  651. GNUNET_SCHEDULER_cancel (qe->task);
  652. qe->task = NULL;
  653. }
  654. h->queue_size--;
  655. qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
  656. GNUNET_free (qe);
  657. }
  658. /**
  659. * Type of a function to call when we receive a message
  660. * from the service.
  661. *
  662. * @param cls closure
  663. * @param msg message received, NULL on timeout or fatal error
  664. */
  665. static void
  666. process_status_message (void *cls,
  667. const struct GNUNET_MessageHeader *msg)
  668. {
  669. struct GNUNET_DATASTORE_Handle *h = cls;
  670. struct GNUNET_DATASTORE_QueueEntry *qe;
  671. struct StatusContext rc;
  672. const struct StatusMessage *sm;
  673. const char *emsg;
  674. int32_t status;
  675. int was_transmitted;
  676. if (NULL == (qe = h->queue_head))
  677. {
  678. GNUNET_break (0);
  679. do_disconnect (h);
  680. return;
  681. }
  682. rc = qe->qc.sc;
  683. if (NULL == msg)
  684. {
  685. was_transmitted = qe->was_transmitted;
  686. free_queue_entry (qe);
  687. if (was_transmitted == GNUNET_YES)
  688. do_disconnect (h);
  689. else
  690. process_queue (h);
  691. if (NULL != rc.cont)
  692. rc.cont (rc.cont_cls, GNUNET_SYSERR,
  693. GNUNET_TIME_UNIT_ZERO_ABS,
  694. _("Failed to receive status response from database."));
  695. return;
  696. }
  697. GNUNET_assert (GNUNET_YES == qe->was_transmitted);
  698. free_queue_entry (qe);
  699. if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
  700. (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
  701. {
  702. GNUNET_break (0);
  703. h->retry_time = GNUNET_TIME_UNIT_ZERO;
  704. do_disconnect (h);
  705. if (rc.cont != NULL)
  706. rc.cont (rc.cont_cls, GNUNET_SYSERR,
  707. GNUNET_TIME_UNIT_ZERO_ABS,
  708. _("Error reading response from datastore service"));
  709. return;
  710. }
  711. sm = (const struct StatusMessage *) msg;
  712. status = ntohl (sm->status);
  713. emsg = NULL;
  714. if (ntohs (msg->size) > sizeof (struct StatusMessage))
  715. {
  716. emsg = (const char *) &sm[1];
  717. if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
  718. {
  719. GNUNET_break (0);
  720. emsg = _("Invalid error message received from datastore service");
  721. }
  722. }
  723. if ((status == GNUNET_SYSERR) && (emsg == NULL))
  724. {
  725. GNUNET_break (0);
  726. emsg = _("Invalid error message received from datastore service");
  727. }
  728. LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg);
  729. GNUNET_STATISTICS_update (h->stats,
  730. gettext_noop ("# status messages received"), 1,
  731. GNUNET_NO);
  732. h->retry_time = GNUNET_TIME_UNIT_ZERO;
  733. process_queue (h);
  734. if (rc.cont != NULL)
  735. rc.cont (rc.cont_cls, status,
  736. GNUNET_TIME_absolute_ntoh (sm->min_expiration),
  737. emsg);
  738. }
  739. /**
  740. * Store an item in the datastore. If the item is already present,
  741. * the priorities are summed up and the higher expiration time and
  742. * lower anonymity level is used.
  743. *
  744. * @param h handle to the datastore
  745. * @param rid reservation ID to use (from "reserve"); use 0 if no
  746. * prior reservation was made
  747. * @param key key for the value
  748. * @param size number of bytes in data
  749. * @param data content stored
  750. * @param type type of the content
  751. * @param priority priority of the content
  752. * @param anonymity anonymity-level for the content
  753. * @param replication how often should the content be replicated to other peers?
  754. * @param expiration expiration time for the content
  755. * @param queue_priority ranking of this request in the priority queue
  756. * @param max_queue_size at what queue size should this request be dropped
  757. * (if other requests of higher priority are in the queue)
  758. * @param timeout timeout for the operation
  759. * @param cont continuation to call when done
  760. * @param cont_cls closure for @a cont
  761. * @return NULL if the entry was not queued, otherwise a handle that can be used to
  762. * cancel; note that even if NULL is returned, the callback will be invoked
  763. * (or rather, will already have been invoked)
  764. */
  765. struct GNUNET_DATASTORE_QueueEntry *
  766. GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
  767. const struct GNUNET_HashCode * key, size_t size,
  768. const void *data, enum GNUNET_BLOCK_Type type,
  769. uint32_t priority, uint32_t anonymity,
  770. uint32_t replication,
  771. struct GNUNET_TIME_Absolute expiration,
  772. unsigned int queue_priority, unsigned int max_queue_size,
  773. struct GNUNET_TIME_Relative timeout,
  774. GNUNET_DATASTORE_ContinuationWithStatus cont,
  775. void *cont_cls)
  776. {
  777. struct GNUNET_DATASTORE_QueueEntry *qe;
  778. struct DataMessage *dm;
  779. size_t msize;
  780. union QueueContext qc;
  781. LOG (GNUNET_ERROR_TYPE_DEBUG,
  782. "Asked to put %u bytes of data under key `%s' for %s\n", size,
  783. GNUNET_h2s (key),
  784. GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
  785. GNUNET_YES));
  786. msize = sizeof (struct DataMessage) + size;
  787. GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
  788. qc.sc.cont = cont;
  789. qc.sc.cont_cls = cont_cls;
  790. qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
  791. &process_status_message, &qc);
  792. if (qe == NULL)
  793. {
  794. LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n");
  795. return NULL;
  796. }
  797. GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
  798. 1, GNUNET_NO);
  799. dm = (struct DataMessage *) &qe[1];
  800. dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
  801. dm->header.size = htons (msize);
  802. dm->rid = htonl (rid);
  803. dm->size = htonl ((uint32_t) size);
  804. dm->type = htonl (type);
  805. dm->priority = htonl (priority);
  806. dm->anonymity = htonl (anonymity);
  807. dm->replication = htonl (replication);
  808. dm->reserved = htonl (0);
  809. dm->uid = GNUNET_htonll (0);
  810. dm->expiration = GNUNET_TIME_absolute_hton (expiration);
  811. dm->key = *key;
  812. memcpy (&dm[1], data, size);
  813. process_queue (h);
  814. return qe;
  815. }
  816. /**
  817. * Reserve space in the datastore. This function should be used
  818. * to avoid "out of space" failures during a longer sequence of "put"
  819. * operations (for example, when a file is being inserted).
  820. *
  821. * @param h handle to the datastore
  822. * @param amount how much space (in bytes) should be reserved (for content only)
  823. * @param entries how many entries will be created (to calculate per-entry overhead)
  824. * @param cont continuation to call when done; "success" will be set to
  825. * a positive reservation value if space could be reserved.
  826. * @param cont_cls closure for @a cont
  827. * @return NULL if the entry was not queued, otherwise a handle that can be used to
  828. * cancel; note that even if NULL is returned, the callback will be invoked
  829. * (or rather, will already have been invoked)
  830. */
  831. struct GNUNET_DATASTORE_QueueEntry *
  832. GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
  833. uint32_t entries,
  834. GNUNET_DATASTORE_ContinuationWithStatus cont,
  835. void *cont_cls)
  836. {
  837. struct GNUNET_DATASTORE_QueueEntry *qe;
  838. struct ReserveMessage *rm;
  839. union QueueContext qc;
  840. if (NULL == cont)
  841. cont = &drop_status_cont;
  842. LOG (GNUNET_ERROR_TYPE_DEBUG,
  843. "Asked to reserve %llu bytes of data and %u entries\n",
  844. (unsigned long long) amount, (unsigned int) entries);
  845. qc.sc.cont = cont;
  846. qc.sc.cont_cls = cont_cls;
  847. qe = make_queue_entry (h,
  848. sizeof (struct ReserveMessage),
  849. UINT_MAX,
  850. UINT_MAX,
  851. GNUNET_TIME_UNIT_FOREVER_REL,
  852. &process_status_message, &qc);
  853. if (NULL == qe)
  854. {
  855. LOG (GNUNET_ERROR_TYPE_DEBUG,
  856. "Could not create queue entry to reserve\n");
  857. return NULL;
  858. }
  859. GNUNET_STATISTICS_update (h->stats,
  860. gettext_noop ("# RESERVE requests executed"), 1,
  861. GNUNET_NO);
  862. rm = (struct ReserveMessage *) &qe[1];
  863. rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
  864. rm->header.size = htons (sizeof (struct ReserveMessage));
  865. rm->entries = htonl (entries);
  866. rm->amount = GNUNET_htonll (amount);
  867. process_queue (h);
  868. return qe;
  869. }
  870. /**
  871. * Signal that all of the data for which a reservation was made has
  872. * been stored and that whatever excess space might have been reserved
  873. * can now be released.
  874. *
  875. * @param h handle to the datastore
  876. * @param rid reservation ID (value of "success" in original continuation
  877. * from the "reserve" function).
  878. * @param queue_priority ranking of this request in the priority queue
  879. * @param max_queue_size at what queue size should this request be dropped
  880. * (if other requests of higher priority are in the queue)
  881. * @param queue_priority ranking of this request in the priority queue
  882. * @param max_queue_size at what queue size should this request be dropped
  883. * (if other requests of higher priority are in the queue)
  884. * @param timeout how long to wait at most for a response
  885. * @param cont continuation to call when done
  886. * @param cont_cls closure for @a cont
  887. * @return NULL if the entry was not queued, otherwise a handle that can be used to
  888. * cancel; note that even if NULL is returned, the callback will be invoked
  889. * (or rather, will already have been invoked)
  890. */
  891. struct GNUNET_DATASTORE_QueueEntry *
  892. GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
  893. uint32_t rid, unsigned int queue_priority,
  894. unsigned int max_queue_size,
  895. struct GNUNET_TIME_Relative timeout,
  896. GNUNET_DATASTORE_ContinuationWithStatus cont,
  897. void *cont_cls)
  898. {
  899. struct GNUNET_DATASTORE_QueueEntry *qe;
  900. struct ReleaseReserveMessage *rrm;
  901. union QueueContext qc;
  902. if (cont == NULL)
  903. cont = &drop_status_cont;
  904. LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
  905. qc.sc.cont = cont;
  906. qc.sc.cont_cls = cont_cls;
  907. qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
  908. queue_priority, max_queue_size, timeout,
  909. &process_status_message, &qc);
  910. if (qe == NULL)
  911. {
  912. LOG (GNUNET_ERROR_TYPE_DEBUG,
  913. "Could not create queue entry to release reserve\n");
  914. return NULL;
  915. }
  916. GNUNET_STATISTICS_update (h->stats,
  917. gettext_noop
  918. ("# RELEASE RESERVE requests executed"), 1,
  919. GNUNET_NO);
  920. rrm = (struct ReleaseReserveMessage *) &qe[1];
  921. rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
  922. rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
  923. rrm->rid = htonl (rid);
  924. process_queue (h);
  925. return qe;
  926. }
  927. /**
  928. * Update a value in the datastore.
  929. *
  930. * @param h handle to the datastore
  931. * @param uid identifier for the value
  932. * @param priority how much to increase the priority of the value
  933. * @param expiration new expiration value should be MAX of existing and this argument
  934. * @param queue_priority ranking of this request in the priority queue
  935. * @param max_queue_size at what queue size should this request be dropped
  936. * (if other requests of higher priority are in the queue)
  937. * @param timeout how long to wait at most for a response
  938. * @param cont continuation to call when done
  939. * @param cont_cls closure for @a cont
  940. * @return NULL if the entry was not queued, otherwise a handle that can be used to
  941. * cancel; note that even if NULL is returned, the callback will be invoked
  942. * (or rather, will already have been invoked)
  943. */
  944. struct GNUNET_DATASTORE_QueueEntry *
  945. GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
  946. uint32_t priority,
  947. struct GNUNET_TIME_Absolute expiration,
  948. unsigned int queue_priority,
  949. unsigned int max_queue_size,
  950. struct GNUNET_TIME_Relative timeout,
  951. GNUNET_DATASTORE_ContinuationWithStatus cont,
  952. void *cont_cls)
  953. {
  954. struct GNUNET_DATASTORE_QueueEntry *qe;
  955. struct UpdateMessage *um;
  956. union QueueContext qc;
  957. if (cont == NULL)
  958. cont = &drop_status_cont;
  959. LOG (GNUNET_ERROR_TYPE_DEBUG,
  960. "Asked to update entry %llu raising priority by %u and expiration to %s\n",
  961. uid,
  962. (unsigned int) priority,
  963. GNUNET_STRINGS_absolute_time_to_string (expiration));
  964. qc.sc.cont = cont;
  965. qc.sc.cont_cls = cont_cls;
  966. qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
  967. max_queue_size, timeout, &process_status_message, &qc);
  968. if (qe == NULL)
  969. {
  970. LOG (GNUNET_ERROR_TYPE_DEBUG,
  971. "Could not create queue entry for UPDATE\n");
  972. return NULL;
  973. }
  974. GNUNET_STATISTICS_update (h->stats,
  975. gettext_noop ("# UPDATE requests executed"), 1,
  976. GNUNET_NO);
  977. um = (struct UpdateMessage *) &qe[1];
  978. um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
  979. um->header.size = htons (sizeof (struct UpdateMessage));
  980. um->priority = htonl (priority);
  981. um->expiration = GNUNET_TIME_absolute_hton (expiration);
  982. um->uid = GNUNET_htonll (uid);
  983. process_queue (h);
  984. return qe;
  985. }
  986. /**
  987. * Explicitly remove some content from the database.
  988. * The @a cont continuation will be called with `status`
  989. * #GNUNET_OK" if content was removed, #GNUNET_NO
  990. * if no matching entry was found and #GNUNET_SYSERR
  991. * on all other types of errors.
  992. *
  993. * @param h handle to the datastore
  994. * @param key key for the value
  995. * @param size number of bytes in data
  996. * @param data content stored
  997. * @param queue_priority ranking of this request in the priority queue
  998. * @param max_queue_size at what queue size should this request be dropped
  999. * (if other requests of higher priority are in the queue)
  1000. * @param timeout how long to wait at most for a response
  1001. * @param cont continuation to call when done
  1002. * @param cont_cls closure for @a cont
  1003. * @return NULL if the entry was not queued, otherwise a handle that can be used to
  1004. * cancel; note that even if NULL is returned, the callback will be invoked
  1005. * (or rather, will already have been invoked)
  1006. */
  1007. struct GNUNET_DATASTORE_QueueEntry *
  1008. GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
  1009. const struct GNUNET_HashCode * key, size_t size,
  1010. const void *data, unsigned int queue_priority,
  1011. unsigned int max_queue_size,
  1012. struct GNUNET_TIME_Relative timeout,
  1013. GNUNET_DATASTORE_ContinuationWithStatus cont,
  1014. void *cont_cls)
  1015. {
  1016. struct GNUNET_DATASTORE_QueueEntry *qe;
  1017. struct DataMessage *dm;
  1018. size_t msize;
  1019. union QueueContext qc;
  1020. if (cont == NULL)
  1021. cont = &drop_status_cont;
  1022. LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n",
  1023. size, GNUNET_h2s (key));
  1024. qc.sc.cont = cont;
  1025. qc.sc.cont_cls = cont_cls;
  1026. msize = sizeof (struct DataMessage) + size;
  1027. GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
  1028. qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
  1029. &process_status_message, &qc);
  1030. if (qe == NULL)
  1031. {
  1032. LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n");
  1033. return NULL;
  1034. }
  1035. GNUNET_STATISTICS_update (h->stats,
  1036. gettext_noop ("# REMOVE requests executed"), 1,
  1037. GNUNET_NO);
  1038. dm = (struct DataMessage *) &qe[1];
  1039. dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
  1040. dm->header.size = htons (msize);
  1041. dm->rid = htonl (0);
  1042. dm->size = htonl (size);
  1043. dm->type = htonl (0);
  1044. dm->priority = htonl (0);
  1045. dm->anonymity = htonl (0);
  1046. dm->uid = GNUNET_htonll (0);
  1047. dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
  1048. dm->key = *key;
  1049. memcpy (&dm[1], data, size);
  1050. process_queue (h);
  1051. return qe;
  1052. }
  1053. /**
  1054. * Type of a function to call when we receive a message
  1055. * from the service.
  1056. *
  1057. * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
  1058. * @param msg message received, NULL on timeout or fatal error
  1059. */
  1060. static void
  1061. process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
  1062. {
  1063. struct GNUNET_DATASTORE_Handle *h = cls;
  1064. struct GNUNET_DATASTORE_QueueEntry *qe;
  1065. struct ResultContext rc;
  1066. const struct DataMessage *dm;
  1067. int was_transmitted;
  1068. if (NULL == msg)
  1069. {
  1070. qe = h->queue_head;
  1071. GNUNET_assert (NULL != qe);
  1072. rc = qe->qc.rc;
  1073. was_transmitted = qe->was_transmitted;
  1074. free_queue_entry (qe);
  1075. if (GNUNET_YES == was_transmitted)
  1076. {
  1077. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1078. "Failed to receive response from database.\n");
  1079. do_disconnect (h);
  1080. }
  1081. else
  1082. {
  1083. process_queue (h);
  1084. }
  1085. if (NULL != rc.proc)
  1086. rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
  1087. 0);
  1088. return;
  1089. }
  1090. if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
  1091. {
  1092. GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
  1093. qe = h->queue_head;
  1094. rc = qe->qc.rc;
  1095. GNUNET_assert (GNUNET_YES == qe->was_transmitted);
  1096. free_queue_entry (qe);
  1097. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1098. "Received end of result set, new queue size is %u\n", h->queue_size);
  1099. h->retry_time = GNUNET_TIME_UNIT_ZERO;
  1100. h->result_count = 0;
  1101. process_queue (h);
  1102. if (NULL != rc.proc)
  1103. rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
  1104. 0);
  1105. return;
  1106. }
  1107. qe = h->queue_head;
  1108. GNUNET_assert (NULL != qe);
  1109. rc = qe->qc.rc;
  1110. if (GNUNET_YES != qe->was_transmitted)
  1111. {
  1112. GNUNET_break (0);
  1113. free_queue_entry (qe);
  1114. h->retry_time = GNUNET_TIME_UNIT_ZERO;
  1115. do_disconnect (h);
  1116. if (rc.proc != NULL)
  1117. rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
  1118. 0);
  1119. return;
  1120. }
  1121. if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
  1122. (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
  1123. (ntohs (msg->size) !=
  1124. sizeof (struct DataMessage) +
  1125. ntohl (((const struct DataMessage *) msg)->size)))
  1126. {
  1127. GNUNET_break (0);
  1128. free_queue_entry (qe);
  1129. h->retry_time = GNUNET_TIME_UNIT_ZERO;
  1130. do_disconnect (h);
  1131. if (rc.proc != NULL)
  1132. rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
  1133. 0);
  1134. return;
  1135. }
  1136. #if INSANE_STATISTICS
  1137. GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
  1138. GNUNET_NO);
  1139. #endif
  1140. dm = (const struct DataMessage *) msg;
  1141. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1142. "Received result %llu with type %u and size %u with key %s\n",
  1143. (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
  1144. ntohl (dm->size), GNUNET_h2s (&dm->key));
  1145. free_queue_entry (qe);
  1146. h->retry_time = GNUNET_TIME_UNIT_ZERO;
  1147. process_queue (h);
  1148. if (rc.proc != NULL)
  1149. rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
  1150. ntohl (dm->priority), ntohl (dm->anonymity),
  1151. GNUNET_TIME_absolute_ntoh (dm->expiration),
  1152. GNUNET_ntohll (dm->uid));
  1153. }
  1154. /**
  1155. * Get a random value from the datastore for content replication.
  1156. * Returns a single, random value among those with the highest
  1157. * replication score, lowering positive replication scores by one for
  1158. * the chosen value (if only content with a replication score exists,
  1159. * a random value is returned and replication scores are not changed).
  1160. *
  1161. * @param h handle to the datastore
  1162. * @param queue_priority ranking of this request in the priority queue
  1163. * @param max_queue_size at what queue size should this request be dropped
  1164. * (if other requests of higher priority are in the queue)
  1165. * @param timeout how long to wait at most for a response
  1166. * @param proc function to call on a random value; it
  1167. * will be called once with a value (if available)
  1168. * and always once with a value of NULL.
  1169. * @param proc_cls closure for @a proc
  1170. * @return NULL if the entry was not queued, otherwise a handle that can be used to
  1171. * cancel
  1172. */
  1173. struct GNUNET_DATASTORE_QueueEntry *
  1174. GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
  1175. unsigned int queue_priority,
  1176. unsigned int max_queue_size,
  1177. struct GNUNET_TIME_Relative timeout,
  1178. GNUNET_DATASTORE_DatumProcessor proc,
  1179. void *proc_cls)
  1180. {
  1181. struct GNUNET_DATASTORE_QueueEntry *qe;
  1182. struct GNUNET_MessageHeader *m;
  1183. union QueueContext qc;
  1184. GNUNET_assert (NULL != proc);
  1185. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1186. "Asked to get replication entry in %s\n",
  1187. GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
  1188. qc.rc.proc = proc;
  1189. qc.rc.proc_cls = proc_cls;
  1190. qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
  1191. queue_priority, max_queue_size, timeout,
  1192. &process_result_message, &qc);
  1193. if (NULL == qe)
  1194. {
  1195. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1196. "Could not create queue entry for GET REPLICATION\n");
  1197. return NULL;
  1198. }
  1199. GNUNET_STATISTICS_update (h->stats,
  1200. gettext_noop
  1201. ("# GET REPLICATION requests executed"), 1,
  1202. GNUNET_NO);
  1203. m = (struct GNUNET_MessageHeader *) &qe[1];
  1204. m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
  1205. m->size = htons (sizeof (struct GNUNET_MessageHeader));
  1206. process_queue (h);
  1207. return qe;
  1208. }
  1209. /**
  1210. * Get a single zero-anonymity value from the datastore.
  1211. *
  1212. * @param h handle to the datastore
  1213. * @param offset offset of the result (modulo num-results); set to
  1214. * a random 64-bit value initially; then increment by
  1215. * one each time; detect that all results have been found by uid
  1216. * being again the first uid ever returned.
  1217. * @param queue_priority ranking of this request in the priority queue
  1218. * @param max_queue_size at what queue size should this request be dropped
  1219. * (if other requests of higher priority are in the queue)
  1220. * @param timeout how long to wait at most for a response
  1221. * @param type allowed type for the operation (never zero)
  1222. * @param proc function to call on a random value; it
  1223. * will be called once with a value (if available)
  1224. * or with NULL if none value exists.
  1225. * @param proc_cls closure for @a proc
  1226. * @return NULL if the entry was not queued, otherwise a handle that can be used to
  1227. * cancel
  1228. */
  1229. struct GNUNET_DATASTORE_QueueEntry *
  1230. GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
  1231. uint64_t offset,
  1232. unsigned int queue_priority,
  1233. unsigned int max_queue_size,
  1234. struct GNUNET_TIME_Relative timeout,
  1235. enum GNUNET_BLOCK_Type type,
  1236. GNUNET_DATASTORE_DatumProcessor proc,
  1237. void *proc_cls)
  1238. {
  1239. struct GNUNET_DATASTORE_QueueEntry *qe;
  1240. struct GetZeroAnonymityMessage *m;
  1241. union QueueContext qc;
  1242. GNUNET_assert (NULL != proc);
  1243. GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
  1244. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1245. "Asked to get %llu-th zero-anonymity entry of type %d in %s\n",
  1246. (unsigned long long) offset, type,
  1247. GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
  1248. qc.rc.proc = proc;
  1249. qc.rc.proc_cls = proc_cls;
  1250. qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
  1251. queue_priority, max_queue_size, timeout,
  1252. &process_result_message, &qc);
  1253. if (NULL == qe)
  1254. {
  1255. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1256. "Could not create queue entry for zero-anonymity procation\n");
  1257. return NULL;
  1258. }
  1259. GNUNET_STATISTICS_update (h->stats,
  1260. gettext_noop
  1261. ("# GET ZERO ANONYMITY requests executed"), 1,
  1262. GNUNET_NO);
  1263. m = (struct GetZeroAnonymityMessage *) &qe[1];
  1264. m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
  1265. m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
  1266. m->type = htonl ((uint32_t) type);
  1267. m->offset = GNUNET_htonll (offset);
  1268. process_queue (h);
  1269. return qe;
  1270. }
  1271. /**
  1272. * Get a result for a particular key from the datastore. The processor
  1273. * will only be called once.
  1274. *
  1275. * @param h handle to the datastore
  1276. * @param offset offset of the result (modulo num-results); set to
  1277. * a random 64-bit value initially; then increment by
  1278. * one each time; detect that all results have been found by uid
  1279. * being again the first uid ever returned.
  1280. * @param key maybe NULL (to match all entries)
  1281. * @param type desired type, 0 for any
  1282. * @param queue_priority ranking of this request in the priority queue
  1283. * @param max_queue_size at what queue size should this request be dropped
  1284. * (if other requests of higher priority are in the queue)
  1285. * @param timeout how long to wait at most for a response
  1286. * @param proc function to call on each matching value;
  1287. * will be called once with a NULL value at the end
  1288. * @param proc_cls closure for @a proc
  1289. * @return NULL if the entry was not queued, otherwise a handle that can be used to
  1290. * cancel
  1291. */
  1292. struct GNUNET_DATASTORE_QueueEntry *
  1293. GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset,
  1294. const struct GNUNET_HashCode * key,
  1295. enum GNUNET_BLOCK_Type type,
  1296. unsigned int queue_priority,
  1297. unsigned int max_queue_size,
  1298. struct GNUNET_TIME_Relative timeout,
  1299. GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
  1300. {
  1301. struct GNUNET_DATASTORE_QueueEntry *qe;
  1302. struct GetMessage *gm;
  1303. union QueueContext qc;
  1304. GNUNET_assert (NULL != proc);
  1305. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1306. "Asked to look for data of type %u under key `%s'\n",
  1307. (unsigned int) type, GNUNET_h2s (key));
  1308. qc.rc.proc = proc;
  1309. qc.rc.proc_cls = proc_cls;
  1310. qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority,
  1311. max_queue_size, timeout, &process_result_message, &qc);
  1312. if (qe == NULL)
  1313. {
  1314. LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
  1315. GNUNET_h2s (key));
  1316. return NULL;
  1317. }
  1318. #if INSANE_STATISTICS
  1319. GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"),
  1320. 1, GNUNET_NO);
  1321. #endif
  1322. gm = (struct GetMessage *) &qe[1];
  1323. gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
  1324. gm->type = htonl (type);
  1325. gm->offset = GNUNET_htonll (offset);
  1326. if (key != NULL)
  1327. {
  1328. gm->header.size = htons (sizeof (struct GetMessage));
  1329. gm->key = *key;
  1330. }
  1331. else
  1332. {
  1333. gm->header.size =
  1334. htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode));
  1335. }
  1336. process_queue (h);
  1337. return qe;
  1338. }
  1339. /**
  1340. * Cancel a datastore operation. The final callback from the
  1341. * operation must not have been done yet.
  1342. *
  1343. * @param qe operation to cancel
  1344. */
  1345. void
  1346. GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
  1347. {
  1348. struct GNUNET_DATASTORE_Handle *h;
  1349. GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
  1350. h = qe->h;
  1351. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1352. "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
  1353. qe->was_transmitted, h->queue_head == qe);
  1354. if (GNUNET_YES == qe->was_transmitted)
  1355. {
  1356. free_queue_entry (qe);
  1357. h->skip_next_messages++;
  1358. return;
  1359. }
  1360. free_queue_entry (qe);
  1361. process_queue (h);
  1362. }
  1363. /* end of datastore_api.c */