12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316 |
- /*
- This file is part of GNUnet.
- Copyright (C) 2012-2019 GNUnet e.V.
- GNUnet is free software: you can redistribute it and/or modify it
- under the terms of the GNU Affero General Public License as published
- by the Free Software Foundation, either version 3 of the License,
- or (at your option) any later version.
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- SPDX-License-Identifier: AGPL3.0-or-later
- */
- /**
- * @author Florian Dold
- * @file util/mq.c
- * @brief general purpose request queue
- */
- #include "platform.h"
- #include "gnunet_util_lib.h"
- #define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
- struct GNUNET_MQ_Envelope
- {
- /**
- * Messages are stored in a linked list.
- * Each queue has its own list of envelopes.
- */
- struct GNUNET_MQ_Envelope *next;
- /**
- * Messages are stored in a linked list
- * Each queue has its own list of envelopes.
- */
- struct GNUNET_MQ_Envelope *prev;
- /**
- * Actual allocated message header.
- * The GNUNET_MQ_Envelope header is allocated at
- * the end of the message.
- */
- struct GNUNET_MessageHeader *mh;
- /**
- * Queue the message is queued in, NULL if message is not queued.
- */
- struct GNUNET_MQ_Handle *parent_queue;
- /**
- * Called after the message was sent irrevocably.
- */
- GNUNET_SCHEDULER_TaskCallback sent_cb;
- /**
- * Closure for @e send_cb
- */
- void *sent_cls;
- /**
- * Flags that were set for this envelope by
- * #GNUNET_MQ_env_set_options(). Only valid if
- * @e have_custom_options is set.
- */
- enum GNUNET_MQ_PriorityPreferences priority;
- /**
- * Did the application call #GNUNET_MQ_env_set_options()?
- */
- int have_custom_options;
- };
- /**
- * Handle to a message queue.
- */
- struct GNUNET_MQ_Handle
- {
- /**
- * Handlers array, or NULL if the queue should not receive messages
- */
- struct GNUNET_MQ_MessageHandler *handlers;
- /**
- * Actual implementation of message sending,
- * called when a message is added
- */
- GNUNET_MQ_SendImpl send_impl;
- /**
- * Implementation-dependent queue destruction function
- */
- GNUNET_MQ_DestroyImpl destroy_impl;
- /**
- * Implementation-dependent send cancel function
- */
- GNUNET_MQ_CancelImpl cancel_impl;
- /**
- * Implementation-specific state
- */
- void *impl_state;
- /**
- * Callback will be called when an error occurs.
- */
- GNUNET_MQ_ErrorHandler error_handler;
- /**
- * Closure for the error handler.
- */
- void *error_handler_cls;
- /**
- * Task to asynchronously run #impl_send_continue().
- */
- struct GNUNET_SCHEDULER_Task *send_task;
- /**
- * Linked list of messages pending to be sent
- */
- struct GNUNET_MQ_Envelope *envelope_head;
- /**
- * Linked list of messages pending to be sent
- */
- struct GNUNET_MQ_Envelope *envelope_tail;
- /**
- * Message that is currently scheduled to be
- * sent. Not the head of the message queue, as the implementation
- * needs to know if sending has been already scheduled or not.
- */
- struct GNUNET_MQ_Envelope *current_envelope;
- /**
- * Map of associations, lazily allocated
- */
- struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
- /**
- * Functions to call on queue destruction; kept in a DLL.
- */
- struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
- /**
- * Functions to call on queue destruction; kept in a DLL.
- */
- struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
- /**
- * Flags that were set for this queue by
- * #GNUNET_MQ_set_options(). Default is 0.
- */
- enum GNUNET_MQ_PriorityPreferences priority;
- /**
- * Next id that should be used for the @e assoc_map,
- * initialized lazily to a random value together with
- * @e assoc_map
- */
- uint32_t assoc_id;
- /**
- * Number of entries we have in the envelope-DLL.
- */
- unsigned int queue_length;
- /**
- * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
- * FIXME: is this dead?
- */
- int evacuate_called;
- /**
- * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
- */
- int in_flight;
- };
- /**
- * Call the message message handler that was registered
- * for the type of the given message in the given message queue.
- *
- * This function is indended to be used for the implementation
- * of message queues.
- *
- * @param mq message queue with the handlers
- * @param mh message to dispatch
- */
- void
- GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
- const struct GNUNET_MessageHeader *mh)
- {
- int ret;
- ret = GNUNET_MQ_handle_message (mq->handlers, mh);
- if (GNUNET_SYSERR == ret)
- {
- GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED);
- return;
- }
- }
- /**
- * Call the message message handler that was registered
- * for the type of the given message in the given @a handlers list.
- *
- * This function is indended to be used for the implementation
- * of message queues.
- *
- * @param handlers a set of handlers
- * @param mh message to dispatch
- * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
- * #GNUNET_SYSERR if message was rejected by check function
- */
- int
- GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
- const struct GNUNET_MessageHeader *mh)
- {
- const struct GNUNET_MQ_MessageHandler *handler;
- int handled = GNUNET_NO;
- uint16_t msize = ntohs (mh->size);
- uint16_t mtype = ntohs (mh->type);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u and size %u\n",
- mtype,
- msize);
- if (NULL == handlers)
- goto done;
- for (handler = handlers; NULL != handler->cb; handler++)
- {
- if (handler->type == mtype)
- {
- handled = GNUNET_YES;
- if ((handler->expected_size > msize) ||
- ((handler->expected_size != msize) && (NULL == handler->mv)))
- {
- /* Too small, or not an exact size and
- no 'mv' handler to check rest */
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "Received malformed message of type %u\n",
- (unsigned int) handler->type);
- return GNUNET_SYSERR;
- }
- if ((NULL == handler->mv) ||
- (GNUNET_OK == handler->mv (handler->cls, mh)))
- {
- /* message well-formed, pass to handler */
- handler->cb (handler->cls, mh);
- }
- else
- {
- /* Message rejected by check routine */
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "Received malformed message of type %u\n",
- (unsigned int) handler->type);
- return GNUNET_SYSERR;
- }
- break;
- }
- }
- done:
- if (GNUNET_NO == handled)
- {
- LOG (GNUNET_ERROR_TYPE_INFO,
- "No handler for message of type %u and size %u\n",
- mtype,
- msize);
- return GNUNET_NO;
- }
- return GNUNET_OK;
- }
- /**
- * Call the error handler of a message queue with the given
- * error code. If there is no error handler, log a warning.
- *
- * This function is intended to be used by the implementation
- * of message queues.
- *
- * @param mq message queue
- * @param error the error type
- */
- void
- GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error)
- {
- if (NULL == mq->error_handler)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Got error %d, but no handler installed\n",
- (int) error);
- return;
- }
- mq->error_handler (mq->error_handler_cls, error);
- }
- /**
- * Discard the message queue message, free all
- * allocated resources. Must be called in the event
- * that a message is created but should not actually be sent.
- *
- * @param mqm the message to discard
- */
- void
- GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
- {
- GNUNET_assert (NULL == ev->parent_queue);
- GNUNET_free (ev);
- }
- /**
- * Obtain the current length of the message queue.
- *
- * @param mq queue to inspect
- * @return number of queued, non-transmitted messages
- */
- unsigned int
- GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
- {
- if (GNUNET_YES != mq->in_flight)
- {
- return mq->queue_length;
- }
- return mq->queue_length - 1;
- }
- /**
- * Send a message with the given message queue.
- * May only be called once per message.
- *
- * @param mq message queue
- * @param ev the envelope with the message to send.
- */
- void
- GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
- {
- GNUNET_assert (NULL != mq);
- GNUNET_assert (NULL == ev->parent_queue);
- mq->queue_length++;
- if (mq->queue_length >= 10000)
- {
- /* This would seem like a bug... */
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "MQ with %u entries extended by message of type %u (FC broken?)\n",
- (unsigned int) mq->queue_length,
- (unsigned int) ntohs (ev->mh->type));
- }
- ev->parent_queue = mq;
- /* is the implementation busy? queue it! */
- if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
- {
- GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev);
- return;
- }
- GNUNET_assert (NULL == mq->envelope_head);
- mq->current_envelope = ev;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "sending message of type %u, queue empty (MQ: %p)\n",
- ntohs (ev->mh->type),
- mq);
- mq->send_impl (mq, ev->mh, mq->impl_state);
- }
- /**
- * Remove the first envelope that has not yet been sent from the message
- * queue and return it.
- *
- * @param mq queue to remove envelope from
- * @return NULL if queue is empty (or has no envelope that is not under transmission)
- */
- struct GNUNET_MQ_Envelope *
- GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
- {
- struct GNUNET_MQ_Envelope *env;
- env = mq->envelope_head;
- GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, env);
- mq->queue_length--;
- env->parent_queue = NULL;
- return env;
- }
- /**
- * Function to copy an envelope. The envelope must not yet
- * be in any queue or have any options or callbacks set.
- *
- * @param env envelope to copy
- * @return copy of @a env
- */
- struct GNUNET_MQ_Envelope *
- GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
- {
- GNUNET_assert (NULL == env->next);
- GNUNET_assert (NULL == env->parent_queue);
- GNUNET_assert (NULL == env->sent_cb);
- GNUNET_assert (GNUNET_NO == env->have_custom_options);
- return GNUNET_MQ_msg_copy (env->mh);
- }
- /**
- * Send a copy of a message with the given message queue.
- * Can be called repeatedly on the same envelope.
- *
- * @param mq message queue
- * @param ev the envelope with the message to send.
- */
- void
- GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
- const struct GNUNET_MQ_Envelope *ev)
- {
- struct GNUNET_MQ_Envelope *env;
- uint16_t msize;
- msize = ntohs (ev->mh->size);
- env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize);
- env->mh = (struct GNUNET_MessageHeader *) &env[1];
- env->sent_cb = ev->sent_cb;
- env->sent_cls = ev->sent_cls;
- GNUNET_memcpy (&env[1], ev->mh, msize);
- GNUNET_MQ_send (mq, env);
- }
- /**
- * Task run to call the send implementation for the next queued
- * message, if any. Only useful for implementing message queues,
- * results in undefined behavior if not used carefully.
- *
- * @param cls message queue to send the next message with
- */
- static void
- impl_send_continue (void *cls)
- {
- struct GNUNET_MQ_Handle *mq = cls;
- mq->send_task = NULL;
- /* call is only valid if we're actually currently sending
- * a message */
- if (NULL == mq->envelope_head)
- return;
- mq->current_envelope = mq->envelope_head;
- GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
- mq->envelope_tail,
- mq->current_envelope);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "sending message of type %u from queue\n",
- ntohs (mq->current_envelope->mh->type));
- mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
- }
- /**
- * Call the send implementation for the next queued message, if any.
- * Only useful for implementing message queues, results in undefined
- * behavior if not used carefully.
- *
- * @param mq message queue to send the next message with
- */
- void
- GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
- {
- struct GNUNET_MQ_Envelope *current_envelope;
- GNUNET_SCHEDULER_TaskCallback cb;
- GNUNET_assert (0 < mq->queue_length);
- mq->queue_length--;
- mq->in_flight = GNUNET_NO;
- current_envelope = mq->current_envelope;
- current_envelope->parent_queue = NULL;
- mq->current_envelope = NULL;
- GNUNET_assert (NULL == mq->send_task);
- mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq);
- if (NULL != (cb = current_envelope->sent_cb))
- {
- current_envelope->sent_cb = NULL;
- cb (current_envelope->sent_cls);
- }
- GNUNET_free (current_envelope);
- }
- /**
- * Call the send notification for the current message, but do not
- * try to send the next message until #GNUNET_MQ_impl_send_continue
- * is called.
- *
- * Only useful for implementing message queues, results in undefined
- * behavior if not used carefully.
- *
- * @param mq message queue to send the next message with
- */
- void
- GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
- {
- struct GNUNET_MQ_Envelope *current_envelope;
- GNUNET_SCHEDULER_TaskCallback cb;
- mq->in_flight = GNUNET_YES;
- /* call is only valid if we're actually currently sending
- * a message */
- current_envelope = mq->current_envelope;
- GNUNET_assert (NULL != current_envelope);
- /* can't call cancel from now on anymore */
- current_envelope->parent_queue = NULL;
- if (NULL != (cb = current_envelope->sent_cb))
- {
- current_envelope->sent_cb = NULL;
- cb (current_envelope->sent_cls);
- }
- }
- /**
- * Create a message queue for the specified handlers.
- *
- * @param send function the implements sending messages
- * @param destroy function that implements destroying the queue
- * @param cancel function that implements canceling a message
- * @param impl_state for the queue, passed to 'send' and 'destroy'
- * @param handlers array of message handlers
- * @param error_handler handler for read and write errors
- * @param error_handler_cls closure for @a error_handler
- * @return a new message queue
- */
- struct GNUNET_MQ_Handle *
- GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
- GNUNET_MQ_DestroyImpl destroy,
- GNUNET_MQ_CancelImpl cancel,
- void *impl_state,
- const struct GNUNET_MQ_MessageHandler *handlers,
- GNUNET_MQ_ErrorHandler error_handler,
- void *error_handler_cls)
- {
- struct GNUNET_MQ_Handle *mq;
- mq = GNUNET_new (struct GNUNET_MQ_Handle);
- mq->send_impl = send;
- mq->destroy_impl = destroy;
- mq->cancel_impl = cancel;
- mq->handlers = GNUNET_MQ_copy_handlers (handlers);
- mq->error_handler = error_handler;
- mq->error_handler_cls = error_handler_cls;
- mq->impl_state = impl_state;
- return mq;
- }
- /**
- * Change the closure argument in all of the `handlers` of the
- * @a mq.
- *
- * @param mq to modify
- * @param handlers_cls new closure to use
- */
- void
- GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls)
- {
- if (NULL == mq->handlers)
- return;
- for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
- mq->handlers[i].cls = handlers_cls;
- }
- /**
- * Get the message that should currently be sent.
- * Fails if there is no current message.
- * Only useful for implementing message queues,
- * results in undefined behavior if not used carefully.
- *
- * @param mq message queue with the current message
- * @return message to send, never NULL
- */
- const struct GNUNET_MessageHeader *
- GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
- {
- GNUNET_assert (NULL != mq->current_envelope);
- GNUNET_assert (NULL != mq->current_envelope->mh);
- return mq->current_envelope->mh;
- }
- /**
- * Get the implementation state associated with the
- * message queue.
- *
- * While the GNUNET_MQ_Impl* callbacks receive the
- * implementation state, continuations that are scheduled
- * by the implementation function often only have one closure
- * argument, with this function it is possible to get at the
- * implementation state when only passing the GNUNET_MQ_Handle
- * as closure.
- *
- * @param mq message queue with the current message
- * @return message to send, never NULL
- */
- void *
- GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
- {
- return mq->impl_state;
- }
- struct GNUNET_MQ_Envelope *
- GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
- {
- struct GNUNET_MQ_Envelope *ev;
- ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope));
- ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
- ev->mh->size = htons (size);
- ev->mh->type = htons (type);
- if (NULL != mhp)
- *mhp = ev->mh;
- return ev;
- }
- /**
- * Create a new envelope by copying an existing message.
- *
- * @param hdr header of the message to copy
- * @return envelope containing @a hdr
- */
- struct GNUNET_MQ_Envelope *
- GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
- {
- struct GNUNET_MQ_Envelope *mqm;
- uint16_t size = ntohs (hdr->size);
- mqm = GNUNET_malloc (sizeof(*mqm) + size);
- mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
- GNUNET_memcpy (mqm->mh, hdr, size);
- return mqm;
- }
- /**
- * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
- *
- * @param mhp pointer to the message header pointer that will be changed to allocate at
- * the newly allocated space for the message.
- * @param base_size size of the data before the nested message
- * @param type type of the message in the envelope
- * @param nested_mh the message to append to the message after base_size
- */
- struct GNUNET_MQ_Envelope *
- GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
- uint16_t base_size,
- uint16_t type,
- const struct GNUNET_MessageHeader *nested_mh)
- {
- struct GNUNET_MQ_Envelope *mqm;
- uint16_t size;
- if (NULL == nested_mh)
- return GNUNET_MQ_msg_ (mhp, base_size, type);
- size = base_size + ntohs (nested_mh->size);
- /* check for uint16_t overflow */
- if (size < base_size)
- return NULL;
- mqm = GNUNET_MQ_msg_ (mhp, size, type);
- GNUNET_memcpy ((char *) mqm->mh + base_size,
- nested_mh,
- ntohs (nested_mh->size));
- return mqm;
- }
- /**
- * Associate the assoc_data in mq with a unique request id.
- *
- * @param mq message queue, id will be unique for the queue
- * @param assoc_data to associate
- */
- uint32_t
- GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data)
- {
- uint32_t id;
- if (NULL == mq->assoc_map)
- {
- mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
- mq->assoc_id = 1;
- }
- id = mq->assoc_id++;
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap32_put (
- mq->assoc_map,
- id,
- assoc_data,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- return id;
- }
- /**
- * Get the data associated with a @a request_id in a queue
- *
- * @param mq the message queue with the association
- * @param request_id the request id we are interested in
- * @return the associated data
- */
- void *
- GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
- {
- if (NULL == mq->assoc_map)
- return NULL;
- return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
- }
- /**
- * Remove the association for a @a request_id
- *
- * @param mq the message queue with the association
- * @param request_id the request id we want to remove
- * @return the associated data
- */
- void *
- GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
- {
- void *val;
- if (NULL == mq->assoc_map)
- return NULL;
- val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
- GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
- return val;
- }
- /**
- * Call a callback once the envelope has been sent, that is,
- * sending it can not be canceled anymore.
- * There can be only one notify sent callback per envelope.
- *
- * @param ev message to call the notify callback for
- * @param cb the notify callback
- * @param cb_cls closure for the callback
- */
- void
- GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
- GNUNET_SCHEDULER_TaskCallback cb,
- void *cb_cls)
- {
- /* allow setting *OR* clearing callback */
- GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
- ev->sent_cb = cb;
- ev->sent_cls = cb_cls;
- }
- /**
- * Handle we return for callbacks registered to be
- * notified when #GNUNET_MQ_destroy() is called on a queue.
- */
- struct GNUNET_MQ_DestroyNotificationHandle
- {
- /**
- * Kept in a DLL.
- */
- struct GNUNET_MQ_DestroyNotificationHandle *prev;
- /**
- * Kept in a DLL.
- */
- struct GNUNET_MQ_DestroyNotificationHandle *next;
- /**
- * Queue to notify about.
- */
- struct GNUNET_MQ_Handle *mq;
- /**
- * Function to call.
- */
- GNUNET_SCHEDULER_TaskCallback cb;
- /**
- * Closure for @e cb.
- */
- void *cb_cls;
- };
- /**
- * Destroy the message queue.
- *
- * @param mq message queue to destroy
- */
- void
- GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
- {
- struct GNUNET_MQ_DestroyNotificationHandle *dnh;
- if (NULL != mq->destroy_impl)
- {
- mq->destroy_impl (mq, mq->impl_state);
- }
- if (NULL != mq->send_task)
- {
- GNUNET_SCHEDULER_cancel (mq->send_task);
- mq->send_task = NULL;
- }
- while (NULL != mq->envelope_head)
- {
- struct GNUNET_MQ_Envelope *ev;
- ev = mq->envelope_head;
- ev->parent_queue = NULL;
- GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
- GNUNET_assert (0 < mq->queue_length);
- mq->queue_length--;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "MQ destroy drops message of type %u\n",
- ntohs (ev->mh->type));
- GNUNET_MQ_discard (ev);
- }
- if (NULL != mq->current_envelope)
- {
- /* we can only discard envelopes that
- * are not queued! */
- mq->current_envelope->parent_queue = NULL;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "MQ destroy drops current message of type %u\n",
- ntohs (mq->current_envelope->mh->type));
- GNUNET_MQ_discard (mq->current_envelope);
- mq->current_envelope = NULL;
- GNUNET_assert (0 < mq->queue_length);
- mq->queue_length--;
- }
- GNUNET_assert (0 == mq->queue_length);
- while (NULL != (dnh = mq->dnh_head))
- {
- dnh->cb (dnh->cb_cls);
- GNUNET_MQ_destroy_notify_cancel (dnh);
- }
- if (NULL != mq->assoc_map)
- {
- GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
- mq->assoc_map = NULL;
- }
- GNUNET_free_non_null (mq->handlers);
- GNUNET_free (mq);
- }
- const struct GNUNET_MessageHeader *
- GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
- uint16_t base_size)
- {
- uint16_t whole_size;
- uint16_t nested_size;
- const struct GNUNET_MessageHeader *nested_msg;
- whole_size = ntohs (mh->size);
- GNUNET_assert (whole_size >= base_size);
- nested_size = whole_size - base_size;
- if (0 == nested_size)
- return NULL;
- if (nested_size < sizeof(struct GNUNET_MessageHeader))
- {
- GNUNET_break_op (0);
- return NULL;
- }
- nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
- if (ntohs (nested_msg->size) != nested_size)
- {
- GNUNET_break_op (0);
- return NULL;
- }
- return nested_msg;
- }
- /**
- * Cancel sending the message. Message must have been sent with
- * #GNUNET_MQ_send before. May not be called after the notify sent
- * callback has been called
- *
- * @param ev queued envelope to cancel
- */
- void
- GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
- {
- struct GNUNET_MQ_Handle *mq = ev->parent_queue;
- GNUNET_assert (NULL != mq);
- GNUNET_assert (NULL != mq->cancel_impl);
- mq->evacuate_called = GNUNET_NO;
- if (mq->current_envelope == ev)
- {
- /* complex case, we already started with transmitting
- the message using the callbacks. */
- GNUNET_assert (GNUNET_NO == mq->in_flight);
- GNUNET_assert (0 < mq->queue_length);
- mq->queue_length--;
- mq->cancel_impl (mq, mq->impl_state);
- /* continue sending the next message, if any */
- mq->current_envelope = mq->envelope_head;
- if (NULL != mq->current_envelope)
- {
- GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
- mq->envelope_tail,
- mq->current_envelope);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "sending canceled message of type %u queue\n",
- ntohs (ev->mh->type));
- mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
- }
- }
- else
- {
- /* simple case, message is still waiting in the queue */
- GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
- GNUNET_assert (0 < mq->queue_length);
- mq->queue_length--;
- }
- if (GNUNET_YES != mq->evacuate_called)
- {
- ev->parent_queue = NULL;
- ev->mh = NULL;
- /* also frees ev */
- GNUNET_free (ev);
- }
- }
- /**
- * Function to obtain the current envelope
- * from within #GNUNET_MQ_SendImpl implementations.
- *
- * @param mq message queue to interrogate
- * @return the current envelope
- */
- struct GNUNET_MQ_Envelope *
- GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
- {
- return mq->current_envelope;
- }
- /**
- * Function to obtain the last envelope in the queue.
- *
- * @param mq message queue to interrogate
- * @return the last envelope in the queue
- */
- struct GNUNET_MQ_Envelope *
- GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
- {
- if (NULL != mq->envelope_tail)
- return mq->envelope_tail;
- return mq->current_envelope;
- }
- /**
- * Set application-specific preferences for this envelope.
- * Overrides the options set for the queue with
- * #GNUNET_MQ_set_options() for this message only.
- *
- * @param env message to set options for
- * @param pp priorities and preferences to apply
- */
- void
- GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
- enum GNUNET_MQ_PriorityPreferences pp)
- {
- env->priority = pp;
- env->have_custom_options = GNUNET_YES;
- }
- /**
- * Get application-specific options for this envelope.
- *
- * @param env message to set options for
- * @return priorities and preferences to apply for @a env
- */
- enum GNUNET_MQ_PriorityPreferences
- GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env)
- {
- struct GNUNET_MQ_Handle *mq = env->parent_queue;
- if (GNUNET_YES == env->have_custom_options)
- return env->priority;
- if (NULL == mq)
- return 0;
- return mq->priority;
- }
- /**
- * Combine performance preferences set for different
- * envelopes that are being combined into one larger envelope.
- *
- * @param p1 one set of preferences
- * @param p2 second set of preferences
- * @return combined priority and preferences to use
- */
- enum GNUNET_MQ_PriorityPreferences
- GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1,
- enum GNUNET_MQ_PriorityPreferences p2)
- {
- enum GNUNET_MQ_PriorityPreferences ret;
- ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
- ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
- ret |=
- ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY));
- ret |=
- ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED));
- ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
- ret |=
- ((p1 & GNUNET_MQ_PREF_OUT_OF_ORDER) & (p2 & GNUNET_MQ_PREF_OUT_OF_ORDER));
- return ret;
- }
- /**
- * Set application-specific default options for this queue.
- *
- * @param mq message queue to set options for
- * @param pp priorities and preferences to apply
- */
- void
- GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
- enum GNUNET_MQ_PriorityPreferences pp)
- {
- mq->priority = pp;
- }
- /**
- * Obtain message contained in envelope.
- *
- * @param env the envelope
- * @return message contained in the envelope
- */
- const struct GNUNET_MessageHeader *
- GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env)
- {
- return env->mh;
- }
- /**
- * Return next envelope in queue.
- *
- * @param env a queued envelope
- * @return next one, or NULL
- */
- const struct GNUNET_MQ_Envelope *
- GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env)
- {
- return env->next;
- }
- /**
- * Register function to be called whenever @a mq is being
- * destroyed.
- *
- * @param mq message queue to watch
- * @param cb function to call on @a mq destruction
- * @param cb_cls closure for @a cb
- * @return handle for #GNUNET_MQ_destroy_notify_cancel().
- */
- struct GNUNET_MQ_DestroyNotificationHandle *
- GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
- GNUNET_SCHEDULER_TaskCallback cb,
- void *cb_cls)
- {
- struct GNUNET_MQ_DestroyNotificationHandle *dnh;
- dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
- dnh->mq = mq;
- dnh->cb = cb;
- dnh->cb_cls = cb_cls;
- GNUNET_CONTAINER_DLL_insert (mq->dnh_head, mq->dnh_tail, dnh);
- return dnh;
- }
- /**
- * Cancel registration from #GNUNET_MQ_destroy_notify().
- *
- * @param dnh handle for registration to cancel
- */
- void
- GNUNET_MQ_destroy_notify_cancel (
- struct GNUNET_MQ_DestroyNotificationHandle *dnh)
- {
- struct GNUNET_MQ_Handle *mq = dnh->mq;
- GNUNET_CONTAINER_DLL_remove (mq->dnh_head, mq->dnh_tail, dnh);
- GNUNET_free (dnh);
- }
- /**
- * Insert @a env into the envelope DLL starting at @a env_head
- * Note that @a env must not be in any MQ while this function
- * is used with DLLs defined outside of the MQ module. This
- * is just in case some application needs to also manage a
- * FIFO of envelopes independent of MQ itself and wants to
- * re-use the pointers internal to @a env. Use with caution.
- *
- * @param[in|out] env_head of envelope DLL
- * @param[in|out] env_tail tail of envelope DLL
- * @param[in|out] env element to insert at the tail
- */
- void
- GNUNET_MQ_dll_insert_head (struct GNUNET_MQ_Envelope **env_head,
- struct GNUNET_MQ_Envelope **env_tail,
- struct GNUNET_MQ_Envelope *env)
- {
- GNUNET_CONTAINER_DLL_insert (*env_head, *env_tail, env);
- }
- /**
- * Insert @a env into the envelope DLL starting at @a env_head
- * Note that @a env must not be in any MQ while this function
- * is used with DLLs defined outside of the MQ module. This
- * is just in case some application needs to also manage a
- * FIFO of envelopes independent of MQ itself and wants to
- * re-use the pointers internal to @a env. Use with caution.
- *
- * @param[in|out] env_head of envelope DLL
- * @param[in|out] env_tail tail of envelope DLL
- * @param[in|out] env element to insert at the tail
- */
- void
- GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
- struct GNUNET_MQ_Envelope **env_tail,
- struct GNUNET_MQ_Envelope *env)
- {
- GNUNET_CONTAINER_DLL_insert_tail (*env_head, *env_tail, env);
- }
- /**
- * Remove @a env from the envelope DLL starting at @a env_head.
- * Note that @a env must not be in any MQ while this function
- * is used with DLLs defined outside of the MQ module. This
- * is just in case some application needs to also manage a
- * FIFO of envelopes independent of MQ itself and wants to
- * re-use the pointers internal to @a env. Use with caution.
- *
- * @param[in|out] env_head of envelope DLL
- * @param[in|out] env_tail tail of envelope DLL
- * @param[in|out] env element to remove from the DLL
- */
- void
- GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
- struct GNUNET_MQ_Envelope **env_tail,
- struct GNUNET_MQ_Envelope *env)
- {
- GNUNET_CONTAINER_DLL_remove (*env_head, *env_tail, env);
- }
- /**
- * Copy an array of handlers.
- *
- * Useful if the array has been delared in local memory and needs to be
- * persisted for future use.
- *
- * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
- * @return A newly allocated array of handlers.
- * Needs to be freed with #GNUNET_free.
- */
- struct GNUNET_MQ_MessageHandler *
- GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
- {
- struct GNUNET_MQ_MessageHandler *copy;
- unsigned int count;
- if (NULL == handlers)
- return NULL;
- count = GNUNET_MQ_count_handlers (handlers);
- copy = GNUNET_new_array (count + 1, struct GNUNET_MQ_MessageHandler);
- GNUNET_memcpy (copy,
- handlers,
- count * sizeof(struct GNUNET_MQ_MessageHandler));
- return copy;
- }
- /**
- * Copy an array of handlers, appending AGPL handler.
- *
- * Useful if the array has been delared in local memory and needs to be
- * persisted for future use.
- *
- * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
- * @param agpl_handler function to call for AGPL handling
- * @param agpl_cls closure for @a agpl_handler
- * @return A newly allocated array of handlers.
- * Needs to be freed with #GNUNET_free.
- */
- struct GNUNET_MQ_MessageHandler *
- GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers,
- GNUNET_MQ_MessageCallback agpl_handler,
- void *agpl_cls)
- {
- struct GNUNET_MQ_MessageHandler *copy;
- unsigned int count;
- if (NULL == handlers)
- return NULL;
- count = GNUNET_MQ_count_handlers (handlers);
- copy = GNUNET_new_array (count + 2, struct GNUNET_MQ_MessageHandler);
- GNUNET_memcpy (copy,
- handlers,
- count * sizeof(struct GNUNET_MQ_MessageHandler));
- copy[count].mv = NULL;
- copy[count].cb = agpl_handler;
- copy[count].cls = agpl_cls;
- copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
- copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
- return copy;
- }
- /**
- * Count the handlers in a handler array.
- *
- * @param handlers Array of handlers to be counted.
- * @return The number of handlers in the array.
- */
- unsigned int
- GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
- {
- unsigned int i;
- if (NULL == handlers)
- return 0;
- for (i = 0; NULL != handlers[i].cb; i++)
- ;
- return i;
- }
- /**
- * Convert an `enum GNUNET_MQ_PreferenceType` to a string
- *
- * @param type the preference type
- * @return a string or NULL if invalid
- */
- const char *
- GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type)
- {
- switch (type)
- {
- case GNUNET_MQ_PREFERENCE_NONE:
- return "NONE";
- case GNUNET_MQ_PREFERENCE_BANDWIDTH:
- return "BANDWIDTH";
- case GNUNET_MQ_PREFERENCE_LATENCY:
- return "LATENCY";
- case GNUNET_MQ_PREFERENCE_RELIABILITY:
- return "RELIABILITY";
- }
- ;
- return NULL;
- }
- /* end of mq.c */
|