gnunet-service-datastore.c 44 KB


  1. /*
  2. This file is part of GNUnet
  3. (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
  4. GNUnet is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published
  6. by the Free Software Foundation; either version 2, or (at your
  7. option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with GNUnet; see the file COPYING. If not, write to the
  14. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  15. Boston, MA 02111-1307, USA.
  16. */
  17. /**
  18. * @file datastore/gnunet-service-datastore.c
  19. * @brief Management for the datastore for files stored on a GNUnet node
  20. * @author Christian Grothoff
  21. */
  22. #include "platform.h"
  23. #include "gnunet_util_lib.h"
  24. #include "gnunet_protocols.h"
  25. #include "gnunet_statistics_service.h"
  26. #include "gnunet_datastore_plugin.h"
  27. #include "datastore.h"
  /**
   * How many messages do we queue at most per client?
   */
  #define MAX_PENDING 1024

  /**
   * How long are we at most keeping "expired" content
   * past the expiration date in the database?  This is the
   * delay (15 minutes) used to re-schedule the expiration task
   * when no (more) expired content was found.
   */
  #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)

  /**
   * How fast are we allowed to query the database for deleting
   * expired content? (1 item per second).  This is the delay used
   * to re-schedule the expiration task right after an expired item
   * was deleted.
   */
  #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)

  /**
   * Name of the statistics value under which the current
   * payload of the datastore is published (see sync_stats).
   */
  #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")

  /**
   * After how many payload-changing operations
   * do we sync our statistics?
   */
  #define MAX_STAT_SYNC_LAG 50
  /**
   * Our datastore plugin.
   */
  struct DatastorePlugin
  {

    /**
     * API of the datastore plugin as returned by the plugin's
     * initialization function.
     */
    struct GNUNET_DATASTORE_PluginFunctions *api;

    /**
     * Short name for the plugin (i.e. "sqlite").
     */
    char *short_name;

    /**
     * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
     */
    char *lib_name;

    /**
     * Environment this datastore service is using
     * for this plugin.
     */
    struct GNUNET_DATASTORE_PluginEnvironment env;

  };
  /**
   * Linked list of active reservations.  One entry is created
   * per successful RESERVE request and removed again by the
   * matching RELEASE_RESERVE request.
   */
  struct ReservationList
  {

    /**
     * This is a linked list.
     */
    struct ReservationList *next;

    /**
     * Client that made the reservation.
     */
    struct GNUNET_SERVER_Client *client;

    /**
     * Number of bytes (still) reserved (payload only, not
     * counting the per-entry overhead).
     */
    uint64_t amount;

    /**
     * Number of items (still) reserved.
     */
    uint64_t entries;

    /**
     * Reservation identifier (the value that was sent to the
     * client as the status of its RESERVE request).
     */
    int32_t rid;

  };
  /**
   * Our datastore plugin (NULL if not available).
   */
  static struct DatastorePlugin *plugin;

  /**
   * Linked list of space reservations made by clients.
   */
  static struct ReservationList *reservations;

  /**
   * Bloomfilter to quickly tell if we don't have the content.
   */
  static struct GNUNET_CONTAINER_BloomFilter *filter;

  /**
   * How much space are we allowed to use (in bytes)?
   */
  static unsigned long long quota;

  /**
   * Should the database be dropped on exit?
   */
  static int do_drop;

  /**
   * How much space are we using for the cache?  (space available for
   * insertions that will be instantly reclaimed by discarding less
   * important content --- or possibly whatever we just inserted into
   * the "cache").
   */
  static unsigned long long cache_size;

  /**
   * How much space have we currently reserved?  (in bytes, including
   * the per-entry overhead of the reserved entries)
   */
  static unsigned long long reserved;

  /**
   * How much data are we currently storing
   * in the database?
   */
  static unsigned long long payload;

  /**
   * Number of updates that were made to the
   * payload value since we last synchronized
   * it with the statistics service (reset by sync_stats).
   */
  static unsigned int lastSync;

  /**
   * Did we get an answer from statistics?
   */
  static int stats_worked;

  /**
   * Identity of the task that is used to delete
   * expired content.
   */
  static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;

  /**
   * Our configuration.
   * NOTE(review): unlike the other file-scope state this is not
   * declared 'static'; confirm whether external linkage is intended.
   */
  const struct GNUNET_CONFIGURATION_Handle *cfg;

  /**
   * Handle for reporting statistics.
   */
  static struct GNUNET_STATISTICS_Handle *stats;
  157. /**
  158. * Synchronize our utilization statistics with the
  159. * statistics service.
  160. */
  161. static void
  162. sync_stats ()
  163. {
  164. GNUNET_STATISTICS_set (stats, QUOTA_STAT_NAME, payload, GNUNET_YES);
  165. lastSync = 0;
  166. }
  /**
   * Context for transmitting replies to clients.  Allocated by
   * transmit and released by transmit_callback.
   */
  struct TransmitCallbackContext
  {

    /**
     * We keep these in a doubly-linked list (for cleanup).
     */
    struct TransmitCallbackContext *next;

    /**
     * We keep these in a doubly-linked list (for cleanup).
     */
    struct TransmitCallbackContext *prev;

    /**
     * The message that we're asked to transmit (owned by this
     * context; freed together with it).
     */
    struct GNUNET_MessageHeader *msg;

    /**
     * Handle for the transmission request (NULL once the
     * transmission callback has been invoked).
     */
    struct GNUNET_CONNECTION_TransmitHandle *th;

    /**
     * Client that we are transmitting to.
     */
    struct GNUNET_SERVER_Client *client;

  };
  /**
   * Head of the doubly-linked list of pending transmissions (for cleanup).
   */
  static struct TransmitCallbackContext *tcc_head;

  /**
   * Tail of the doubly-linked list of pending transmissions (for cleanup).
   */
  static struct TransmitCallbackContext *tcc_tail;

  /**
   * Have we already cleaned up the TCCs and are hence no longer
   * willing (or able) to transmit anything to anyone?
   * (GNUNET_YES makes transmit refuse new messages.)
   */
  static int cleaning_done;

  /**
   * Handle for pending get request (to the statistics service).
   */
  static struct GNUNET_STATISTICS_GetHandle *stat_get;
  /**
   * Task that is used to remove expired entries from
   * the datastore.  This task will schedule itself
   * again automatically to always delete all expired
   * content quickly.
   *
   * Forward declaration: the iterator defined next
   * re-schedules this task and thus needs its address.
   *
   * @param cls not used
   * @param tc task context
   */
  static void
  delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
  221. /**
  222. * Iterate over the expired items stored in the datastore.
  223. * Delete all expired items; once we have processed all
  224. * expired items, re-schedule the "delete_expired" task.
  225. *
  226. * @param cls not used
  227. * @param key key for the content
  228. * @param size number of bytes in data
  229. * @param data content stored
  230. * @param type type of the content
  231. * @param priority priority of the content
  232. * @param anonymity anonymity-level for the content
  233. * @param expiration expiration time for the content
  234. * @param uid unique identifier for the datum;
  235. * maybe 0 if no unique identifier is available
  236. *
  237. * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
  238. * (continue on call to "next", of course),
  239. * GNUNET_NO to delete the item and continue (if supported)
  240. */
  241. static int
  242. expired_processor (void *cls, const GNUNET_HashCode * key, uint32_t size,
  243. const void *data, enum GNUNET_BLOCK_Type type,
  244. uint32_t priority, uint32_t anonymity,
  245. struct GNUNET_TIME_Absolute expiration, uint64_t uid)
  246. {
  247. struct GNUNET_TIME_Absolute now;
  248. if (key == NULL)
  249. {
  250. expired_kill_task =
  251. GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY, &delete_expired, NULL);
  252. return GNUNET_SYSERR;
  253. }
  254. now = GNUNET_TIME_absolute_get ();
  255. if (expiration.abs_value > now.abs_value)
  256. {
  257. /* finished processing */
  258. expired_kill_task =
  259. GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY, &delete_expired, NULL);
  260. return GNUNET_SYSERR;
  261. }
  262. #if DEBUG_DATASTORE
  263. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  264. "Deleting content `%s' of type %u that expired %llu ms ago\n",
  265. GNUNET_h2s (key), type,
  266. (unsigned long long) (now.abs_value - expiration.abs_value));
  267. #endif
  268. GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes expired"), size,
  269. GNUNET_YES);
  270. GNUNET_CONTAINER_bloomfilter_remove (filter, key);
  271. expired_kill_task =
  272. GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY, &delete_expired, NULL);
  273. return GNUNET_NO;
  274. }
  275. /**
  276. * Task that is used to remove expired entries from
  277. * the datastore. This task will schedule itself
  278. * again automatically to always delete all expired
  279. * content quickly.
  280. *
  281. * @param cls not used
  282. * @param tc task context
  283. */
  284. static void
  285. delete_expired (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  286. {
  287. expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
  288. plugin->api->get_expiration (plugin->api->cls, &expired_processor, NULL);
  289. }
  290. /**
  291. * An iterator over a set of items stored in the datastore
  292. * that deletes until we're happy with respect to our quota.
  293. *
  294. * @param cls closure
  295. * @param key key for the content
  296. * @param size number of bytes in data
  297. * @param data content stored
  298. * @param type type of the content
  299. * @param priority priority of the content
  300. * @param anonymity anonymity-level for the content
  301. * @param expiration expiration time for the content
  302. * @param uid unique identifier for the datum;
  303. * maybe 0 if no unique identifier is available
  304. *
  305. * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
  306. * (continue on call to "next", of course),
  307. * GNUNET_NO to delete the item and continue (if supported)
  308. */
  309. static int
  310. quota_processor (void *cls, const GNUNET_HashCode * key, uint32_t size,
  311. const void *data, enum GNUNET_BLOCK_Type type,
  312. uint32_t priority, uint32_t anonymity,
  313. struct GNUNET_TIME_Absolute expiration, uint64_t uid)
  314. {
  315. unsigned long long *need = cls;
  316. if (NULL == key)
  317. return GNUNET_SYSERR;
  318. #if DEBUG_DATASTORE
  319. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  320. "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
  321. (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
  322. GNUNET_h2s (key), type, *need);
  323. #endif
  324. if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
  325. *need = 0;
  326. else
  327. *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
  328. GNUNET_STATISTICS_update (stats,
  329. gettext_noop ("# bytes purged (low-priority)"),
  330. size, GNUNET_YES);
  331. GNUNET_CONTAINER_bloomfilter_remove (filter, key);
  332. return GNUNET_NO;
  333. }
  334. /**
  335. * Manage available disk space by running tasks
  336. * that will discard content if necessary. This
  337. * function will be run whenever a request for
  338. * "need" bytes of storage could only be satisfied
  339. * by eating into the "cache" (and we want our cache
  340. * space back).
  341. *
  342. * @param need number of bytes of content that were
  343. * placed into the "cache" (and hence the
  344. * number of bytes that should be removed).
  345. */
  346. static void
  347. manage_space (unsigned long long need)
  348. {
  349. unsigned long long last;
  350. #if DEBUG_DATASTORE
  351. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  352. "Asked to free up %llu bytes of cache space\n", need);
  353. #endif
  354. last = 0;
  355. while ((need > 0) && (last != need))
  356. {
  357. last = need;
  358. plugin->api->get_expiration (plugin->api->cls, &quota_processor, &need);
  359. }
  360. }
  361. /**
  362. * Function called to notify a client about the socket
  363. * begin ready to queue more data. "buf" will be
  364. * NULL and "size" zero if the socket was closed for
  365. * writing in the meantime.
  366. *
  367. * @param cls closure
  368. * @param size number of bytes available in buf
  369. * @param buf where the callee should write the message
  370. * @return number of bytes written to buf
  371. */
  372. static size_t
  373. transmit_callback (void *cls, size_t size, void *buf)
  374. {
  375. struct TransmitCallbackContext *tcc = cls;
  376. size_t msize;
  377. tcc->th = NULL;
  378. GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
  379. msize = ntohs (tcc->msg->size);
  380. if (size == 0)
  381. {
  382. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  383. _("Transmission to client failed!\n"));
  384. GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
  385. GNUNET_SERVER_client_drop (tcc->client);
  386. GNUNET_free (tcc->msg);
  387. GNUNET_free (tcc);
  388. return 0;
  389. }
  390. GNUNET_assert (size >= msize);
  391. memcpy (buf, tcc->msg, msize);
  392. GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
  393. GNUNET_SERVER_client_drop (tcc->client);
  394. GNUNET_free (tcc->msg);
  395. GNUNET_free (tcc);
  396. return msize;
  397. }
  398. /**
  399. * Transmit the given message to the client.
  400. *
  401. * @param client target of the message
  402. * @param msg message to transmit, will be freed!
  403. */
  404. static void
  405. transmit (struct GNUNET_SERVER_Client *client, struct GNUNET_MessageHeader *msg)
  406. {
  407. struct TransmitCallbackContext *tcc;
  408. if (GNUNET_YES == cleaning_done)
  409. {
  410. #if DEBUG_DATASTORE
  411. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  412. "Shutdown in progress, aborting transmission.\n");
  413. #endif
  414. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  415. GNUNET_free (msg);
  416. return;
  417. }
  418. tcc = GNUNET_malloc (sizeof (struct TransmitCallbackContext));
  419. tcc->msg = msg;
  420. tcc->client = client;
  421. if (NULL ==
  422. (tcc->th =
  423. GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
  424. GNUNET_TIME_UNIT_FOREVER_REL,
  425. &transmit_callback, tcc)))
  426. {
  427. GNUNET_break (0);
  428. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  429. GNUNET_free (msg);
  430. GNUNET_free (tcc);
  431. return;
  432. }
  433. GNUNET_SERVER_client_keep (client);
  434. GNUNET_CONTAINER_DLL_insert (tcc_head, tcc_tail, tcc);
  435. }
  436. /**
  437. * Transmit a status code to the client.
  438. *
  439. * @param client receiver of the response
  440. * @param code status code
  441. * @param msg optional error message (can be NULL)
  442. */
  443. static void
  444. transmit_status (struct GNUNET_SERVER_Client *client, int code, const char *msg)
  445. {
  446. struct StatusMessage *sm;
  447. size_t slen;
  448. #if DEBUG_DATASTORE
  449. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  450. "Transmitting `%s' message with value %d and message `%s'\n",
  451. "STATUS", code, msg != NULL ? msg : "(none)");
  452. #endif
  453. slen = (msg == NULL) ? 0 : strlen (msg) + 1;
  454. sm = GNUNET_malloc (sizeof (struct StatusMessage) + slen);
  455. sm->header.size = htons (sizeof (struct StatusMessage) + slen);
  456. sm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
  457. sm->status = htonl (code);
  458. if (slen > 0)
  459. memcpy (&sm[1], msg, slen);
  460. transmit (client, &sm->header);
  461. }
  462. /**
  463. * Function that will transmit the given datastore entry
  464. * to the client.
  465. *
  466. * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
  467. * @param key key for the content
  468. * @param size number of bytes in data
  469. * @param data content stored
  470. * @param type type of the content
  471. * @param priority priority of the content
  472. * @param anonymity anonymity-level for the content
  473. * @param expiration expiration time for the content
  474. * @param uid unique identifier for the datum;
  475. * maybe 0 if no unique identifier is available
  476. *
  477. * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
  478. * GNUNET_NO to delete the item and continue (if supported)
  479. */
  480. static int
  481. transmit_item (void *cls, const GNUNET_HashCode * key, uint32_t size,
  482. const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
  483. uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
  484. uint64_t uid)
  485. {
  486. struct GNUNET_SERVER_Client *client = cls;
  487. struct GNUNET_MessageHeader *end;
  488. struct DataMessage *dm;
  489. if (key == NULL)
  490. {
  491. /* transmit 'DATA_END' */
  492. #if DEBUG_DATASTORE
  493. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' message\n",
  494. "DATA_END");
  495. #endif
  496. end = GNUNET_malloc (sizeof (struct GNUNET_MessageHeader));
  497. end->size = htons (sizeof (struct GNUNET_MessageHeader));
  498. end->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
  499. transmit (client, end);
  500. GNUNET_SERVER_client_drop (client);
  501. return GNUNET_OK;
  502. }
  503. GNUNET_assert (sizeof (struct DataMessage) + size <
  504. GNUNET_SERVER_MAX_MESSAGE_SIZE);
  505. dm = GNUNET_malloc (sizeof (struct DataMessage) + size);
  506. dm->header.size = htons (sizeof (struct DataMessage) + size);
  507. dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
  508. dm->rid = htonl (0);
  509. dm->size = htonl (size);
  510. dm->type = htonl (type);
  511. dm->priority = htonl (priority);
  512. dm->anonymity = htonl (anonymity);
  513. dm->replication = htonl (0);
  514. dm->reserved = htonl (0);
  515. dm->expiration = GNUNET_TIME_absolute_hton (expiration);
  516. dm->uid = GNUNET_htonll (uid);
  517. dm->key = *key;
  518. memcpy (&dm[1], data, size);
  519. #if DEBUG_DATASTORE
  520. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  521. "Transmitting `%s' message for `%s' of type %u with expiration %llu (now: %llu)\n",
  522. "DATA", GNUNET_h2s (key), type,
  523. (unsigned long long) expiration.abs_value,
  524. (unsigned long long) GNUNET_TIME_absolute_get ().abs_value);
  525. #endif
  526. GNUNET_STATISTICS_update (stats, gettext_noop ("# results found"), 1,
  527. GNUNET_NO);
  528. transmit (client, &dm->header);
  529. GNUNET_SERVER_client_drop (client);
  530. return GNUNET_OK;
  531. }
  532. /**
  533. * Handle RESERVE-message.
  534. *
  535. * @param cls closure
  536. * @param client identification of the client
  537. * @param message the actual message
  538. */
  539. static void
  540. handle_reserve (void *cls, struct GNUNET_SERVER_Client *client,
  541. const struct GNUNET_MessageHeader *message)
  542. {
  543. /**
  544. * Static counter to produce reservation identifiers.
  545. */
  546. static int reservation_gen;
  547. const struct ReserveMessage *msg = (const struct ReserveMessage *) message;
  548. struct ReservationList *e;
  549. unsigned long long used;
  550. unsigned long long req;
  551. uint64_t amount;
  552. uint32_t entries;
  553. #if DEBUG_DATASTORE
  554. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "RESERVE");
  555. #endif
  556. amount = GNUNET_ntohll (msg->amount);
  557. entries = ntohl (msg->entries);
  558. used = payload + reserved;
  559. req =
  560. amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
  561. if (used + req > quota)
  562. {
  563. if (quota < used)
  564. used = quota; /* cheat a bit for error message (to avoid negative numbers) */
  565. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  566. _
  567. ("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
  568. quota - used, "RESERVE", req);
  569. if (cache_size < req)
  570. {
  571. /* TODO: document this in the FAQ; essentially, if this
  572. * message happens, the insertion request could be blocked
  573. * by less-important content from migration because it is
  574. * larger than 1/8th of the overall available space, and
  575. * we only reserve 1/8th for "fresh" insertions */
  576. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  577. _
  578. ("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
  579. req, cache_size);
  580. transmit_status (client, 0,
  581. gettext_noop
  582. ("Insufficient space to satisfy request and "
  583. "requested amount is larger than cache size"));
  584. }
  585. else
  586. {
  587. transmit_status (client, 0,
  588. gettext_noop ("Insufficient space to satisfy request"));
  589. }
  590. return;
  591. }
  592. reserved += req;
  593. GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
  594. GNUNET_NO);
  595. e = GNUNET_malloc (sizeof (struct ReservationList));
  596. e->next = reservations;
  597. reservations = e;
  598. e->client = client;
  599. e->amount = amount;
  600. e->entries = entries;
  601. e->rid = ++reservation_gen;
  602. if (reservation_gen < 0)
  603. reservation_gen = 0; /* wrap around */
  604. transmit_status (client, e->rid, NULL);
  605. }
  606. /**
  607. * Handle RELEASE_RESERVE-message.
  608. *
  609. * @param cls closure
  610. * @param client identification of the client
  611. * @param message the actual message
  612. */
  613. static void
  614. handle_release_reserve (void *cls, struct GNUNET_SERVER_Client *client,
  615. const struct GNUNET_MessageHeader *message)
  616. {
  617. const struct ReleaseReserveMessage *msg =
  618. (const struct ReleaseReserveMessage *) message;
  619. struct ReservationList *pos;
  620. struct ReservationList *prev;
  621. struct ReservationList *next;
  622. int rid = ntohl (msg->rid);
  623. unsigned long long rem;
  624. #if DEBUG_DATASTORE
  625. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
  626. "RELEASE_RESERVE");
  627. #endif
  628. next = reservations;
  629. prev = NULL;
  630. while (NULL != (pos = next))
  631. {
  632. next = pos->next;
  633. if (rid == pos->rid)
  634. {
  635. if (prev == NULL)
  636. reservations = next;
  637. else
  638. prev->next = next;
  639. rem =
  640. pos->amount +
  641. ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
  642. GNUNET_assert (reserved >= rem);
  643. reserved -= rem;
  644. GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
  645. GNUNET_NO);
  646. #if DEBUG_DATASTORE
  647. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  648. "Returning %llu remaining reserved bytes to storage pool\n",
  649. rem);
  650. #endif
  651. GNUNET_free (pos);
  652. transmit_status (client, GNUNET_OK, NULL);
  653. return;
  654. }
  655. prev = pos;
  656. }
  657. GNUNET_break (0);
  658. transmit_status (client, GNUNET_SYSERR,
  659. gettext_noop ("Could not find matching reservation"));
  660. }
  661. /**
  662. * Check that the given message is a valid data message.
  663. *
  664. * @return NULL if the message is not well-formed, otherwise the message
  665. */
  666. static const struct DataMessage *
  667. check_data (const struct GNUNET_MessageHeader *message)
  668. {
  669. uint16_t size;
  670. uint32_t dsize;
  671. const struct DataMessage *dm;
  672. size = ntohs (message->size);
  673. if (size < sizeof (struct DataMessage))
  674. {
  675. GNUNET_break (0);
  676. return NULL;
  677. }
  678. dm = (const struct DataMessage *) message;
  679. dsize = ntohl (dm->size);
  680. if (size != dsize + sizeof (struct DataMessage))
  681. {
  682. GNUNET_break (0);
  683. return NULL;
  684. }
  685. return dm;
  686. }
  /**
   * Context for a PUT request used to see if the content is
   * already present.  Allocated by handle_put with a copy of the
   * PUT message directly behind it; freed by check_present.
   */
  struct PutContext
  {
    /**
     * Client to notify on completion.
     */
    struct GNUNET_SERVER_Client *client;

  #if ! HAVE_UNALIGNED_64_ACCESS
    /* NOTE(review): not accessed in the code visible here; presumably
     * padding that keeps the 'struct DataMessage' that follows
     * suitably aligned for 64-bit accesses -- confirm */
    void *reserved;
  #endif

    /* followed by the 'struct DataMessage' */
  };
  702. /**
  703. * Actually put the data message.
  704. */
  705. static void
  706. execute_put (struct GNUNET_SERVER_Client *client, const struct DataMessage *dm)
  707. {
  708. uint32_t size;
  709. char *msg;
  710. int ret;
  711. size = ntohl (dm->size);
  712. msg = NULL;
  713. ret =
  714. plugin->api->put (plugin->api->cls, &dm->key, size, &dm[1],
  715. ntohl (dm->type), ntohl (dm->priority),
  716. ntohl (dm->anonymity), ntohl (dm->replication),
  717. GNUNET_TIME_absolute_ntoh (dm->expiration), &msg);
  718. if (GNUNET_OK == ret)
  719. {
  720. GNUNET_STATISTICS_update (stats, gettext_noop ("# bytes stored"), size,
  721. GNUNET_YES);
  722. GNUNET_CONTAINER_bloomfilter_add (filter, &dm->key);
  723. #if DEBUG_DATASTORE
  724. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  725. "Successfully stored %u bytes of type %u under key `%s'\n",
  726. size, ntohl (dm->type), GNUNET_h2s (&dm->key));
  727. #endif
  728. }
  729. transmit_status (client, ret, msg);
  730. GNUNET_free_non_null (msg);
  731. if (quota - reserved - cache_size < payload)
  732. {
  733. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  734. _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
  735. (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
  736. (unsigned long long) (quota - reserved - cache_size),
  737. (unsigned long long) payload);
  738. manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
  739. }
  740. }
  741. /**
  742. * Function that will check if the given datastore entry
  743. * matches the put and if none match executes the put.
  744. *
  745. * @param cls closure, pointer to the client (of type 'struct PutContext').
  746. * @param key key for the content
  747. * @param size number of bytes in data
  748. * @param data content stored
  749. * @param type type of the content
  750. * @param priority priority of the content
  751. * @param anonymity anonymity-level for the content
  752. * @param expiration expiration time for the content
  753. * @param uid unique identifier for the datum;
  754. * maybe 0 if no unique identifier is available
  755. *
  756. * @return GNUNET_OK usually
  757. * GNUNET_NO to delete the item
  758. */
  759. static int
  760. check_present (void *cls, const GNUNET_HashCode * key, uint32_t size,
  761. const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
  762. uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
  763. uint64_t uid)
  764. {
  765. struct PutContext *pc = cls;
  766. const struct DataMessage *dm;
  767. dm = (const struct DataMessage *) &pc[1];
  768. if (key == NULL)
  769. {
  770. execute_put (pc->client, dm);
  771. GNUNET_SERVER_client_drop (pc->client);
  772. GNUNET_free (pc);
  773. return GNUNET_OK;
  774. }
  775. if ((GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
  776. (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) || ((size == ntohl (dm->size)) &&
  777. (0 ==
  778. memcmp (&dm[1], data, size))))
  779. {
  780. #if DEBUG_MYSQL
  781. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  782. "Result already present in datastore\n");
  783. #endif
  784. /* FIXME: change API to allow increasing 'replication' counter */
  785. if ((ntohl (dm->priority) > 0) ||
  786. (GNUNET_TIME_absolute_ntoh (dm->expiration).abs_value >
  787. expiration.abs_value))
  788. plugin->api->update (plugin->api->cls, uid,
  789. (int32_t) ntohl (dm->priority),
  790. GNUNET_TIME_absolute_ntoh (dm->expiration), NULL);
  791. transmit_status (pc->client, GNUNET_NO, NULL);
  792. GNUNET_SERVER_client_drop (pc->client);
  793. GNUNET_free (pc);
  794. }
  795. else
  796. {
  797. execute_put (pc->client, dm);
  798. GNUNET_SERVER_client_drop (pc->client);
  799. GNUNET_free (pc);
  800. }
  801. return GNUNET_OK;
  802. }
  803. /**
  804. * Handle PUT-message.
  805. *
  806. * @param cls closure
  807. * @param client identification of the client
  808. * @param message the actual message
  809. */
  810. static void
  811. handle_put (void *cls, struct GNUNET_SERVER_Client *client,
  812. const struct GNUNET_MessageHeader *message)
  813. {
  814. const struct DataMessage *dm = check_data (message);
  815. int rid;
  816. struct ReservationList *pos;
  817. struct PutContext *pc;
  818. GNUNET_HashCode vhash;
  819. uint32_t size;
  820. if ((dm == NULL) || (ntohl (dm->type) == 0))
  821. {
  822. GNUNET_break (0);
  823. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  824. return;
  825. }
  826. #if DEBUG_DATASTORE
  827. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  828. "Processing `%s' request for `%s' of type %u\n", "PUT",
  829. GNUNET_h2s (&dm->key), ntohl (dm->type));
  830. #endif
  831. rid = ntohl (dm->rid);
  832. size = ntohl (dm->size);
  833. if (rid > 0)
  834. {
  835. pos = reservations;
  836. while ((NULL != pos) && (rid != pos->rid))
  837. pos = pos->next;
  838. GNUNET_break (pos != NULL);
  839. if (NULL != pos)
  840. {
  841. GNUNET_break (pos->entries > 0);
  842. GNUNET_break (pos->amount >= size);
  843. pos->entries--;
  844. pos->amount -= size;
  845. reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
  846. GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
  847. GNUNET_NO);
  848. }
  849. }
  850. if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter, &dm->key))
  851. {
  852. GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
  853. pc = GNUNET_malloc (sizeof (struct PutContext) + size +
  854. sizeof (struct DataMessage));
  855. pc->client = client;
  856. GNUNET_SERVER_client_keep (client);
  857. memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
  858. plugin->api->get_key (plugin->api->cls, 0, &dm->key, &vhash,
  859. ntohl (dm->type), &check_present, pc);
  860. return;
  861. }
  862. execute_put (client, dm);
  863. }
  864. /**
  865. * Handle GET-message.
  866. *
  867. * @param cls closure
  868. * @param client identification of the client
  869. * @param message the actual message
  870. */
  871. static void
  872. handle_get (void *cls, struct GNUNET_SERVER_Client *client,
  873. const struct GNUNET_MessageHeader *message)
  874. {
  875. const struct GetMessage *msg;
  876. uint16_t size;
  877. size = ntohs (message->size);
  878. if ((size != sizeof (struct GetMessage)) &&
  879. (size != sizeof (struct GetMessage) - sizeof (GNUNET_HashCode)))
  880. {
  881. GNUNET_break (0);
  882. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  883. return;
  884. }
  885. msg = (const struct GetMessage *) message;
  886. #if DEBUG_DATASTORE
  887. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  888. "Processing `%s' request for `%s' of type %u\n", "GET",
  889. GNUNET_h2s (&msg->key), ntohl (msg->type));
  890. #endif
  891. GNUNET_STATISTICS_update (stats, gettext_noop ("# GET requests received"), 1,
  892. GNUNET_NO);
  893. GNUNET_SERVER_client_keep (client);
  894. if ((size == sizeof (struct GetMessage)) &&
  895. (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, &msg->key)))
  896. {
  897. /* don't bother database... */
  898. #if DEBUG_DATASTORE
  899. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  900. "Empty result set for `%s' request for `%s' (bloomfilter).\n",
  901. "GET", GNUNET_h2s (&msg->key));
  902. #endif
  903. GNUNET_STATISTICS_update (stats,
  904. gettext_noop
  905. ("# requests filtered by bloomfilter"), 1,
  906. GNUNET_NO);
  907. transmit_item (client, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
  908. 0);
  909. return;
  910. }
  911. plugin->api->get_key (plugin->api->cls, GNUNET_ntohll (msg->offset),
  912. ((size ==
  913. sizeof (struct GetMessage)) ? &msg->key : NULL), NULL,
  914. ntohl (msg->type), &transmit_item, client);
  915. }
  916. /**
  917. * Handle UPDATE-message.
  918. *
  919. * @param cls closure
  920. * @param client identification of the client
  921. * @param message the actual message
  922. */
  923. static void
  924. handle_update (void *cls, struct GNUNET_SERVER_Client *client,
  925. const struct GNUNET_MessageHeader *message)
  926. {
  927. const struct UpdateMessage *msg;
  928. int ret;
  929. char *emsg;
  930. GNUNET_STATISTICS_update (stats, gettext_noop ("# UPDATE requests received"),
  931. 1, GNUNET_NO);
  932. msg = (const struct UpdateMessage *) message;
  933. emsg = NULL;
  934. #if DEBUG_DATASTORE
  935. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request for %llu\n",
  936. "UPDATE", (unsigned long long) GNUNET_ntohll (msg->uid));
  937. #endif
  938. ret =
  939. plugin->api->update (plugin->api->cls, GNUNET_ntohll (msg->uid),
  940. (int32_t) ntohl (msg->priority),
  941. GNUNET_TIME_absolute_ntoh (msg->expiration), &emsg);
  942. transmit_status (client, ret, emsg);
  943. GNUNET_free_non_null (emsg);
  944. }
  945. /**
  946. * Handle GET_REPLICATION-message.
  947. *
  948. * @param cls closure
  949. * @param client identification of the client
  950. * @param message the actual message
  951. */
  952. static void
  953. handle_get_replication (void *cls, struct GNUNET_SERVER_Client *client,
  954. const struct GNUNET_MessageHeader *message)
  955. {
  956. #if DEBUG_DATASTORE
  957. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
  958. "GET_REPLICATION");
  959. #endif
  960. GNUNET_STATISTICS_update (stats,
  961. gettext_noop
  962. ("# GET REPLICATION requests received"), 1,
  963. GNUNET_NO);
  964. GNUNET_SERVER_client_keep (client);
  965. plugin->api->get_replication (plugin->api->cls, &transmit_item, client);
  966. }
  967. /**
  968. * Handle GET_ZERO_ANONYMITY-message.
  969. *
  970. * @param cls closure
  971. * @param client identification of the client
  972. * @param message the actual message
  973. */
  974. static void
  975. handle_get_zero_anonymity (void *cls, struct GNUNET_SERVER_Client *client,
  976. const struct GNUNET_MessageHeader *message)
  977. {
  978. const struct GetZeroAnonymityMessage *msg =
  979. (const struct GetZeroAnonymityMessage *) message;
  980. enum GNUNET_BLOCK_Type type;
  981. type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
  982. if (type == GNUNET_BLOCK_TYPE_ANY)
  983. {
  984. GNUNET_break (0);
  985. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  986. return;
  987. }
  988. #if DEBUG_DATASTORE
  989. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n",
  990. "GET_ZERO_ANONYMITY");
  991. #endif
  992. GNUNET_STATISTICS_update (stats,
  993. gettext_noop
  994. ("# GET ZERO ANONYMITY requests received"), 1,
  995. GNUNET_NO);
  996. GNUNET_SERVER_client_keep (client);
  997. plugin->api->get_zero_anonymity (plugin->api->cls,
  998. GNUNET_ntohll (msg->offset), type,
  999. &transmit_item, client);
  1000. }
  1001. /**
  1002. * Callback function that will cause the item that is passed
  1003. * in to be deleted (by returning GNUNET_NO).
  1004. */
  1005. static int
  1006. remove_callback (void *cls, const GNUNET_HashCode * key, uint32_t size,
  1007. const void *data, enum GNUNET_BLOCK_Type type,
  1008. uint32_t priority, uint32_t anonymity,
  1009. struct GNUNET_TIME_Absolute expiration, uint64_t uid)
  1010. {
  1011. struct GNUNET_SERVER_Client *client = cls;
  1012. if (key == NULL)
  1013. {
  1014. #if DEBUG_DATASTORE
  1015. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1016. "No further matches for `%s' request.\n", "REMOVE");
  1017. #endif
  1018. transmit_status (client, GNUNET_NO, _("Content not found"));
  1019. GNUNET_SERVER_client_drop (client);
  1020. return GNUNET_OK; /* last item */
  1021. }
  1022. #if DEBUG_DATASTORE
  1023. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1024. "Item %llu matches `%s' request for key `%s' and type %u.\n",
  1025. (unsigned long long) uid, "REMOVE", GNUNET_h2s (key), type);
  1026. #endif
  1027. GNUNET_STATISTICS_update (stats,
  1028. gettext_noop ("# bytes removed (explicit request)"),
  1029. size, GNUNET_YES);
  1030. GNUNET_CONTAINER_bloomfilter_remove (filter, key);
  1031. transmit_status (client, GNUNET_OK, NULL);
  1032. GNUNET_SERVER_client_drop (client);
  1033. return GNUNET_NO;
  1034. }
  1035. /**
  1036. * Handle REMOVE-message.
  1037. *
  1038. * @param cls closure
  1039. * @param client identification of the client
  1040. * @param message the actual message
  1041. */
  1042. static void
  1043. handle_remove (void *cls, struct GNUNET_SERVER_Client *client,
  1044. const struct GNUNET_MessageHeader *message)
  1045. {
  1046. const struct DataMessage *dm = check_data (message);
  1047. GNUNET_HashCode vhash;
  1048. if (dm == NULL)
  1049. {
  1050. GNUNET_break (0);
  1051. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  1052. return;
  1053. }
  1054. #if DEBUG_DATASTORE
  1055. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1056. "Processing `%s' request for `%s' of type %u\n", "REMOVE",
  1057. GNUNET_h2s (&dm->key), ntohl (dm->type));
  1058. #endif
  1059. GNUNET_STATISTICS_update (stats, gettext_noop ("# REMOVE requests received"),
  1060. 1, GNUNET_NO);
  1061. GNUNET_SERVER_client_keep (client);
  1062. GNUNET_CRYPTO_hash (&dm[1], ntohl (dm->size), &vhash);
  1063. plugin->api->get_key (plugin->api->cls, 0, &dm->key, &vhash,
  1064. (enum GNUNET_BLOCK_Type) ntohl (dm->type),
  1065. &remove_callback, client);
  1066. }
  1067. /**
  1068. * Handle DROP-message.
  1069. *
  1070. * @param cls closure
  1071. * @param client identification of the client
  1072. * @param message the actual message
  1073. */
  1074. static void
  1075. handle_drop (void *cls, struct GNUNET_SERVER_Client *client,
  1076. const struct GNUNET_MessageHeader *message)
  1077. {
  1078. #if DEBUG_DATASTORE
  1079. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing `%s' request\n", "DROP");
  1080. #endif
  1081. do_drop = GNUNET_YES;
  1082. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  1083. }
  1084. /**
  1085. * Function called by plugins to notify us about a
  1086. * change in their disk utilization.
  1087. *
  1088. * @param cls closure (NULL)
  1089. * @param delta change in disk utilization,
  1090. * 0 for "reset to empty"
  1091. */
  1092. static void
  1093. disk_utilization_change_cb (void *cls, int delta)
  1094. {
  1095. if ((delta < 0) && (payload < -delta))
  1096. {
  1097. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1098. _
  1099. ("Datastore payload inaccurate (%lld < %lld). Trying to fix.\n"),
  1100. (long long) payload, (long long) -delta);
  1101. payload = plugin->api->estimate_size (plugin->api->cls);
  1102. sync_stats ();
  1103. return;
  1104. }
  1105. payload += delta;
  1106. lastSync++;
  1107. if (lastSync >= MAX_STAT_SYNC_LAG)
  1108. sync_stats ();
  1109. }
  1110. /**
  1111. * Callback function to process statistic values.
  1112. *
  1113. * @param cls closure (struct Plugin*)
  1114. * @param subsystem name of subsystem that created the statistic
  1115. * @param name the name of the datum
  1116. * @param value the current value
  1117. * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
  1118. * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
  1119. */
  1120. static int
  1121. process_stat_in (void *cls, const char *subsystem, const char *name,
  1122. uint64_t value, int is_persistent)
  1123. {
  1124. GNUNET_assert (stats_worked == GNUNET_NO);
  1125. stats_worked = GNUNET_YES;
  1126. payload += value;
  1127. #if DEBUG_SQLITE
  1128. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1129. "Notification from statistics about existing payload (%llu), new payload is %llu\n",
  1130. abs_value, payload);
  1131. #endif
  1132. return GNUNET_OK;
  1133. }
  1134. static void
  1135. process_stat_done (void *cls, int success)
  1136. {
  1137. struct DatastorePlugin *plugin = cls;
  1138. stat_get = NULL;
  1139. if (stats_worked == GNUNET_NO)
  1140. payload = plugin->api->estimate_size (plugin->api->cls);
  1141. }
  1142. /**
  1143. * Load the datastore plugin.
  1144. */
  1145. static struct DatastorePlugin *
  1146. load_plugin ()
  1147. {
  1148. struct DatastorePlugin *ret;
  1149. char *libname;
  1150. char *name;
  1151. if (GNUNET_OK !=
  1152. GNUNET_CONFIGURATION_get_value_string (cfg, "DATASTORE", "DATABASE",
  1153. &name))
  1154. {
  1155. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1156. _("No `%s' specified for `%s' in configuration!\n"), "DATABASE",
  1157. "DATASTORE");
  1158. return NULL;
  1159. }
  1160. ret = GNUNET_malloc (sizeof (struct DatastorePlugin));
  1161. ret->env.cfg = cfg;
  1162. ret->env.duc = &disk_utilization_change_cb;
  1163. ret->env.cls = NULL;
  1164. GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Loading `%s' datastore plugin\n"),
  1165. name);
  1166. GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
  1167. ret->short_name = name;
  1168. ret->lib_name = libname;
  1169. ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
  1170. if (ret->api == NULL)
  1171. {
  1172. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1173. _("Failed to load datastore plugin for `%s'\n"), name);
  1174. GNUNET_free (ret->short_name);
  1175. GNUNET_free (libname);
  1176. GNUNET_free (ret);
  1177. return NULL;
  1178. }
  1179. return ret;
  1180. }
  1181. /**
  1182. * Function called when the service shuts
  1183. * down. Unloads our datastore plugin.
  1184. *
  1185. * @param plug plugin to unload
  1186. */
  1187. static void
  1188. unload_plugin (struct DatastorePlugin *plug)
  1189. {
  1190. #if DEBUG_DATASTORE
  1191. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1192. "Datastore service is unloading plugin...\n");
  1193. #endif
  1194. GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
  1195. GNUNET_free (plug->lib_name);
  1196. GNUNET_free (plug->short_name);
  1197. GNUNET_free (plug);
  1198. }
  1199. /**
  1200. * Final task run after shutdown. Unloads plugins and disconnects us from
  1201. * statistics.
  1202. */
  1203. static void
  1204. unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  1205. {
  1206. if (GNUNET_YES == do_drop)
  1207. plugin->api->drop (plugin->api->cls);
  1208. unload_plugin (plugin);
  1209. plugin = NULL;
  1210. if (filter != NULL)
  1211. {
  1212. GNUNET_CONTAINER_bloomfilter_free (filter);
  1213. filter = NULL;
  1214. }
  1215. if (lastSync > 0)
  1216. sync_stats ();
  1217. if (stat_get != NULL)
  1218. {
  1219. GNUNET_STATISTICS_get_cancel (stat_get);
  1220. stat_get = NULL;
  1221. }
  1222. if (stats != NULL)
  1223. {
  1224. GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
  1225. stats = NULL;
  1226. }
  1227. }
  1228. /**
  1229. * Last task run during shutdown. Disconnects us from
  1230. * the transport and core.
  1231. */
  1232. static void
  1233. cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  1234. {
  1235. struct TransmitCallbackContext *tcc;
  1236. cleaning_done = GNUNET_YES;
  1237. while (NULL != (tcc = tcc_head))
  1238. {
  1239. GNUNET_CONTAINER_DLL_remove (tcc_head, tcc_tail, tcc);
  1240. if (tcc->th != NULL)
  1241. {
  1242. GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
  1243. GNUNET_SERVER_client_drop (tcc->client);
  1244. }
  1245. GNUNET_free (tcc->msg);
  1246. GNUNET_free (tcc);
  1247. }
  1248. if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
  1249. {
  1250. GNUNET_SCHEDULER_cancel (expired_kill_task);
  1251. expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
  1252. }
  1253. GNUNET_SCHEDULER_add_continuation (&unload_task, NULL,
  1254. GNUNET_SCHEDULER_REASON_PREREQ_DONE);
  1255. }
  1256. /**
  1257. * Function that removes all active reservations made
  1258. * by the given client and releases the space for other
  1259. * requests.
  1260. *
  1261. * @param cls closure
  1262. * @param client identification of the client
  1263. */
  1264. static void
  1265. cleanup_reservations (void *cls, struct GNUNET_SERVER_Client *client)
  1266. {
  1267. struct ReservationList *pos;
  1268. struct ReservationList *prev;
  1269. struct ReservationList *next;
  1270. if (client == NULL)
  1271. return;
  1272. prev = NULL;
  1273. pos = reservations;
  1274. while (NULL != pos)
  1275. {
  1276. next = pos->next;
  1277. if (pos->client == client)
  1278. {
  1279. if (prev == NULL)
  1280. reservations = next;
  1281. else
  1282. prev->next = next;
  1283. reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
  1284. GNUNET_free (pos);
  1285. }
  1286. else
  1287. {
  1288. prev = pos;
  1289. }
  1290. pos = next;
  1291. }
  1292. GNUNET_STATISTICS_set (stats, gettext_noop ("# reserved"), reserved,
  1293. GNUNET_NO);
  1294. }
  1295. /**
  1296. * Process datastore requests.
  1297. *
  1298. * @param cls closure
  1299. * @param server the initialized server
  1300. * @param c configuration to use
  1301. */
  1302. static void
  1303. run (void *cls, struct GNUNET_SERVER_Handle *server,
  1304. const struct GNUNET_CONFIGURATION_Handle *c)
  1305. {
  1306. static const struct GNUNET_SERVER_MessageHandler handlers[] = {
  1307. {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE,
  1308. sizeof (struct ReserveMessage)},
  1309. {&handle_release_reserve, NULL,
  1310. GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE,
  1311. sizeof (struct ReleaseReserveMessage)},
  1312. {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0},
  1313. {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE,
  1314. sizeof (struct UpdateMessage)},
  1315. {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0},
  1316. {&handle_get_replication, NULL,
  1317. GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION,
  1318. sizeof (struct GNUNET_MessageHeader)},
  1319. {&handle_get_zero_anonymity, NULL,
  1320. GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY,
  1321. sizeof (struct GetZeroAnonymityMessage)},
  1322. {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0},
  1323. {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP,
  1324. sizeof (struct GNUNET_MessageHeader)},
  1325. {NULL, NULL, 0, 0}
  1326. };
  1327. char *fn;
  1328. unsigned int bf_size;
  1329. cfg = c;
  1330. if (GNUNET_OK !=
  1331. GNUNET_CONFIGURATION_get_value_number (cfg, "DATASTORE", "QUOTA", &quota))
  1332. {
  1333. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1334. _("No `%s' specified for `%s' in configuration!\n"), "QUOTA",
  1335. "DATASTORE");
  1336. return;
  1337. }
  1338. stats = GNUNET_STATISTICS_create ("datastore", cfg);
  1339. GNUNET_STATISTICS_set (stats, gettext_noop ("# quota"), quota, GNUNET_NO);
  1340. cache_size = quota / 8; /* Or should we make this an option? */
  1341. GNUNET_STATISTICS_set (stats, gettext_noop ("# cache size"), cache_size,
  1342. GNUNET_NO);
  1343. bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
  1344. fn = NULL;
  1345. if ((GNUNET_OK !=
  1346. GNUNET_CONFIGURATION_get_value_filename (cfg, "DATASTORE", "BLOOMFILTER",
  1347. &fn)) ||
  1348. (GNUNET_OK != GNUNET_DISK_directory_create_for_file (fn)))
  1349. {
  1350. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1351. _("Could not use specified filename `%s' for bloomfilter.\n"),
  1352. fn != NULL ? fn : "");
  1353. GNUNET_free_non_null (fn);
  1354. fn = NULL;
  1355. }
  1356. if (fn != NULL)
  1357. filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5); /* approx. 3% false positives at max use */
  1358. else
  1359. filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5); /* approx. 3% false positives at max use */
  1360. GNUNET_free_non_null (fn);
  1361. if (filter == NULL)
  1362. {
  1363. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1364. _("Failed to initialize bloomfilter.\n"));
  1365. if (stats != NULL)
  1366. {
  1367. GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
  1368. stats = NULL;
  1369. }
  1370. return;
  1371. }
  1372. plugin = load_plugin ();
  1373. if (NULL == plugin)
  1374. {
  1375. GNUNET_CONTAINER_bloomfilter_free (filter);
  1376. filter = NULL;
  1377. if (stats != NULL)
  1378. {
  1379. GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
  1380. stats = NULL;
  1381. }
  1382. return;
  1383. }
  1384. stat_get =
  1385. GNUNET_STATISTICS_get (stats, "datastore", QUOTA_STAT_NAME,
  1386. GNUNET_TIME_UNIT_SECONDS, &process_stat_done,
  1387. &process_stat_in, plugin);
  1388. GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
  1389. GNUNET_SERVER_add_handlers (server, handlers);
  1390. expired_kill_task =
  1391. GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
  1392. &delete_expired, NULL);
  1393. GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleaning_task,
  1394. NULL);
  1395. }
  1396. /**
  1397. * The main function for the datastore service.
  1398. *
  1399. * @param argc number of arguments from the command line
  1400. * @param argv command line arguments
  1401. * @return 0 ok, 1 on error
  1402. */
  1403. int
  1404. main (int argc, char *const *argv)
  1405. {
  1406. int ret;
  1407. ret =
  1408. (GNUNET_OK ==
  1409. GNUNET_SERVICE_run (argc, argv, "datastore", GNUNET_SERVICE_OPTION_NONE,
  1410. &run, NULL)) ? 0 : 1;
  1411. return ret;
  1412. }
  1413. /* end of gnunet-service-datastore.c */