123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488 |
- /*
- This file is part of GNUnet
- (C) 2004-2013 Christian Grothoff (and other contributing authors)
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
- */
- /**
- * @file datastore/datastore_api.c
- * @brief Management for the datastore for files stored on a GNUnet node. Implements
- * a priority queue for requests (with timeouts).
- * @author Christian Grothoff
- */
- #include "platform.h"
- #include "gnunet_arm_service.h"
- #include "gnunet_constants.h"
- #include "gnunet_datastore_service.h"
- #include "gnunet_statistics_service.h"
- #include "datastore.h"
- #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
- /**
- * Collect an instane number of statistics? May cause excessive IPC.
- */
- #define INSANE_STATISTICS GNUNET_NO
- /**
- * If a client stopped asking for more results, how many more do
- * we receive from the DB before killing the connection? Trade-off
- * between re-doing TCP handshakes and (needlessly) receiving
- * useless results.
- */
- #define MAX_EXCESS_RESULTS 8
- /**
- * Context for processing status messages.
- */
- struct StatusContext
- {
- /**
- * Continuation to call with the status.
- */
- GNUNET_DATASTORE_ContinuationWithStatus cont;
- /**
- * Closure for cont.
- */
- void *cont_cls;
- };
- /**
- * Context for processing result messages.
- */
- struct ResultContext
- {
- /**
- * Function to call with the result.
- */
- GNUNET_DATASTORE_DatumProcessor proc;
- /**
- * Closure for proc.
- */
- void *proc_cls;
- };
- /**
- * Context for a queue operation.
- */
- union QueueContext
- {
- struct StatusContext sc;
- struct ResultContext rc;
- };
- /**
- * Entry in our priority queue.
- */
- struct GNUNET_DATASTORE_QueueEntry
- {
- /**
- * This is a linked list.
- */
- struct GNUNET_DATASTORE_QueueEntry *next;
- /**
- * This is a linked list.
- */
- struct GNUNET_DATASTORE_QueueEntry *prev;
- /**
- * Handle to the master context.
- */
- struct GNUNET_DATASTORE_Handle *h;
- /**
- * Response processor (NULL if we are not waiting for a response).
- * This struct should be used for the closure, function-specific
- * arguments can be passed via 'qc'.
- */
- GNUNET_CLIENT_MessageHandler response_proc;
- /**
- * Function to call after transmission of the request.
- */
- GNUNET_DATASTORE_ContinuationWithStatus cont;
- /**
- * Closure for 'cont'.
- */
- void *cont_cls;
- /**
- * Context for the operation.
- */
- union QueueContext qc;
- /**
- * Task for timeout signalling.
- */
- struct GNUNET_SCHEDULER_Task * task;
- /**
- * Timeout for the current operation.
- */
- struct GNUNET_TIME_Absolute timeout;
- /**
- * Priority in the queue.
- */
- unsigned int priority;
- /**
- * Maximum allowed length of queue (otherwise
- * this request should be discarded).
- */
- unsigned int max_queue;
- /**
- * Number of bytes in the request message following
- * this struct. 32-bit value for nicer memory
- * access (and overall struct alignment).
- */
- uint32_t message_size;
- /**
- * Has this message been transmitted to the service?
- * Only ever GNUNET_YES for the head of the queue.
- * Note that the overall struct should end at a
- * multiple of 64 bits.
- */
- int was_transmitted;
- };
- /**
- * Handle to the datastore service.
- */
- struct GNUNET_DATASTORE_Handle
- {
- /**
- * Our configuration.
- */
- const struct GNUNET_CONFIGURATION_Handle *cfg;
- /**
- * Current connection to the datastore service.
- */
- struct GNUNET_CLIENT_Connection *client;
- /**
- * Handle for statistics.
- */
- struct GNUNET_STATISTICS_Handle *stats;
- /**
- * Current transmit handle.
- */
- struct GNUNET_CLIENT_TransmitHandle *th;
- /**
- * Current head of priority queue.
- */
- struct GNUNET_DATASTORE_QueueEntry *queue_head;
- /**
- * Current tail of priority queue.
- */
- struct GNUNET_DATASTORE_QueueEntry *queue_tail;
- /**
- * Task for trying to reconnect.
- */
- struct GNUNET_SCHEDULER_Task * reconnect_task;
- /**
- * How quickly should we retry? Used for exponential back-off on
- * connect-errors.
- */
- struct GNUNET_TIME_Relative retry_time;
- /**
- * Number of entries in the queue.
- */
- unsigned int queue_size;
- /**
- * Number of results we're receiving for the current query
- * after application stopped to care. Used to determine when
- * to reset the connection.
- */
- unsigned int result_count;
- /**
- * Are we currently trying to receive from the service?
- */
- int in_receive;
- /**
- * We should ignore the next message(s) from the service.
- */
- unsigned int skip_next_messages;
- };
- /**
- * Connect to the datastore service.
- *
- * @param cfg configuration to use
- * @return handle to use to access the service
- */
- struct GNUNET_DATASTORE_Handle *
- GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
- {
- struct GNUNET_CLIENT_Connection *c;
- struct GNUNET_DATASTORE_Handle *h;
- c = GNUNET_CLIENT_connect ("datastore", cfg);
- if (c == NULL)
- return NULL; /* oops */
- h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
- GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
- h->client = c;
- h->cfg = cfg;
- h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
- return h;
- }
- /**
- * Task used by 'transmit_drop' to disconnect the datastore.
- *
- * @param cls the datastore handle
- * @param tc scheduler context
- */
- static void
- disconnect_after_drop (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
- {
- struct GNUNET_DATASTORE_Handle *h = cls;
- GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
- }
- /**
- * Transmit DROP message to datastore service.
- *
- * @param cls the `struct GNUNET_DATASTORE_Handle`
- * @param size number of bytes that can be copied to @a buf
- * @param buf where to copy the drop message
- * @return number of bytes written to @a buf
- */
- static size_t
- transmit_drop (void *cls, size_t size, void *buf)
- {
- struct GNUNET_DATASTORE_Handle *h = cls;
- struct GNUNET_MessageHeader *hdr;
- if (buf == NULL)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- _("Failed to transmit request to drop database.\n"));
- GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
- GNUNET_SCHEDULER_REASON_PREREQ_DONE);
- return 0;
- }
- GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
- hdr = buf;
- hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
- hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
- GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
- GNUNET_SCHEDULER_REASON_PREREQ_DONE);
- return sizeof (struct GNUNET_MessageHeader);
- }
- /**
- * Disconnect from the datastore service (and free
- * associated resources).
- *
- * @param h handle to the datastore
- * @param drop set to #GNUNET_YES to delete all data in datastore (!)
- */
- void
- GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
- {
- struct GNUNET_DATASTORE_QueueEntry *qe;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
- if (NULL != h->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
- h->th = NULL;
- }
- if (h->client != NULL)
- {
- GNUNET_CLIENT_disconnect (h->client);
- h->client = NULL;
- }
- if (h->reconnect_task != NULL)
- {
- GNUNET_SCHEDULER_cancel (h->reconnect_task);
- h->reconnect_task = NULL;
- }
- while (NULL != (qe = h->queue_head))
- {
- GNUNET_assert (NULL != qe->response_proc);
- qe->response_proc (h, NULL);
- }
- if (GNUNET_YES == drop)
- {
- h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
- if (h->client != NULL)
- {
- if (NULL !=
- GNUNET_CLIENT_notify_transmit_ready (h->client,
- sizeof (struct
- GNUNET_MessageHeader),
- GNUNET_TIME_UNIT_MINUTES,
- GNUNET_YES, &transmit_drop, h))
- return;
- GNUNET_CLIENT_disconnect (h->client);
- h->client = NULL;
- }
- GNUNET_break (0);
- }
- GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
- h->stats = NULL;
- GNUNET_free (h);
- }
- /**
- * A request has timed out (before being transmitted to the service).
- *
- * @param cls the `struct GNUNET_DATASTORE_QueueEntry`
- * @param tc scheduler context
- */
- static void
- timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
- {
- struct GNUNET_DATASTORE_QueueEntry *qe = cls;
- struct GNUNET_DATASTORE_Handle *h = qe->h;
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop ("# queue entry timeouts"), 1,
- GNUNET_NO);
- qe->task = NULL;
- GNUNET_assert (GNUNET_NO == qe->was_transmitted);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout of request in datastore queue\n");
- /* response_proc's expect request at the head of the queue! */
- GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
- GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, qe);
- GNUNET_assert (h->queue_head == qe);
- qe->response_proc (qe->h, NULL);
- }
- /**
- * Create a new entry for our priority queue (and possibly discard other entires if
- * the queue is getting too long).
- *
- * @param h handle to the datastore
- * @param msize size of the message to queue
- * @param queue_priority priority of the entry
- * @param max_queue_size at what queue size should this request be dropped
- * (if other requests of higher priority are in the queue)
- * @param timeout timeout for the operation
- * @param response_proc function to call with replies (can be NULL)
- * @param qc client context (NOT a closure for @a response_proc)
- * @return NULL if the queue is full
- */
- static struct GNUNET_DATASTORE_QueueEntry *
- make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
- unsigned int queue_priority, unsigned int max_queue_size,
- struct GNUNET_TIME_Relative timeout,
- GNUNET_CLIENT_MessageHandler response_proc,
- const union QueueContext *qc)
- {
- struct GNUNET_DATASTORE_QueueEntry *ret;
- struct GNUNET_DATASTORE_QueueEntry *pos;
- unsigned int c;
- c = 0;
- pos = h->queue_head;
- while ((pos != NULL) && (c < max_queue_size) &&
- (pos->priority >= queue_priority))
- {
- c++;
- pos = pos->next;
- }
- if (c >= max_queue_size)
- {
- GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
- GNUNET_NO);
- return NULL;
- }
- ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
- ret->h = h;
- ret->response_proc = response_proc;
- ret->qc = *qc;
- ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
- ret->priority = queue_priority;
- ret->max_queue = max_queue_size;
- ret->message_size = msize;
- ret->was_transmitted = GNUNET_NO;
- if (pos == NULL)
- {
- /* append at the tail */
- pos = h->queue_tail;
- }
- else
- {
- pos = pos->prev;
- /* do not insert at HEAD if HEAD query was already
- * transmitted and we are still receiving replies! */
- if ((pos == NULL) && (h->queue_head->was_transmitted))
- pos = h->queue_head;
- }
- c++;
- #if INSANE_STATISTICS
- GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
- 1, GNUNET_NO);
- #endif
- GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
- h->queue_size++;
- ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
- for (pos = ret->next; NULL != pos; pos = pos->next)
- {
- if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
- {
- GNUNET_assert (NULL != pos->response_proc);
- /* move 'pos' element to head so that it will be
- * killed on 'NULL' call below */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Dropping request from datastore queue\n");
- /* response_proc's expect request at the head of the queue! */
- GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
- GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop
- ("# Requests dropped from datastore queue"), 1,
- GNUNET_NO);
- GNUNET_assert (h->queue_head == pos);
- pos->response_proc (h, NULL);
- break;
- }
- }
- return ret;
- }
- /**
- * Process entries in the queue (or do nothing if we are already
- * doing so).
- *
- * @param h handle to the datastore
- */
- static void
- process_queue (struct GNUNET_DATASTORE_Handle *h);
- /**
- * Try reconnecting to the datastore service.
- *
- * @param cls the `struct GNUNET_DATASTORE_Handle`
- * @param tc scheduler context
- */
- static void
- try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
- {
- struct GNUNET_DATASTORE_Handle *h = cls;
- h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
- h->reconnect_task = NULL;
- h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
- if (h->client == NULL)
- {
- LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n");
- return;
- }
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop
- ("# datastore connections (re)created"), 1,
- GNUNET_NO);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
- process_queue (h);
- }
- /**
- * Disconnect from the service and then try reconnecting to the datastore service
- * after some delay.
- *
- * @param h handle to datastore to disconnect and reconnect
- */
- static void
- do_disconnect (struct GNUNET_DATASTORE_Handle *h)
- {
- if (NULL == h->client)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Client NULL in disconnect, will not try to reconnect\n");
- return;
- }
- GNUNET_CLIENT_disconnect (h->client);
- h->skip_next_messages = 0;
- h->client = NULL;
- h->reconnect_task =
- GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
- }
- /**
- * Function called whenever we receive a message from
- * the service. Calls the appropriate handler.
- *
- * @param cls the `struct GNUNET_DATASTORE_Handle`
- * @param msg the received message
- */
- static void
- receive_cb (void *cls,
- const struct GNUNET_MessageHeader *msg)
- {
- struct GNUNET_DATASTORE_Handle *h = cls;
- struct GNUNET_DATASTORE_QueueEntry *qe;
- h->in_receive = GNUNET_NO;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving reply from datastore\n");
- if (h->skip_next_messages > 0)
- {
- h->skip_next_messages--;
- process_queue (h);
- return;
- }
- if (NULL == (qe = h->queue_head))
- {
- GNUNET_break (0);
- process_queue (h);
- return;
- }
- qe->response_proc (h, msg);
- }
- /**
- * Transmit request from queue to datastore service.
- *
- * @param cls the `struct GNUNET_DATASTORE_Handle`
- * @param size number of bytes that can be copied to @a buf
- * @param buf where to copy the drop message
- * @return number of bytes written to @a buf
- */
- static size_t
- transmit_request (void *cls,
- size_t size,
- void *buf)
- {
- struct GNUNET_DATASTORE_Handle *h = cls;
- struct GNUNET_DATASTORE_QueueEntry *qe;
- size_t msize;
- h->th = NULL;
- if (NULL == (qe = h->queue_head))
- return 0; /* no entry in queue */
- if (NULL == buf)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to transmit request to DATASTORE.\n");
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop ("# transmission request failures"),
- 1, GNUNET_NO);
- do_disconnect (h);
- return 0;
- }
- if (size < (msize = qe->message_size))
- {
- process_queue (h);
- return 0;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting %u byte request to DATASTORE\n",
- msize);
- memcpy (buf, &qe[1], msize);
- qe->was_transmitted = GNUNET_YES;
- GNUNET_SCHEDULER_cancel (qe->task);
- qe->task = NULL;
- GNUNET_assert (GNUNET_NO == h->in_receive);
- h->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (h->client,
- &receive_cb, h,
- GNUNET_TIME_absolute_get_remaining (qe->timeout));
- #if INSANE_STATISTICS
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop ("# bytes sent to datastore"), msize,
- GNUNET_NO);
- #endif
- return msize;
- }
- /**
- * Process entries in the queue (or do nothing if we are already
- * doing so).
- *
- * @param h handle to the datastore
- */
- static void
- process_queue (struct GNUNET_DATASTORE_Handle *h)
- {
- struct GNUNET_DATASTORE_QueueEntry *qe;
- if (NULL == (qe = h->queue_head))
- {
- /* no entry in queue */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Queue empty\n");
- return;
- }
- if (GNUNET_YES == qe->was_transmitted)
- {
- /* waiting for replies */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Head request already transmitted\n");
- return;
- }
- if (NULL != h->th)
- {
- /* request pending */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Pending transmission request\n");
- return;
- }
- if (NULL == h->client)
- {
- /* waiting for reconnect */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Not connected\n");
- return;
- }
- if (GNUNET_YES == h->in_receive)
- {
- /* wait for response to previous query */
- return;
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing %u byte request to DATASTORE\n",
- qe->message_size);
- h->th
- = GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
- GNUNET_TIME_absolute_get_remaining (qe->timeout),
- GNUNET_YES,
- &transmit_request, h);
- GNUNET_assert (GNUNET_NO == h->in_receive);
- GNUNET_break (NULL != h->th);
- }
- /**
- * Dummy continuation used to do nothing (but be non-zero).
- *
- * @param cls closure
- * @param result result
- * @param min_expiration expiration time
- * @param emsg error message
- */
- static void
- drop_status_cont (void *cls, int32_t result,
- struct GNUNET_TIME_Absolute min_expiration,
- const char *emsg)
- {
- /* do nothing */
- }
- /**
- * Free a queue entry. Removes the given entry from the
- * queue and releases associated resources. Does NOT
- * call the callback.
- *
- * @param qe entry to free.
- */
- static void
- free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
- {
- struct GNUNET_DATASTORE_Handle *h = qe->h;
- GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
- if (qe->task != NULL)
- {
- GNUNET_SCHEDULER_cancel (qe->task);
- qe->task = NULL;
- }
- h->queue_size--;
- qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
- GNUNET_free (qe);
- }
- /**
- * Type of a function to call when we receive a message
- * from the service.
- *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
- */
- static void
- process_status_message (void *cls,
- const struct GNUNET_MessageHeader *msg)
- {
- struct GNUNET_DATASTORE_Handle *h = cls;
- struct GNUNET_DATASTORE_QueueEntry *qe;
- struct StatusContext rc;
- const struct StatusMessage *sm;
- const char *emsg;
- int32_t status;
- int was_transmitted;
- if (NULL == (qe = h->queue_head))
- {
- GNUNET_break (0);
- do_disconnect (h);
- return;
- }
- rc = qe->qc.sc;
- if (NULL == msg)
- {
- was_transmitted = qe->was_transmitted;
- free_queue_entry (qe);
- if (was_transmitted == GNUNET_YES)
- do_disconnect (h);
- else
- process_queue (h);
- if (NULL != rc.cont)
- rc.cont (rc.cont_cls, GNUNET_SYSERR,
- GNUNET_TIME_UNIT_ZERO_ABS,
- _("Failed to receive status response from database."));
- return;
- }
- GNUNET_assert (GNUNET_YES == qe->was_transmitted);
- free_queue_entry (qe);
- if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
- (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
- {
- GNUNET_break (0);
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- do_disconnect (h);
- if (rc.cont != NULL)
- rc.cont (rc.cont_cls, GNUNET_SYSERR,
- GNUNET_TIME_UNIT_ZERO_ABS,
- _("Error reading response from datastore service"));
- return;
- }
- sm = (const struct StatusMessage *) msg;
- status = ntohl (sm->status);
- emsg = NULL;
- if (ntohs (msg->size) > sizeof (struct StatusMessage))
- {
- emsg = (const char *) &sm[1];
- if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
- {
- GNUNET_break (0);
- emsg = _("Invalid error message received from datastore service");
- }
- }
- if ((status == GNUNET_SYSERR) && (emsg == NULL))
- {
- GNUNET_break (0);
- emsg = _("Invalid error message received from datastore service");
- }
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg);
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop ("# status messages received"), 1,
- GNUNET_NO);
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- process_queue (h);
- if (rc.cont != NULL)
- rc.cont (rc.cont_cls, status,
- GNUNET_TIME_absolute_ntoh (sm->min_expiration),
- emsg);
- }
- /**
- * Store an item in the datastore. If the item is already present,
- * the priorities are summed up and the higher expiration time and
- * lower anonymity level is used.
- *
- * @param h handle to the datastore
- * @param rid reservation ID to use (from "reserve"); use 0 if no
- * prior reservation was made
- * @param key key for the value
- * @param size number of bytes in data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param replication how often should the content be replicated to other peers?
- * @param expiration expiration time for the content
- * @param queue_priority ranking of this request in the priority queue
- * @param max_queue_size at what queue size should this request be dropped
- * (if other requests of higher priority are in the queue)
- * @param timeout timeout for the operation
- * @param cont continuation to call when done
- * @param cont_cls closure for @a cont
- * @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel; note that even if NULL is returned, the callback will be invoked
- * (or rather, will already have been invoked)
- */
- struct GNUNET_DATASTORE_QueueEntry *
- GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
- const struct GNUNET_HashCode * key, size_t size,
- const void *data, enum GNUNET_BLOCK_Type type,
- uint32_t priority, uint32_t anonymity,
- uint32_t replication,
- struct GNUNET_TIME_Absolute expiration,
- unsigned int queue_priority, unsigned int max_queue_size,
- struct GNUNET_TIME_Relative timeout,
- GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls)
- {
- struct GNUNET_DATASTORE_QueueEntry *qe;
- struct DataMessage *dm;
- size_t msize;
- union QueueContext qc;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to put %u bytes of data under key `%s' for %s\n", size,
- GNUNET_h2s (key),
- GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
- GNUNET_YES));
- msize = sizeof (struct DataMessage) + size;
- GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
- qc.sc.cont = cont;
- qc.sc.cont_cls = cont_cls;
- qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
- &process_status_message, &qc);
- if (qe == NULL)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n");
- return NULL;
- }
- GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
- 1, GNUNET_NO);
- dm = (struct DataMessage *) &qe[1];
- dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
- dm->header.size = htons (msize);
- dm->rid = htonl (rid);
- dm->size = htonl ((uint32_t) size);
- dm->type = htonl (type);
- dm->priority = htonl (priority);
- dm->anonymity = htonl (anonymity);
- dm->replication = htonl (replication);
- dm->reserved = htonl (0);
- dm->uid = GNUNET_htonll (0);
- dm->expiration = GNUNET_TIME_absolute_hton (expiration);
- dm->key = *key;
- memcpy (&dm[1], data, size);
- process_queue (h);
- return qe;
- }
- /**
- * Reserve space in the datastore. This function should be used
- * to avoid "out of space" failures during a longer sequence of "put"
- * operations (for example, when a file is being inserted).
- *
- * @param h handle to the datastore
- * @param amount how much space (in bytes) should be reserved (for content only)
- * @param entries how many entries will be created (to calculate per-entry overhead)
- * @param cont continuation to call when done; "success" will be set to
- * a positive reservation value if space could be reserved.
- * @param cont_cls closure for @a cont
- * @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel; note that even if NULL is returned, the callback will be invoked
- * (or rather, will already have been invoked)
- */
- struct GNUNET_DATASTORE_QueueEntry *
- GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
- uint32_t entries,
- GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls)
- {
- struct GNUNET_DATASTORE_QueueEntry *qe;
- struct ReserveMessage *rm;
- union QueueContext qc;
- if (NULL == cont)
- cont = &drop_status_cont;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to reserve %llu bytes of data and %u entries\n",
- (unsigned long long) amount, (unsigned int) entries);
- qc.sc.cont = cont;
- qc.sc.cont_cls = cont_cls;
- qe = make_queue_entry (h,
- sizeof (struct ReserveMessage),
- UINT_MAX,
- UINT_MAX,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &process_status_message, &qc);
- if (NULL == qe)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Could not create queue entry to reserve\n");
- return NULL;
- }
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop ("# RESERVE requests executed"), 1,
- GNUNET_NO);
- rm = (struct ReserveMessage *) &qe[1];
- rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
- rm->header.size = htons (sizeof (struct ReserveMessage));
- rm->entries = htonl (entries);
- rm->amount = GNUNET_htonll (amount);
- process_queue (h);
- return qe;
- }
- /**
- * Signal that all of the data for which a reservation was made has
- * been stored and that whatever excess space might have been reserved
- * can now be released.
- *
- * @param h handle to the datastore
- * @param rid reservation ID (value of "success" in original continuation
- * from the "reserve" function).
- * @param queue_priority ranking of this request in the priority queue
- * @param max_queue_size at what queue size should this request be dropped
- * (if other requests of higher priority are in the queue)
- * @param queue_priority ranking of this request in the priority queue
- * @param max_queue_size at what queue size should this request be dropped
- * (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
- * @param cont continuation to call when done
- * @param cont_cls closure for @a cont
- * @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel; note that even if NULL is returned, the callback will be invoked
- * (or rather, will already have been invoked)
- */
- struct GNUNET_DATASTORE_QueueEntry *
- GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
- uint32_t rid, unsigned int queue_priority,
- unsigned int max_queue_size,
- struct GNUNET_TIME_Relative timeout,
- GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls)
- {
- struct GNUNET_DATASTORE_QueueEntry *qe;
- struct ReleaseReserveMessage *rrm;
- union QueueContext qc;
- if (cont == NULL)
- cont = &drop_status_cont;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
- qc.sc.cont = cont;
- qc.sc.cont_cls = cont_cls;
- qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
- queue_priority, max_queue_size, timeout,
- &process_status_message, &qc);
- if (qe == NULL)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Could not create queue entry to release reserve\n");
- return NULL;
- }
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop
- ("# RELEASE RESERVE requests executed"), 1,
- GNUNET_NO);
- rrm = (struct ReleaseReserveMessage *) &qe[1];
- rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
- rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
- rrm->rid = htonl (rid);
- process_queue (h);
- return qe;
- }
- /**
- * Update a value in the datastore.
- *
- * @param h handle to the datastore
- * @param uid identifier for the value
- * @param priority how much to increase the priority of the value
- * @param expiration new expiration value should be MAX of existing and this argument
- * @param queue_priority ranking of this request in the priority queue
- * @param max_queue_size at what queue size should this request be dropped
- * (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
- * @param cont continuation to call when done
- * @param cont_cls closure for @a cont
- * @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel; note that even if NULL is returned, the callback will be invoked
- * (or rather, will already have been invoked)
- */
- struct GNUNET_DATASTORE_QueueEntry *
- GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
- uint32_t priority,
- struct GNUNET_TIME_Absolute expiration,
- unsigned int queue_priority,
- unsigned int max_queue_size,
- struct GNUNET_TIME_Relative timeout,
- GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls)
- {
- struct GNUNET_DATASTORE_QueueEntry *qe;
- struct UpdateMessage *um;
- union QueueContext qc;
- if (cont == NULL)
- cont = &drop_status_cont;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to update entry %llu raising priority by %u and expiration to %s\n",
- uid,
- (unsigned int) priority,
- GNUNET_STRINGS_absolute_time_to_string (expiration));
- qc.sc.cont = cont;
- qc.sc.cont_cls = cont_cls;
- qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
- max_queue_size, timeout, &process_status_message, &qc);
- if (qe == NULL)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Could not create queue entry for UPDATE\n");
- return NULL;
- }
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop ("# UPDATE requests executed"), 1,
- GNUNET_NO);
- um = (struct UpdateMessage *) &qe[1];
- um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
- um->header.size = htons (sizeof (struct UpdateMessage));
- um->priority = htonl (priority);
- um->expiration = GNUNET_TIME_absolute_hton (expiration);
- um->uid = GNUNET_htonll (uid);
- process_queue (h);
- return qe;
- }
- /**
- * Explicitly remove some content from the database.
- * The @a cont continuation will be called with `status`
- * #GNUNET_OK" if content was removed, #GNUNET_NO
- * if no matching entry was found and #GNUNET_SYSERR
- * on all other types of errors.
- *
- * @param h handle to the datastore
- * @param key key for the value
- * @param size number of bytes in data
- * @param data content stored
- * @param queue_priority ranking of this request in the priority queue
- * @param max_queue_size at what queue size should this request be dropped
- * (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
- * @param cont continuation to call when done
- * @param cont_cls closure for @a cont
- * @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel; note that even if NULL is returned, the callback will be invoked
- * (or rather, will already have been invoked)
- */
- struct GNUNET_DATASTORE_QueueEntry *
- GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
- const struct GNUNET_HashCode * key, size_t size,
- const void *data, unsigned int queue_priority,
- unsigned int max_queue_size,
- struct GNUNET_TIME_Relative timeout,
- GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls)
- {
- struct GNUNET_DATASTORE_QueueEntry *qe;
- struct DataMessage *dm;
- size_t msize;
- union QueueContext qc;
- if (cont == NULL)
- cont = &drop_status_cont;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n",
- size, GNUNET_h2s (key));
- qc.sc.cont = cont;
- qc.sc.cont_cls = cont_cls;
- msize = sizeof (struct DataMessage) + size;
- GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
- qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
- &process_status_message, &qc);
- if (qe == NULL)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n");
- return NULL;
- }
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop ("# REMOVE requests executed"), 1,
- GNUNET_NO);
- dm = (struct DataMessage *) &qe[1];
- dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
- dm->header.size = htons (msize);
- dm->rid = htonl (0);
- dm->size = htonl (size);
- dm->type = htonl (0);
- dm->priority = htonl (0);
- dm->anonymity = htonl (0);
- dm->uid = GNUNET_htonll (0);
- dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
- dm->key = *key;
- memcpy (&dm[1], data, size);
- process_queue (h);
- return qe;
- }
- /**
- * Type of a function to call when we receive a message
- * from the service.
- *
- * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
- * @param msg message received, NULL on timeout or fatal error
- */
- static void
- process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
- {
- struct GNUNET_DATASTORE_Handle *h = cls;
- struct GNUNET_DATASTORE_QueueEntry *qe;
- struct ResultContext rc;
- const struct DataMessage *dm;
- int was_transmitted;
- if (NULL == msg)
- {
- qe = h->queue_head;
- GNUNET_assert (NULL != qe);
- rc = qe->qc.rc;
- was_transmitted = qe->was_transmitted;
- free_queue_entry (qe);
- if (GNUNET_YES == was_transmitted)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to receive response from database.\n");
- do_disconnect (h);
- }
- else
- {
- process_queue (h);
- }
- if (NULL != rc.proc)
- rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
- 0);
- return;
- }
- if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
- {
- GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
- qe = h->queue_head;
- rc = qe->qc.rc;
- GNUNET_assert (GNUNET_YES == qe->was_transmitted);
- free_queue_entry (qe);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received end of result set, new queue size is %u\n", h->queue_size);
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- h->result_count = 0;
- process_queue (h);
- if (NULL != rc.proc)
- rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
- 0);
- return;
- }
- qe = h->queue_head;
- GNUNET_assert (NULL != qe);
- rc = qe->qc.rc;
- if (GNUNET_YES != qe->was_transmitted)
- {
- GNUNET_break (0);
- free_queue_entry (qe);
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- do_disconnect (h);
- if (rc.proc != NULL)
- rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
- 0);
- return;
- }
- if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
- (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
- (ntohs (msg->size) !=
- sizeof (struct DataMessage) +
- ntohl (((const struct DataMessage *) msg)->size)))
- {
- GNUNET_break (0);
- free_queue_entry (qe);
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- do_disconnect (h);
- if (rc.proc != NULL)
- rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
- 0);
- return;
- }
- #if INSANE_STATISTICS
- GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
- GNUNET_NO);
- #endif
- dm = (const struct DataMessage *) msg;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received result %llu with type %u and size %u with key %s\n",
- (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
- ntohl (dm->size), GNUNET_h2s (&dm->key));
- free_queue_entry (qe);
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- process_queue (h);
- if (rc.proc != NULL)
- rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
- ntohl (dm->priority), ntohl (dm->anonymity),
- GNUNET_TIME_absolute_ntoh (dm->expiration),
- GNUNET_ntohll (dm->uid));
- }
- /**
- * Get a random value from the datastore for content replication.
- * Returns a single, random value among those with the highest
- * replication score, lowering positive replication scores by one for
- * the chosen value (if only content with a replication score exists,
- * a random value is returned and replication scores are not changed).
- *
- * @param h handle to the datastore
- * @param queue_priority ranking of this request in the priority queue
- * @param max_queue_size at what queue size should this request be dropped
- * (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
- * @param proc function to call on a random value; it
- * will be called once with a value (if available)
- * and always once with a value of NULL.
- * @param proc_cls closure for @a proc
- * @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel
- */
- struct GNUNET_DATASTORE_QueueEntry *
- GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
- unsigned int queue_priority,
- unsigned int max_queue_size,
- struct GNUNET_TIME_Relative timeout,
- GNUNET_DATASTORE_DatumProcessor proc,
- void *proc_cls)
- {
- struct GNUNET_DATASTORE_QueueEntry *qe;
- struct GNUNET_MessageHeader *m;
- union QueueContext qc;
- GNUNET_assert (NULL != proc);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to get replication entry in %s\n",
- GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
- qc.rc.proc = proc;
- qc.rc.proc_cls = proc_cls;
- qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
- queue_priority, max_queue_size, timeout,
- &process_result_message, &qc);
- if (NULL == qe)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Could not create queue entry for GET REPLICATION\n");
- return NULL;
- }
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop
- ("# GET REPLICATION requests executed"), 1,
- GNUNET_NO);
- m = (struct GNUNET_MessageHeader *) &qe[1];
- m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
- m->size = htons (sizeof (struct GNUNET_MessageHeader));
- process_queue (h);
- return qe;
- }
- /**
- * Get a single zero-anonymity value from the datastore.
- *
- * @param h handle to the datastore
- * @param offset offset of the result (modulo num-results); set to
- * a random 64-bit value initially; then increment by
- * one each time; detect that all results have been found by uid
- * being again the first uid ever returned.
- * @param queue_priority ranking of this request in the priority queue
- * @param max_queue_size at what queue size should this request be dropped
- * (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
- * @param type allowed type for the operation (never zero)
- * @param proc function to call on a random value; it
- * will be called once with a value (if available)
- * or with NULL if none value exists.
- * @param proc_cls closure for @a proc
- * @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel
- */
- struct GNUNET_DATASTORE_QueueEntry *
- GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
- uint64_t offset,
- unsigned int queue_priority,
- unsigned int max_queue_size,
- struct GNUNET_TIME_Relative timeout,
- enum GNUNET_BLOCK_Type type,
- GNUNET_DATASTORE_DatumProcessor proc,
- void *proc_cls)
- {
- struct GNUNET_DATASTORE_QueueEntry *qe;
- struct GetZeroAnonymityMessage *m;
- union QueueContext qc;
- GNUNET_assert (NULL != proc);
- GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to get %llu-th zero-anonymity entry of type %d in %s\n",
- (unsigned long long) offset, type,
- GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
- qc.rc.proc = proc;
- qc.rc.proc_cls = proc_cls;
- qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
- queue_priority, max_queue_size, timeout,
- &process_result_message, &qc);
- if (NULL == qe)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Could not create queue entry for zero-anonymity procation\n");
- return NULL;
- }
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop
- ("# GET ZERO ANONYMITY requests executed"), 1,
- GNUNET_NO);
- m = (struct GetZeroAnonymityMessage *) &qe[1];
- m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
- m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
- m->type = htonl ((uint32_t) type);
- m->offset = GNUNET_htonll (offset);
- process_queue (h);
- return qe;
- }
- /**
- * Get a result for a particular key from the datastore. The processor
- * will only be called once.
- *
- * @param h handle to the datastore
- * @param offset offset of the result (modulo num-results); set to
- * a random 64-bit value initially; then increment by
- * one each time; detect that all results have been found by uid
- * being again the first uid ever returned.
- * @param key maybe NULL (to match all entries)
- * @param type desired type, 0 for any
- * @param queue_priority ranking of this request in the priority queue
- * @param max_queue_size at what queue size should this request be dropped
- * (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
- * @param proc function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param proc_cls closure for @a proc
- * @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel
- */
- struct GNUNET_DATASTORE_QueueEntry *
- GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset,
- const struct GNUNET_HashCode * key,
- enum GNUNET_BLOCK_Type type,
- unsigned int queue_priority,
- unsigned int max_queue_size,
- struct GNUNET_TIME_Relative timeout,
- GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
- {
- struct GNUNET_DATASTORE_QueueEntry *qe;
- struct GetMessage *gm;
- union QueueContext qc;
- GNUNET_assert (NULL != proc);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to look for data of type %u under key `%s'\n",
- (unsigned int) type, GNUNET_h2s (key));
- qc.rc.proc = proc;
- qc.rc.proc_cls = proc_cls;
- qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority,
- max_queue_size, timeout, &process_result_message, &qc);
- if (qe == NULL)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
- GNUNET_h2s (key));
- return NULL;
- }
- #if INSANE_STATISTICS
- GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"),
- 1, GNUNET_NO);
- #endif
- gm = (struct GetMessage *) &qe[1];
- gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
- gm->type = htonl (type);
- gm->offset = GNUNET_htonll (offset);
- if (key != NULL)
- {
- gm->header.size = htons (sizeof (struct GetMessage));
- gm->key = *key;
- }
- else
- {
- gm->header.size =
- htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode));
- }
- process_queue (h);
- return qe;
- }
- /**
- * Cancel a datastore operation. The final callback from the
- * operation must not have been done yet.
- *
- * @param qe operation to cancel
- */
- void
- GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
- {
- struct GNUNET_DATASTORE_Handle *h;
- GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
- h = qe->h;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
- qe->was_transmitted, h->queue_head == qe);
- if (GNUNET_YES == qe->was_transmitted)
- {
- free_queue_entry (qe);
- h->skip_next_messages++;
- return;
- }
- free_queue_entry (qe);
- process_queue (h);
- }
- /* end of datastore_api.c */
|