2
0

mq.c 34 KB


  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2012-2019 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @author Florian Dold
  18. * @file util/mq.c
  19. * @brief general purpose request queue
  20. */
  21. #include "platform.h"
  22. #include "gnunet_util_lib.h"
  23. #define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
  24. struct GNUNET_MQ_Envelope
  25. {
  26. /**
  27. * Messages are stored in a linked list.
  28. * Each queue has its own list of envelopes.
  29. */
  30. struct GNUNET_MQ_Envelope *next;
  31. /**
  32. * Messages are stored in a linked list
  33. * Each queue has its own list of envelopes.
  34. */
  35. struct GNUNET_MQ_Envelope *prev;
  36. /**
  37. * Actual allocated message header.
  38. * The GNUNET_MQ_Envelope header is allocated at
  39. * the end of the message.
  40. */
  41. struct GNUNET_MessageHeader *mh;
  42. /**
  43. * Queue the message is queued in, NULL if message is not queued.
  44. */
  45. struct GNUNET_MQ_Handle *parent_queue;
  46. /**
  47. * Called after the message was sent irrevocably.
  48. */
  49. GNUNET_SCHEDULER_TaskCallback sent_cb;
  50. /**
  51. * Closure for @e send_cb
  52. */
  53. void *sent_cls;
  54. /**
  55. * Flags that were set for this envelope by
  56. * #GNUNET_MQ_env_set_options(). Only valid if
  57. * @e have_custom_options is set.
  58. */
  59. enum GNUNET_MQ_PriorityPreferences priority;
  60. /**
  61. * Did the application call #GNUNET_MQ_env_set_options()?
  62. */
  63. int have_custom_options;
  64. };
  65. /**
  66. * Handle to a message queue.
  67. */
  68. struct GNUNET_MQ_Handle
  69. {
  70. /**
  71. * Handlers array, or NULL if the queue should not receive messages
  72. */
  73. struct GNUNET_MQ_MessageHandler *handlers;
  74. /**
  75. * Actual implementation of message sending,
  76. * called when a message is added
  77. */
  78. GNUNET_MQ_SendImpl send_impl;
  79. /**
  80. * Implementation-dependent queue destruction function
  81. */
  82. GNUNET_MQ_DestroyImpl destroy_impl;
  83. /**
  84. * Implementation-dependent send cancel function
  85. */
  86. GNUNET_MQ_CancelImpl cancel_impl;
  87. /**
  88. * Implementation-specific state
  89. */
  90. void *impl_state;
  91. /**
  92. * Callback will be called when an error occurs.
  93. */
  94. GNUNET_MQ_ErrorHandler error_handler;
  95. /**
  96. * Closure for the error handler.
  97. */
  98. void *error_handler_cls;
  99. /**
  100. * Task to asynchronously run #impl_send_continue().
  101. */
  102. struct GNUNET_SCHEDULER_Task *send_task;
  103. /**
  104. * Linked list of messages pending to be sent
  105. */
  106. struct GNUNET_MQ_Envelope *envelope_head;
  107. /**
  108. * Linked list of messages pending to be sent
  109. */
  110. struct GNUNET_MQ_Envelope *envelope_tail;
  111. /**
  112. * Message that is currently scheduled to be
  113. * sent. Not the head of the message queue, as the implementation
  114. * needs to know if sending has been already scheduled or not.
  115. */
  116. struct GNUNET_MQ_Envelope *current_envelope;
  117. /**
  118. * Map of associations, lazily allocated
  119. */
  120. struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
  121. /**
  122. * Functions to call on queue destruction; kept in a DLL.
  123. */
  124. struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
  125. /**
  126. * Functions to call on queue destruction; kept in a DLL.
  127. */
  128. struct GNUNET_MQ_DestroyNotificationHandle *dnh_tail;
  129. /**
  130. * Flags that were set for this queue by
  131. * #GNUNET_MQ_set_options(). Default is 0.
  132. */
  133. enum GNUNET_MQ_PriorityPreferences priority;
  134. /**
  135. * Next id that should be used for the @e assoc_map,
  136. * initialized lazily to a random value together with
  137. * @e assoc_map
  138. */
  139. uint32_t assoc_id;
  140. /**
  141. * Number of entries we have in the envelope-DLL.
  142. */
  143. unsigned int queue_length;
  144. /**
  145. * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
  146. * FIXME: is this dead?
  147. */
  148. int evacuate_called;
  149. /**
  150. * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
  151. */
  152. int in_flight;
  153. };
  154. /**
  155. * Call the message message handler that was registered
  156. * for the type of the given message in the given message queue.
  157. *
  158. * This function is indended to be used for the implementation
  159. * of message queues.
  160. *
  161. * @param mq message queue with the handlers
  162. * @param mh message to dispatch
  163. */
  164. void
  165. GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
  166. const struct GNUNET_MessageHeader *mh)
  167. {
  168. int ret;
  169. ret = GNUNET_MQ_handle_message (mq->handlers, mh);
  170. if (GNUNET_SYSERR == ret)
  171. {
  172. GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_MALFORMED);
  173. return;
  174. }
  175. }
  176. /**
  177. * Call the message message handler that was registered
  178. * for the type of the given message in the given @a handlers list.
  179. *
  180. * This function is indended to be used for the implementation
  181. * of message queues.
  182. *
  183. * @param handlers a set of handlers
  184. * @param mh message to dispatch
  185. * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
  186. * #GNUNET_SYSERR if message was rejected by check function
  187. */
  188. int
  189. GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
  190. const struct GNUNET_MessageHeader *mh)
  191. {
  192. const struct GNUNET_MQ_MessageHandler *handler;
  193. int handled = GNUNET_NO;
  194. uint16_t msize = ntohs (mh->size);
  195. uint16_t mtype = ntohs (mh->type);
  196. LOG (GNUNET_ERROR_TYPE_DEBUG,
  197. "Received message of type %u and size %u\n",
  198. mtype,
  199. msize);
  200. if (NULL == handlers)
  201. goto done;
  202. for (handler = handlers; NULL != handler->cb; handler++)
  203. {
  204. if (handler->type == mtype)
  205. {
  206. handled = GNUNET_YES;
  207. if ((handler->expected_size > msize) ||
  208. ((handler->expected_size != msize) && (NULL == handler->mv)))
  209. {
  210. /* Too small, or not an exact size and
  211. no 'mv' handler to check rest */
  212. LOG (GNUNET_ERROR_TYPE_ERROR,
  213. "Received malformed message of type %u\n",
  214. (unsigned int) handler->type);
  215. return GNUNET_SYSERR;
  216. }
  217. if ((NULL == handler->mv) ||
  218. (GNUNET_OK == handler->mv (handler->cls, mh)))
  219. {
  220. /* message well-formed, pass to handler */
  221. handler->cb (handler->cls, mh);
  222. }
  223. else
  224. {
  225. /* Message rejected by check routine */
  226. LOG (GNUNET_ERROR_TYPE_ERROR,
  227. "Received malformed message of type %u\n",
  228. (unsigned int) handler->type);
  229. return GNUNET_SYSERR;
  230. }
  231. break;
  232. }
  233. }
  234. done:
  235. if (GNUNET_NO == handled)
  236. {
  237. LOG (GNUNET_ERROR_TYPE_INFO,
  238. "No handler for message of type %u and size %u\n",
  239. mtype,
  240. msize);
  241. return GNUNET_NO;
  242. }
  243. return GNUNET_OK;
  244. }
  245. /**
  246. * Call the error handler of a message queue with the given
  247. * error code. If there is no error handler, log a warning.
  248. *
  249. * This function is intended to be used by the implementation
  250. * of message queues.
  251. *
  252. * @param mq message queue
  253. * @param error the error type
  254. */
  255. void
  256. GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error)
  257. {
  258. if (NULL == mq->error_handler)
  259. {
  260. LOG (GNUNET_ERROR_TYPE_WARNING,
  261. "Got error %d, but no handler installed\n",
  262. (int) error);
  263. return;
  264. }
  265. mq->error_handler (mq->error_handler_cls, error);
  266. }
  267. /**
  268. * Discard the message queue message, free all
  269. * allocated resources. Must be called in the event
  270. * that a message is created but should not actually be sent.
  271. *
  272. * @param mqm the message to discard
  273. */
  274. void
  275. GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *ev)
  276. {
  277. GNUNET_assert (NULL == ev->parent_queue);
  278. GNUNET_free (ev);
  279. }
  280. /**
  281. * Obtain the current length of the message queue.
  282. *
  283. * @param mq queue to inspect
  284. * @return number of queued, non-transmitted messages
  285. */
  286. unsigned int
  287. GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
  288. {
  289. if (GNUNET_YES != mq->in_flight)
  290. {
  291. return mq->queue_length;
  292. }
  293. return mq->queue_length - 1;
  294. }
  295. /**
  296. * Send a message with the given message queue.
  297. * May only be called once per message.
  298. *
  299. * @param mq message queue
  300. * @param ev the envelope with the message to send.
  301. */
  302. void
  303. GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
  304. {
  305. GNUNET_assert (NULL != mq);
  306. GNUNET_assert (NULL == ev->parent_queue);
  307. mq->queue_length++;
  308. if (mq->queue_length >= 10000)
  309. {
  310. /* This would seem like a bug... */
  311. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  312. "MQ with %u entries extended by message of type %u (FC broken?)\n",
  313. (unsigned int) mq->queue_length,
  314. (unsigned int) ntohs (ev->mh->type));
  315. }
  316. ev->parent_queue = mq;
  317. /* is the implementation busy? queue it! */
  318. if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
  319. {
  320. GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev);
  321. return;
  322. }
  323. GNUNET_assert (NULL == mq->envelope_head);
  324. mq->current_envelope = ev;
  325. LOG (GNUNET_ERROR_TYPE_DEBUG,
  326. "sending message of type %u, queue empty (MQ: %p)\n",
  327. ntohs (ev->mh->type),
  328. mq);
  329. mq->send_impl (mq, ev->mh, mq->impl_state);
  330. }
  331. /**
  332. * Remove the first envelope that has not yet been sent from the message
  333. * queue and return it.
  334. *
  335. * @param mq queue to remove envelope from
  336. * @return NULL if queue is empty (or has no envelope that is not under transmission)
  337. */
  338. struct GNUNET_MQ_Envelope *
  339. GNUNET_MQ_unsent_head (struct GNUNET_MQ_Handle *mq)
  340. {
  341. struct GNUNET_MQ_Envelope *env;
  342. env = mq->envelope_head;
  343. GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, env);
  344. mq->queue_length--;
  345. env->parent_queue = NULL;
  346. return env;
  347. }
  348. /**
  349. * Function to copy an envelope. The envelope must not yet
  350. * be in any queue or have any options or callbacks set.
  351. *
  352. * @param env envelope to copy
  353. * @return copy of @a env
  354. */
  355. struct GNUNET_MQ_Envelope *
  356. GNUNET_MQ_env_copy (struct GNUNET_MQ_Envelope *env)
  357. {
  358. GNUNET_assert (NULL == env->next);
  359. GNUNET_assert (NULL == env->parent_queue);
  360. GNUNET_assert (NULL == env->sent_cb);
  361. GNUNET_assert (GNUNET_NO == env->have_custom_options);
  362. return GNUNET_MQ_msg_copy (env->mh);
  363. }
  364. /**
  365. * Send a copy of a message with the given message queue.
  366. * Can be called repeatedly on the same envelope.
  367. *
  368. * @param mq message queue
  369. * @param ev the envelope with the message to send.
  370. */
  371. void
  372. GNUNET_MQ_send_copy (struct GNUNET_MQ_Handle *mq,
  373. const struct GNUNET_MQ_Envelope *ev)
  374. {
  375. struct GNUNET_MQ_Envelope *env;
  376. uint16_t msize;
  377. msize = ntohs (ev->mh->size);
  378. env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize);
  379. env->mh = (struct GNUNET_MessageHeader *) &env[1];
  380. env->sent_cb = ev->sent_cb;
  381. env->sent_cls = ev->sent_cls;
  382. GNUNET_memcpy (&env[1], ev->mh, msize);
  383. GNUNET_MQ_send (mq, env);
  384. }
  385. /**
  386. * Task run to call the send implementation for the next queued
  387. * message, if any. Only useful for implementing message queues,
  388. * results in undefined behavior if not used carefully.
  389. *
  390. * @param cls message queue to send the next message with
  391. */
  392. static void
  393. impl_send_continue (void *cls)
  394. {
  395. struct GNUNET_MQ_Handle *mq = cls;
  396. mq->send_task = NULL;
  397. /* call is only valid if we're actually currently sending
  398. * a message */
  399. if (NULL == mq->envelope_head)
  400. return;
  401. mq->current_envelope = mq->envelope_head;
  402. GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
  403. mq->envelope_tail,
  404. mq->current_envelope);
  405. LOG (GNUNET_ERROR_TYPE_DEBUG,
  406. "sending message of type %u from queue\n",
  407. ntohs (mq->current_envelope->mh->type));
  408. mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
  409. }
  410. /**
  411. * Call the send implementation for the next queued message, if any.
  412. * Only useful for implementing message queues, results in undefined
  413. * behavior if not used carefully.
  414. *
  415. * @param mq message queue to send the next message with
  416. */
  417. void
  418. GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
  419. {
  420. struct GNUNET_MQ_Envelope *current_envelope;
  421. GNUNET_SCHEDULER_TaskCallback cb;
  422. GNUNET_assert (0 < mq->queue_length);
  423. mq->queue_length--;
  424. mq->in_flight = GNUNET_NO;
  425. current_envelope = mq->current_envelope;
  426. current_envelope->parent_queue = NULL;
  427. mq->current_envelope = NULL;
  428. GNUNET_assert (NULL == mq->send_task);
  429. mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue, mq);
  430. if (NULL != (cb = current_envelope->sent_cb))
  431. {
  432. current_envelope->sent_cb = NULL;
  433. cb (current_envelope->sent_cls);
  434. }
  435. GNUNET_free (current_envelope);
  436. }
  437. /**
  438. * Call the send notification for the current message, but do not
  439. * try to send the next message until #GNUNET_MQ_impl_send_continue
  440. * is called.
  441. *
  442. * Only useful for implementing message queues, results in undefined
  443. * behavior if not used carefully.
  444. *
  445. * @param mq message queue to send the next message with
  446. */
  447. void
  448. GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
  449. {
  450. struct GNUNET_MQ_Envelope *current_envelope;
  451. GNUNET_SCHEDULER_TaskCallback cb;
  452. mq->in_flight = GNUNET_YES;
  453. /* call is only valid if we're actually currently sending
  454. * a message */
  455. current_envelope = mq->current_envelope;
  456. GNUNET_assert (NULL != current_envelope);
  457. /* can't call cancel from now on anymore */
  458. current_envelope->parent_queue = NULL;
  459. if (NULL != (cb = current_envelope->sent_cb))
  460. {
  461. current_envelope->sent_cb = NULL;
  462. cb (current_envelope->sent_cls);
  463. }
  464. }
  465. /**
  466. * Create a message queue for the specified handlers.
  467. *
  468. * @param send function the implements sending messages
  469. * @param destroy function that implements destroying the queue
  470. * @param cancel function that implements canceling a message
  471. * @param impl_state for the queue, passed to 'send' and 'destroy'
  472. * @param handlers array of message handlers
  473. * @param error_handler handler for read and write errors
  474. * @param error_handler_cls closure for @a error_handler
  475. * @return a new message queue
  476. */
  477. struct GNUNET_MQ_Handle *
  478. GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
  479. GNUNET_MQ_DestroyImpl destroy,
  480. GNUNET_MQ_CancelImpl cancel,
  481. void *impl_state,
  482. const struct GNUNET_MQ_MessageHandler *handlers,
  483. GNUNET_MQ_ErrorHandler error_handler,
  484. void *error_handler_cls)
  485. {
  486. struct GNUNET_MQ_Handle *mq;
  487. mq = GNUNET_new (struct GNUNET_MQ_Handle);
  488. mq->send_impl = send;
  489. mq->destroy_impl = destroy;
  490. mq->cancel_impl = cancel;
  491. mq->handlers = GNUNET_MQ_copy_handlers (handlers);
  492. mq->error_handler = error_handler;
  493. mq->error_handler_cls = error_handler_cls;
  494. mq->impl_state = impl_state;
  495. return mq;
  496. }
  497. /**
  498. * Change the closure argument in all of the `handlers` of the
  499. * @a mq.
  500. *
  501. * @param mq to modify
  502. * @param handlers_cls new closure to use
  503. */
  504. void
  505. GNUNET_MQ_set_handlers_closure (struct GNUNET_MQ_Handle *mq, void *handlers_cls)
  506. {
  507. if (NULL == mq->handlers)
  508. return;
  509. for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
  510. mq->handlers[i].cls = handlers_cls;
  511. }
  512. /**
  513. * Get the message that should currently be sent.
  514. * Fails if there is no current message.
  515. * Only useful for implementing message queues,
  516. * results in undefined behavior if not used carefully.
  517. *
  518. * @param mq message queue with the current message
  519. * @return message to send, never NULL
  520. */
  521. const struct GNUNET_MessageHeader *
  522. GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq)
  523. {
  524. GNUNET_assert (NULL != mq->current_envelope);
  525. GNUNET_assert (NULL != mq->current_envelope->mh);
  526. return mq->current_envelope->mh;
  527. }
  528. /**
  529. * Get the implementation state associated with the
  530. * message queue.
  531. *
  532. * While the GNUNET_MQ_Impl* callbacks receive the
  533. * implementation state, continuations that are scheduled
  534. * by the implementation function often only have one closure
  535. * argument, with this function it is possible to get at the
  536. * implementation state when only passing the GNUNET_MQ_Handle
  537. * as closure.
  538. *
  539. * @param mq message queue with the current message
  540. * @return message to send, never NULL
  541. */
  542. void *
  543. GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq)
  544. {
  545. return mq->impl_state;
  546. }
  547. struct GNUNET_MQ_Envelope *
  548. GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
  549. {
  550. struct GNUNET_MQ_Envelope *ev;
  551. ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope));
  552. ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
  553. ev->mh->size = htons (size);
  554. ev->mh->type = htons (type);
  555. if (NULL != mhp)
  556. *mhp = ev->mh;
  557. return ev;
  558. }
  559. /**
  560. * Create a new envelope by copying an existing message.
  561. *
  562. * @param hdr header of the message to copy
  563. * @return envelope containing @a hdr
  564. */
  565. struct GNUNET_MQ_Envelope *
  566. GNUNET_MQ_msg_copy (const struct GNUNET_MessageHeader *hdr)
  567. {
  568. struct GNUNET_MQ_Envelope *mqm;
  569. uint16_t size = ntohs (hdr->size);
  570. mqm = GNUNET_malloc (sizeof(*mqm) + size);
  571. mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
  572. GNUNET_memcpy (mqm->mh, hdr, size);
  573. return mqm;
  574. }
  575. /**
  576. * Implementation of the #GNUNET_MQ_msg_nested_mh macro.
  577. *
  578. * @param mhp pointer to the message header pointer that will be changed to allocate at
  579. * the newly allocated space for the message.
  580. * @param base_size size of the data before the nested message
  581. * @param type type of the message in the envelope
  582. * @param nested_mh the message to append to the message after base_size
  583. */
  584. struct GNUNET_MQ_Envelope *
  585. GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp,
  586. uint16_t base_size,
  587. uint16_t type,
  588. const struct GNUNET_MessageHeader *nested_mh)
  589. {
  590. struct GNUNET_MQ_Envelope *mqm;
  591. uint16_t size;
  592. if (NULL == nested_mh)
  593. return GNUNET_MQ_msg_ (mhp, base_size, type);
  594. size = base_size + ntohs (nested_mh->size);
  595. /* check for uint16_t overflow */
  596. if (size < base_size)
  597. return NULL;
  598. mqm = GNUNET_MQ_msg_ (mhp, size, type);
  599. GNUNET_memcpy ((char *) mqm->mh + base_size,
  600. nested_mh,
  601. ntohs (nested_mh->size));
  602. return mqm;
  603. }
  604. /**
  605. * Associate the assoc_data in mq with a unique request id.
  606. *
  607. * @param mq message queue, id will be unique for the queue
  608. * @param assoc_data to associate
  609. */
  610. uint32_t
  611. GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data)
  612. {
  613. uint32_t id;
  614. if (NULL == mq->assoc_map)
  615. {
  616. mq->assoc_map = GNUNET_CONTAINER_multihashmap32_create (8);
  617. mq->assoc_id = 1;
  618. }
  619. id = mq->assoc_id++;
  620. GNUNET_assert (GNUNET_OK ==
  621. GNUNET_CONTAINER_multihashmap32_put (
  622. mq->assoc_map,
  623. id,
  624. assoc_data,
  625. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  626. return id;
  627. }
  628. /**
  629. * Get the data associated with a @a request_id in a queue
  630. *
  631. * @param mq the message queue with the association
  632. * @param request_id the request id we are interested in
  633. * @return the associated data
  634. */
  635. void *
  636. GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
  637. {
  638. if (NULL == mq->assoc_map)
  639. return NULL;
  640. return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
  641. }
  642. /**
  643. * Remove the association for a @a request_id
  644. *
  645. * @param mq the message queue with the association
  646. * @param request_id the request id we want to remove
  647. * @return the associated data
  648. */
  649. void *
  650. GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
  651. {
  652. void *val;
  653. if (NULL == mq->assoc_map)
  654. return NULL;
  655. val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
  656. GNUNET_CONTAINER_multihashmap32_remove_all (mq->assoc_map, request_id);
  657. return val;
  658. }
  659. /**
  660. * Call a callback once the envelope has been sent, that is,
  661. * sending it can not be canceled anymore.
  662. * There can be only one notify sent callback per envelope.
  663. *
  664. * @param ev message to call the notify callback for
  665. * @param cb the notify callback
  666. * @param cb_cls closure for the callback
  667. */
  668. void
  669. GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
  670. GNUNET_SCHEDULER_TaskCallback cb,
  671. void *cb_cls)
  672. {
  673. /* allow setting *OR* clearing callback */
  674. GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
  675. ev->sent_cb = cb;
  676. ev->sent_cls = cb_cls;
  677. }
  678. /**
  679. * Handle we return for callbacks registered to be
  680. * notified when #GNUNET_MQ_destroy() is called on a queue.
  681. */
  682. struct GNUNET_MQ_DestroyNotificationHandle
  683. {
  684. /**
  685. * Kept in a DLL.
  686. */
  687. struct GNUNET_MQ_DestroyNotificationHandle *prev;
  688. /**
  689. * Kept in a DLL.
  690. */
  691. struct GNUNET_MQ_DestroyNotificationHandle *next;
  692. /**
  693. * Queue to notify about.
  694. */
  695. struct GNUNET_MQ_Handle *mq;
  696. /**
  697. * Function to call.
  698. */
  699. GNUNET_SCHEDULER_TaskCallback cb;
  700. /**
  701. * Closure for @e cb.
  702. */
  703. void *cb_cls;
  704. };
  705. /**
  706. * Destroy the message queue.
  707. *
  708. * @param mq message queue to destroy
  709. */
  710. void
  711. GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
  712. {
  713. struct GNUNET_MQ_DestroyNotificationHandle *dnh;
  714. if (NULL != mq->destroy_impl)
  715. {
  716. mq->destroy_impl (mq, mq->impl_state);
  717. }
  718. if (NULL != mq->send_task)
  719. {
  720. GNUNET_SCHEDULER_cancel (mq->send_task);
  721. mq->send_task = NULL;
  722. }
  723. while (NULL != mq->envelope_head)
  724. {
  725. struct GNUNET_MQ_Envelope *ev;
  726. ev = mq->envelope_head;
  727. ev->parent_queue = NULL;
  728. GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
  729. GNUNET_assert (0 < mq->queue_length);
  730. mq->queue_length--;
  731. LOG (GNUNET_ERROR_TYPE_DEBUG,
  732. "MQ destroy drops message of type %u\n",
  733. ntohs (ev->mh->type));
  734. GNUNET_MQ_discard (ev);
  735. }
  736. if (NULL != mq->current_envelope)
  737. {
  738. /* we can only discard envelopes that
  739. * are not queued! */
  740. mq->current_envelope->parent_queue = NULL;
  741. LOG (GNUNET_ERROR_TYPE_DEBUG,
  742. "MQ destroy drops current message of type %u\n",
  743. ntohs (mq->current_envelope->mh->type));
  744. GNUNET_MQ_discard (mq->current_envelope);
  745. mq->current_envelope = NULL;
  746. GNUNET_assert (0 < mq->queue_length);
  747. mq->queue_length--;
  748. }
  749. GNUNET_assert (0 == mq->queue_length);
  750. while (NULL != (dnh = mq->dnh_head))
  751. {
  752. dnh->cb (dnh->cb_cls);
  753. GNUNET_MQ_destroy_notify_cancel (dnh);
  754. }
  755. if (NULL != mq->assoc_map)
  756. {
  757. GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
  758. mq->assoc_map = NULL;
  759. }
  760. GNUNET_free_non_null (mq->handlers);
  761. GNUNET_free (mq);
  762. }
  763. const struct GNUNET_MessageHeader *
  764. GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh,
  765. uint16_t base_size)
  766. {
  767. uint16_t whole_size;
  768. uint16_t nested_size;
  769. const struct GNUNET_MessageHeader *nested_msg;
  770. whole_size = ntohs (mh->size);
  771. GNUNET_assert (whole_size >= base_size);
  772. nested_size = whole_size - base_size;
  773. if (0 == nested_size)
  774. return NULL;
  775. if (nested_size < sizeof(struct GNUNET_MessageHeader))
  776. {
  777. GNUNET_break_op (0);
  778. return NULL;
  779. }
  780. nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
  781. if (ntohs (nested_msg->size) != nested_size)
  782. {
  783. GNUNET_break_op (0);
  784. return NULL;
  785. }
  786. return nested_msg;
  787. }
  788. /**
  789. * Cancel sending the message. Message must have been sent with
  790. * #GNUNET_MQ_send before. May not be called after the notify sent
  791. * callback has been called
  792. *
  793. * @param ev queued envelope to cancel
  794. */
  795. void
  796. GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev)
  797. {
  798. struct GNUNET_MQ_Handle *mq = ev->parent_queue;
  799. GNUNET_assert (NULL != mq);
  800. GNUNET_assert (NULL != mq->cancel_impl);
  801. mq->evacuate_called = GNUNET_NO;
  802. if (mq->current_envelope == ev)
  803. {
  804. /* complex case, we already started with transmitting
  805. the message using the callbacks. */
  806. GNUNET_assert (GNUNET_NO == mq->in_flight);
  807. GNUNET_assert (0 < mq->queue_length);
  808. mq->queue_length--;
  809. mq->cancel_impl (mq, mq->impl_state);
  810. /* continue sending the next message, if any */
  811. mq->current_envelope = mq->envelope_head;
  812. if (NULL != mq->current_envelope)
  813. {
  814. GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
  815. mq->envelope_tail,
  816. mq->current_envelope);
  817. LOG (GNUNET_ERROR_TYPE_DEBUG,
  818. "sending canceled message of type %u queue\n",
  819. ntohs (ev->mh->type));
  820. mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state);
  821. }
  822. }
  823. else
  824. {
  825. /* simple case, message is still waiting in the queue */
  826. GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
  827. GNUNET_assert (0 < mq->queue_length);
  828. mq->queue_length--;
  829. }
  830. if (GNUNET_YES != mq->evacuate_called)
  831. {
  832. ev->parent_queue = NULL;
  833. ev->mh = NULL;
  834. /* also frees ev */
  835. GNUNET_free (ev);
  836. }
  837. }
  838. /**
  839. * Function to obtain the current envelope
  840. * from within #GNUNET_MQ_SendImpl implementations.
  841. *
  842. * @param mq message queue to interrogate
  843. * @return the current envelope
  844. */
  845. struct GNUNET_MQ_Envelope *
  846. GNUNET_MQ_get_current_envelope (struct GNUNET_MQ_Handle *mq)
  847. {
  848. return mq->current_envelope;
  849. }
  850. /**
  851. * Function to obtain the last envelope in the queue.
  852. *
  853. * @param mq message queue to interrogate
  854. * @return the last envelope in the queue
  855. */
  856. struct GNUNET_MQ_Envelope *
  857. GNUNET_MQ_get_last_envelope (struct GNUNET_MQ_Handle *mq)
  858. {
  859. if (NULL != mq->envelope_tail)
  860. return mq->envelope_tail;
  861. return mq->current_envelope;
  862. }
  863. /**
  864. * Set application-specific preferences for this envelope.
  865. * Overrides the options set for the queue with
  866. * #GNUNET_MQ_set_options() for this message only.
  867. *
  868. * @param env message to set options for
  869. * @param pp priorities and preferences to apply
  870. */
  871. void
  872. GNUNET_MQ_env_set_options (struct GNUNET_MQ_Envelope *env,
  873. enum GNUNET_MQ_PriorityPreferences pp)
  874. {
  875. env->priority = pp;
  876. env->have_custom_options = GNUNET_YES;
  877. }
  878. /**
  879. * Get application-specific options for this envelope.
  880. *
  881. * @param env message to set options for
  882. * @return priorities and preferences to apply for @a env
  883. */
  884. enum GNUNET_MQ_PriorityPreferences
  885. GNUNET_MQ_env_get_options (struct GNUNET_MQ_Envelope *env)
  886. {
  887. struct GNUNET_MQ_Handle *mq = env->parent_queue;
  888. if (GNUNET_YES == env->have_custom_options)
  889. return env->priority;
  890. if (NULL == mq)
  891. return 0;
  892. return mq->priority;
  893. }
  894. /**
  895. * Combine performance preferences set for different
  896. * envelopes that are being combined into one larger envelope.
  897. *
  898. * @param p1 one set of preferences
  899. * @param p2 second set of preferences
  900. * @return combined priority and preferences to use
  901. */
  902. enum GNUNET_MQ_PriorityPreferences
  903. GNUNET_MQ_env_combine_options (enum GNUNET_MQ_PriorityPreferences p1,
  904. enum GNUNET_MQ_PriorityPreferences p2)
  905. {
  906. enum GNUNET_MQ_PriorityPreferences ret;
  907. ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
  908. ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
  909. ret |=
  910. ((p1 & GNUNET_MQ_PREF_LOW_LATENCY) | (p2 & GNUNET_MQ_PREF_LOW_LATENCY));
  911. ret |=
  912. ((p1 & GNUNET_MQ_PREF_CORK_ALLOWED) & (p2 & GNUNET_MQ_PREF_CORK_ALLOWED));
  913. ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
  914. ret |=
  915. ((p1 & GNUNET_MQ_PREF_OUT_OF_ORDER) & (p2 & GNUNET_MQ_PREF_OUT_OF_ORDER));
  916. return ret;
  917. }
  918. /**
  919. * Set application-specific default options for this queue.
  920. *
  921. * @param mq message queue to set options for
  922. * @param pp priorities and preferences to apply
  923. */
  924. void
  925. GNUNET_MQ_set_options (struct GNUNET_MQ_Handle *mq,
  926. enum GNUNET_MQ_PriorityPreferences pp)
  927. {
  928. mq->priority = pp;
  929. }
  930. /**
  931. * Obtain message contained in envelope.
  932. *
  933. * @param env the envelope
  934. * @return message contained in the envelope
  935. */
  936. const struct GNUNET_MessageHeader *
  937. GNUNET_MQ_env_get_msg (const struct GNUNET_MQ_Envelope *env)
  938. {
  939. return env->mh;
  940. }
  941. /**
  942. * Return next envelope in queue.
  943. *
  944. * @param env a queued envelope
  945. * @return next one, or NULL
  946. */
  947. const struct GNUNET_MQ_Envelope *
  948. GNUNET_MQ_env_next (const struct GNUNET_MQ_Envelope *env)
  949. {
  950. return env->next;
  951. }
  952. /**
  953. * Register function to be called whenever @a mq is being
  954. * destroyed.
  955. *
  956. * @param mq message queue to watch
  957. * @param cb function to call on @a mq destruction
  958. * @param cb_cls closure for @a cb
  959. * @return handle for #GNUNET_MQ_destroy_notify_cancel().
  960. */
  961. struct GNUNET_MQ_DestroyNotificationHandle *
  962. GNUNET_MQ_destroy_notify (struct GNUNET_MQ_Handle *mq,
  963. GNUNET_SCHEDULER_TaskCallback cb,
  964. void *cb_cls)
  965. {
  966. struct GNUNET_MQ_DestroyNotificationHandle *dnh;
  967. dnh = GNUNET_new (struct GNUNET_MQ_DestroyNotificationHandle);
  968. dnh->mq = mq;
  969. dnh->cb = cb;
  970. dnh->cb_cls = cb_cls;
  971. GNUNET_CONTAINER_DLL_insert (mq->dnh_head, mq->dnh_tail, dnh);
  972. return dnh;
  973. }
  974. /**
  975. * Cancel registration from #GNUNET_MQ_destroy_notify().
  976. *
  977. * @param dnh handle for registration to cancel
  978. */
  979. void
  980. GNUNET_MQ_destroy_notify_cancel (
  981. struct GNUNET_MQ_DestroyNotificationHandle *dnh)
  982. {
  983. struct GNUNET_MQ_Handle *mq = dnh->mq;
  984. GNUNET_CONTAINER_DLL_remove (mq->dnh_head, mq->dnh_tail, dnh);
  985. GNUNET_free (dnh);
  986. }
  987. /**
  988. * Insert @a env into the envelope DLL starting at @a env_head
  989. * Note that @a env must not be in any MQ while this function
  990. * is used with DLLs defined outside of the MQ module. This
  991. * is just in case some application needs to also manage a
  992. * FIFO of envelopes independent of MQ itself and wants to
  993. * re-use the pointers internal to @a env. Use with caution.
  994. *
  995. * @param[in|out] env_head of envelope DLL
  996. * @param[in|out] env_tail tail of envelope DLL
  997. * @param[in|out] env element to insert at the tail
  998. */
  999. void
  1000. GNUNET_MQ_dll_insert_head (struct GNUNET_MQ_Envelope **env_head,
  1001. struct GNUNET_MQ_Envelope **env_tail,
  1002. struct GNUNET_MQ_Envelope *env)
  1003. {
  1004. GNUNET_CONTAINER_DLL_insert (*env_head, *env_tail, env);
  1005. }
  1006. /**
  1007. * Insert @a env into the envelope DLL starting at @a env_head
  1008. * Note that @a env must not be in any MQ while this function
  1009. * is used with DLLs defined outside of the MQ module. This
  1010. * is just in case some application needs to also manage a
  1011. * FIFO of envelopes independent of MQ itself and wants to
  1012. * re-use the pointers internal to @a env. Use with caution.
  1013. *
  1014. * @param[in|out] env_head of envelope DLL
  1015. * @param[in|out] env_tail tail of envelope DLL
  1016. * @param[in|out] env element to insert at the tail
  1017. */
  1018. void
  1019. GNUNET_MQ_dll_insert_tail (struct GNUNET_MQ_Envelope **env_head,
  1020. struct GNUNET_MQ_Envelope **env_tail,
  1021. struct GNUNET_MQ_Envelope *env)
  1022. {
  1023. GNUNET_CONTAINER_DLL_insert_tail (*env_head, *env_tail, env);
  1024. }
  1025. /**
  1026. * Remove @a env from the envelope DLL starting at @a env_head.
  1027. * Note that @a env must not be in any MQ while this function
  1028. * is used with DLLs defined outside of the MQ module. This
  1029. * is just in case some application needs to also manage a
  1030. * FIFO of envelopes independent of MQ itself and wants to
  1031. * re-use the pointers internal to @a env. Use with caution.
  1032. *
  1033. * @param[in|out] env_head of envelope DLL
  1034. * @param[in|out] env_tail tail of envelope DLL
  1035. * @param[in|out] env element to remove from the DLL
  1036. */
  1037. void
  1038. GNUNET_MQ_dll_remove (struct GNUNET_MQ_Envelope **env_head,
  1039. struct GNUNET_MQ_Envelope **env_tail,
  1040. struct GNUNET_MQ_Envelope *env)
  1041. {
  1042. GNUNET_CONTAINER_DLL_remove (*env_head, *env_tail, env);
  1043. }
  1044. /**
  1045. * Copy an array of handlers.
  1046. *
  1047. * Useful if the array has been delared in local memory and needs to be
  1048. * persisted for future use.
  1049. *
  1050. * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
  1051. * @return A newly allocated array of handlers.
  1052. * Needs to be freed with #GNUNET_free.
  1053. */
  1054. struct GNUNET_MQ_MessageHandler *
  1055. GNUNET_MQ_copy_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
  1056. {
  1057. struct GNUNET_MQ_MessageHandler *copy;
  1058. unsigned int count;
  1059. if (NULL == handlers)
  1060. return NULL;
  1061. count = GNUNET_MQ_count_handlers (handlers);
  1062. copy = GNUNET_new_array (count + 1, struct GNUNET_MQ_MessageHandler);
  1063. GNUNET_memcpy (copy,
  1064. handlers,
  1065. count * sizeof(struct GNUNET_MQ_MessageHandler));
  1066. return copy;
  1067. }
  1068. /**
  1069. * Copy an array of handlers, appending AGPL handler.
  1070. *
  1071. * Useful if the array has been delared in local memory and needs to be
  1072. * persisted for future use.
  1073. *
  1074. * @param handlers Array of handlers to be copied. Can be NULL (nothing done).
  1075. * @param agpl_handler function to call for AGPL handling
  1076. * @param agpl_cls closure for @a agpl_handler
  1077. * @return A newly allocated array of handlers.
  1078. * Needs to be freed with #GNUNET_free.
  1079. */
  1080. struct GNUNET_MQ_MessageHandler *
  1081. GNUNET_MQ_copy_handlers2 (const struct GNUNET_MQ_MessageHandler *handlers,
  1082. GNUNET_MQ_MessageCallback agpl_handler,
  1083. void *agpl_cls)
  1084. {
  1085. struct GNUNET_MQ_MessageHandler *copy;
  1086. unsigned int count;
  1087. if (NULL == handlers)
  1088. return NULL;
  1089. count = GNUNET_MQ_count_handlers (handlers);
  1090. copy = GNUNET_new_array (count + 2, struct GNUNET_MQ_MessageHandler);
  1091. GNUNET_memcpy (copy,
  1092. handlers,
  1093. count * sizeof(struct GNUNET_MQ_MessageHandler));
  1094. copy[count].mv = NULL;
  1095. copy[count].cb = agpl_handler;
  1096. copy[count].cls = agpl_cls;
  1097. copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
  1098. copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
  1099. return copy;
  1100. }
  1101. /**
  1102. * Count the handlers in a handler array.
  1103. *
  1104. * @param handlers Array of handlers to be counted.
  1105. * @return The number of handlers in the array.
  1106. */
  1107. unsigned int
  1108. GNUNET_MQ_count_handlers (const struct GNUNET_MQ_MessageHandler *handlers)
  1109. {
  1110. unsigned int i;
  1111. if (NULL == handlers)
  1112. return 0;
  1113. for (i = 0; NULL != handlers[i].cb; i++)
  1114. ;
  1115. return i;
  1116. }
  1117. /**
  1118. * Convert an `enum GNUNET_MQ_PreferenceType` to a string
  1119. *
  1120. * @param type the preference type
  1121. * @return a string or NULL if invalid
  1122. */
  1123. const char *
  1124. GNUNET_MQ_preference_to_string (enum GNUNET_MQ_PreferenceKind type)
  1125. {
  1126. switch (type)
  1127. {
  1128. case GNUNET_MQ_PREFERENCE_NONE:
  1129. return "NONE";
  1130. case GNUNET_MQ_PREFERENCE_BANDWIDTH:
  1131. return "BANDWIDTH";
  1132. case GNUNET_MQ_PREFERENCE_LATENCY:
  1133. return "LATENCY";
  1134. case GNUNET_MQ_PREFERENCE_RELIABILITY:
  1135. return "RELIABILITY";
  1136. }
  1137. ;
  1138. return NULL;
  1139. }
  1140. /* end of mq.c */