12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043 |
- /*
- This file is part of GNUnet.
- Copyright (C) 2009-2014, 2016 GNUnet e.V.
- GNUnet is free software: you can redistribute it and/or modify it
- under the terms of the GNU Affero General Public License as published
- by the Free Software Foundation, either version 3 of the License,
- or (at your option) any later version.
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- SPDX-License-Identifier: AGPL3.0-or-later
- */
- /**
- * @file core/gnunet-service-core_sessions.c
- * @brief code for managing of 'encrypted' sessions (key exchange done)
- * @author Christian Grothoff
- */
- #include "platform.h"
- #include "gnunet-service-core.h"
- #include "gnunet-service-core_kx.h"
- #include "gnunet-service-core_typemap.h"
- #include "gnunet-service-core_sessions.h"
- #include "gnunet_constants.h"
- #include "core.h"
- /**
- * How many encrypted messages do we queue at most?
- * Needed to bound memory consumption.
- */
- #define MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE 4
- /**
- * Message ready for encryption. This struct is followed by the
- * actual content of the message.
- */
- struct SessionMessageEntry
- {
- /**
- * We keep messages in a doubly linked list.
- */
- struct SessionMessageEntry *next;
- /**
- * We keep messages in a doubly linked list.
- */
- struct SessionMessageEntry *prev;
- /**
- * How important is this message.
- */
- enum GNUNET_MQ_PriorityPreferences priority;
- /**
- * Flag set to #GNUNET_YES if this is a typemap message.
- */
- int is_typemap;
- /**
- * Flag set to #GNUNET_YES if this is a typemap confirmation message.
- */
- int is_typemap_confirm;
- /**
- * Deadline for transmission, 1s after we received it (if we
- * are not corking), otherwise "now". Note that this message
- * does NOT expire past its deadline.
- */
- struct GNUNET_TIME_Absolute deadline;
- /**
- * How long is the message? (number of bytes following the `struct
- * MessageEntry`, but not including the size of `struct
- * MessageEntry` itself!)
- */
- size_t size;
- };
- /**
- * Data kept per session.
- */
- struct Session
- {
- /**
- * Identity of the other peer.
- */
- const struct GNUNET_PeerIdentity *peer;
- /**
- * Key exchange state for this peer.
- */
- struct GSC_KeyExchangeInfo *kx;
- /**
- * Head of list of requests from clients for transmission to
- * this peer.
- */
- struct GSC_ClientActiveRequest *active_client_request_head;
- /**
- * Tail of list of requests from clients for transmission to
- * this peer.
- */
- struct GSC_ClientActiveRequest *active_client_request_tail;
- /**
- * Head of list of messages ready for encryption.
- */
- struct SessionMessageEntry *sme_head;
- /**
- * Tail of list of messages ready for encryption.
- */
- struct SessionMessageEntry *sme_tail;
- /**
- * Current type map for this peer.
- */
- struct GSC_TypeMap *tmap;
- /**
- * Task to transmit corked messages with a delay.
- */
- struct GNUNET_SCHEDULER_Task *cork_task;
- /**
- * Task to transmit our type map.
- */
- struct GNUNET_SCHEDULER_Task *typemap_task;
- /**
- * Retransmission delay we currently use for the typemap
- * transmissions (if not confirmed).
- */
- struct GNUNET_TIME_Relative typemap_delay;
- /**
- * Is this the first time we're sending the typemap? If so,
- * we want to send it a bit faster the second time. 0 if
- * we are sending for the first time, 1 if not.
- */
- int first_typemap;
- };
- GNUNET_NETWORK_STRUCT_BEGIN
- /**
- * Message sent to confirm that a typemap was received.
- */
- struct TypeMapConfirmationMessage
- {
- /**
- * Header with type #GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP.
- */
- struct GNUNET_MessageHeader header;
- /**
- * Reserved, always zero.
- */
- uint32_t reserved GNUNET_PACKED;
- /**
- * Hash of the (decompressed) type map that was received.
- */
- struct GNUNET_HashCode tm_hash;
- };
- GNUNET_NETWORK_STRUCT_END
- /**
- * Map of peer identities to `struct Session`.
- */
- static struct GNUNET_CONTAINER_MultiPeerMap *sessions;
- /**
- * Find the session for the given peer.
- *
- * @param peer identity of the peer
- * @return NULL if we are not connected, otherwise the
- * session handle
- */
- static struct Session *
- find_session (const struct GNUNET_PeerIdentity *peer)
- {
- if (NULL == sessions)
- return NULL;
- return GNUNET_CONTAINER_multipeermap_get (sessions, peer);
- }
- /**
- * End the session with the given peer (we are no longer
- * connected).
- *
- * @param pid identity of peer to kill session with
- */
- void
- GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
- {
- struct Session *session;
- struct GSC_ClientActiveRequest *car;
- struct SessionMessageEntry *sme;
- session = find_session (pid);
- if (NULL == session)
- return;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Destroying session for peer `%s'\n",
- GNUNET_i2s (session->peer));
- if (NULL != session->cork_task)
- {
- GNUNET_SCHEDULER_cancel (session->cork_task);
- session->cork_task = NULL;
- }
- while (NULL != (car = session->active_client_request_head))
- {
- GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
- session->active_client_request_tail,
- car);
- GSC_CLIENTS_reject_request (car, GNUNET_NO);
- }
- while (NULL != (sme = session->sme_head))
- {
- GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, sme);
- GNUNET_free (sme);
- }
- if (NULL != session->typemap_task)
- {
- GNUNET_SCHEDULER_cancel (session->typemap_task);
- session->typemap_task = NULL;
- }
- GSC_CLIENTS_notify_clients_about_neighbour (session->peer,
- session->tmap,
- NULL);
- GNUNET_assert (
- GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_remove (sessions, session->peer, session));
- GNUNET_STATISTICS_set (GSC_stats,
- gettext_noop ("# peers connected"),
- GNUNET_CONTAINER_multipeermap_size (sessions),
- GNUNET_NO);
- GSC_TYPEMAP_destroy (session->tmap);
- session->tmap = NULL;
- GNUNET_free (session);
- }
- /**
- * Transmit our current typemap message to the other peer.
- * (Done periodically until the typemap is confirmed).
- *
- * @param cls the `struct Session *`
- */
- static void
- transmit_typemap_task (void *cls)
- {
- struct Session *session = cls;
- struct GNUNET_MessageHeader *hdr;
- struct GNUNET_TIME_Relative delay;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending TYPEMAP to %s\n",
- GNUNET_i2s (session->peer));
- session->typemap_delay = GNUNET_TIME_STD_BACKOFF (session->typemap_delay);
- delay = session->typemap_delay;
- /* randomize a bit to avoid spont. sync */
- delay.rel_value_us +=
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000 * 1000);
- session->typemap_task =
- GNUNET_SCHEDULER_add_delayed (delay, &transmit_typemap_task, session);
- GNUNET_STATISTICS_update (GSC_stats,
- gettext_noop ("# type map refreshes sent"),
- 1,
- GNUNET_NO);
- hdr = GSC_TYPEMAP_compute_type_map_message ();
- GSC_KX_encrypt_and_transmit (session->kx, hdr, ntohs (hdr->size));
- GNUNET_free (hdr);
- }
- /**
- * Restart the typemap task for the given session.
- *
- * @param session session to restart typemap transmission for
- */
- static void
- start_typemap_task (struct Session *session)
- {
- if (NULL != session->typemap_task)
- GNUNET_SCHEDULER_cancel (session->typemap_task);
- session->typemap_delay = GNUNET_TIME_UNIT_SECONDS;
- session->typemap_task = GNUNET_SCHEDULER_add_delayed (session->typemap_delay,
- &transmit_typemap_task,
- session);
- }
- /**
- * Create a session, a key exchange was just completed.
- *
- * @param peer peer that is now connected
- * @param kx key exchange that completed
- */
- void
- GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer,
- struct GSC_KeyExchangeInfo *kx)
- {
- struct Session *session;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating session for peer `%s'\n",
- GNUNET_i2s (peer));
- session = GNUNET_new (struct Session);
- session->tmap = GSC_TYPEMAP_create ();
- session->peer = peer;
- session->kx = kx;
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_put (
- sessions,
- session->peer,
- session,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- GNUNET_STATISTICS_set (GSC_stats,
- gettext_noop ("# peers connected"),
- GNUNET_CONTAINER_multipeermap_size (sessions),
- GNUNET_NO);
- GSC_CLIENTS_notify_clients_about_neighbour (peer, NULL, session->tmap);
- start_typemap_task (session);
- }
- /**
- * The other peer has indicated that it 'lost' the session
- * (KX down), reinitialize the session on our end, in particular
- * this means to restart the typemap transmission.
- *
- * @param peer peer that is now connected
- */
- void
- GSC_SESSIONS_reinit (const struct GNUNET_PeerIdentity *peer)
- {
- struct Session *session;
- session = find_session (peer);
- if (NULL == session)
- {
- /* KX/session is new for both sides; thus no need to restart what
- has not yet begun */
- return;
- }
- start_typemap_task (session);
- }
- /**
- * The other peer has confirmed receiving our type map,
- * check if it is current and if so, stop retransmitting it.
- *
- * @param peer peer that confirmed the type map
- * @param msg confirmation message we received
- */
- void
- GSC_SESSIONS_confirm_typemap (const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *msg)
- {
- const struct TypeMapConfirmationMessage *cmsg;
- struct Session *session;
- session = find_session (peer);
- if (NULL == session)
- {
- GNUNET_break (0);
- return;
- }
- if (ntohs (msg->size) != sizeof(struct TypeMapConfirmationMessage))
- {
- GNUNET_break_op (0);
- return;
- }
- cmsg = (const struct TypeMapConfirmationMessage *) msg;
- if (GNUNET_YES != GSC_TYPEMAP_check_hash (&cmsg->tm_hash))
- {
- /* our typemap has changed in the meantime, do not
- accept confirmation */
- GNUNET_STATISTICS_update (GSC_stats,
- gettext_noop (
- "# outdated typemap confirmations received"),
- 1,
- GNUNET_NO);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Got outdated typemap confirmated from peer `%s'\n",
- GNUNET_i2s (session->peer));
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Got typemap confirmation from peer `%s'\n",
- GNUNET_i2s (session->peer));
- if (NULL != session->typemap_task)
- {
- GNUNET_SCHEDULER_cancel (session->typemap_task);
- session->typemap_task = NULL;
- }
- GNUNET_STATISTICS_update (GSC_stats,
- gettext_noop (
- "# valid typemap confirmations received"),
- 1,
- GNUNET_NO);
- }
- /**
- * Notify the given client about the session (client is new).
- *
- * @param cls the `struct GSC_Client`
- * @param key peer identity
- * @param value the `struct Session`
- * @return #GNUNET_OK (continue to iterate)
- */
- static int
- notify_client_about_session (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
- {
- struct GSC_Client *client = cls;
- struct Session *session = value;
- GSC_CLIENTS_notify_client_about_neighbour (client,
- session->peer,
- NULL, /* old TMAP: none */
- session->tmap);
- return GNUNET_OK;
- }
- /**
- * We have a new client, notify it about all current sessions.
- *
- * @param client the new client
- */
- void
- GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
- {
- /* notify new client about existing sessions */
- GNUNET_CONTAINER_multipeermap_iterate (sessions,
- ¬ify_client_about_session,
- client);
- }
- /**
- * Try to perform a transmission on the given session. Will solicit
- * additional messages if the 'sme' queue is not full enough.
- *
- * @param session session to transmit messages from
- */
- static void
- try_transmission (struct Session *session);
- /**
- * Queue a request from a client for transmission to a particular peer.
- *
- * @param car request to queue; this handle is then shared between
- * the caller (CLIENTS subsystem) and SESSIONS and must not
- * be released by either until either #GSC_SESSIONS_dequeue(),
- * #GSC_SESSIONS_transmit() or #GSC_CLIENTS_failed()
- * have been invoked on it
- */
- void
- GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
- {
- struct Session *session;
- session = find_session (&car->target);
- if (NULL == session)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Dropped client request for transmission (am disconnected)\n");
- GNUNET_break (0); /* should have been rejected earlier */
- GSC_CLIENTS_reject_request (car, GNUNET_NO);
- return;
- }
- if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
- {
- GNUNET_break (0);
- GSC_CLIENTS_reject_request (car, GNUNET_YES);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received client transmission request. queueing\n");
- GNUNET_CONTAINER_DLL_insert_tail (session->active_client_request_head,
- session->active_client_request_tail,
- car);
- try_transmission (session);
- }
- /**
- * Dequeue a request from a client from transmission to a particular peer.
- *
- * @param car request to dequeue; this handle will then be 'owned' by
- * the caller (CLIENTS sysbsystem)
- */
- void
- GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
- {
- struct Session *session;
- if (0 == memcmp (&car->target,
- &GSC_my_identity,
- sizeof(struct GNUNET_PeerIdentity)))
- return;
- session = find_session (&car->target);
- GNUNET_assert (NULL != session);
- GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
- session->active_client_request_tail,
- car);
- /* dequeueing of 'high' priority messages may unblock
- transmission for lower-priority messages, so we also
- need to try in this case. */
- try_transmission (session);
- }
- /**
- * Solicit messages for transmission, starting with those of the highest
- * priority.
- *
- * @param session session to solict messages for
- * @param msize how many bytes do we have already
- */
- static void
- solicit_messages (struct Session *session, size_t msize)
- {
- struct GSC_ClientActiveRequest *car;
- struct GSC_ClientActiveRequest *nxt;
- size_t so_size;
- enum GNUNET_MQ_PriorityPreferences pmax;
- so_size = msize;
- pmax = GNUNET_MQ_PRIO_BACKGROUND;
- for (car = session->active_client_request_head; NULL != car; car = car->next)
- {
- if (GNUNET_YES == car->was_solicited)
- continue;
- pmax = GNUNET_MAX (pmax, car->priority & GNUNET_MQ_PRIORITY_MASK);
- }
- nxt = session->active_client_request_head;
- while (NULL != (car = nxt))
- {
- nxt = car->next;
- if (car->priority < pmax)
- continue;
- if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
- break;
- so_size += car->msize;
- if (GNUNET_YES == car->was_solicited)
- continue;
- car->was_solicited = GNUNET_YES;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Soliciting message with priority %u\n",
- car->priority);
- GSC_CLIENTS_solicit_request (car);
- /* The above call may *dequeue* requests and thereby
- clobber 'nxt'. Hence we need to restart from the
- head of the list. */
- nxt = session->active_client_request_head;
- so_size = msize;
- }
- }
- /**
- * Some messages were delayed (corked), but the timeout has now expired.
- * Send them now.
- *
- * @param cls `struct Session` with the messages to transmit now
- */
- static void
- pop_cork_task (void *cls)
- {
- struct Session *session = cls;
- session->cork_task = NULL;
- try_transmission (session);
- }
- /**
- * Try to perform a transmission on the given session. Will solicit
- * additional messages if the 'sme' queue is not full enough or has
- * only low-priority messages.
- *
- * @param session session to transmit messages from
- */
- static void
- try_transmission (struct Session *session)
- {
- struct SessionMessageEntry *pos;
- size_t msize;
- struct GNUNET_TIME_Absolute now;
- struct GNUNET_TIME_Absolute min_deadline;
- enum GNUNET_MQ_PriorityPreferences maxp;
- enum GNUNET_MQ_PriorityPreferences maxpc;
- struct GSC_ClientActiveRequest *car;
- int excess;
- msize = 0;
- min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
- /* if the peer has excess bandwidth, background traffic is allowed,
- otherwise not */
- if (MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE <=
- GSC_NEIGHBOURS_get_queue_length (session->kx))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission queue already very long, waiting...\n");
- return; /* queue already too long */
- }
- excess = GSC_NEIGHBOURS_check_excess_bandwidth (session->kx);
- if (GNUNET_YES == excess)
- maxp = GNUNET_MQ_PRIO_BACKGROUND;
- else
- maxp = GNUNET_MQ_PRIO_BEST_EFFORT;
- /* determine highest priority of 'ready' messages we already solicited from clients */
- pos = session->sme_head;
- while ((NULL != pos) &&
- (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE))
- {
- GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
- msize += pos->size;
- maxp = GNUNET_MAX (maxp, pos->priority & GNUNET_MQ_PRIORITY_MASK);
- min_deadline = GNUNET_TIME_absolute_min (min_deadline, pos->deadline);
- pos = pos->next;
- }
- GNUNET_log (
- GNUNET_ERROR_TYPE_DEBUG,
- "Calculating transmission set with %u priority (%s) and %s earliest deadline\n",
- maxp,
- (GNUNET_YES == excess) ? "excess bandwidth" : "limited bandwidth",
- GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (
- min_deadline),
- GNUNET_YES));
- if (maxp < GNUNET_MQ_PRIO_CRITICAL_CONTROL)
- {
- /* if highest already solicited priority from clients is not critical,
- check if there are higher-priority messages to be solicited from clients */
- if (GNUNET_YES == excess)
- maxpc = GNUNET_MQ_PRIO_BACKGROUND;
- else
- maxpc = GNUNET_MQ_PRIO_BEST_EFFORT;
- for (car = session->active_client_request_head; NULL != car;
- car = car->next)
- {
- if (GNUNET_YES == car->was_solicited)
- continue;
- maxpc = GNUNET_MAX (maxpc, car->priority & GNUNET_MQ_PRIORITY_MASK);
- }
- if (maxpc > maxp)
- {
- /* we have messages waiting for solicitation that have a higher
- priority than those that we already accepted; solicit the
- high-priority messages first */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Soliciting messages based on priority (%u > %u)\n",
- maxpc,
- maxp);
- solicit_messages (session, 0);
- return;
- }
- }
- else
- {
- /* never solicit more, we have critical messages to process */
- excess = GNUNET_NO;
- maxpc = GNUNET_MQ_PRIO_BACKGROUND;
- }
- now = GNUNET_TIME_absolute_get ();
- if (((GNUNET_YES == excess) || (maxpc >= GNUNET_MQ_PRIO_BEST_EFFORT)) &&
- ((0 == msize) ||
- ((msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) &&
- (min_deadline.abs_value_us > now.abs_value_us))))
- {
- /* not enough ready yet (tiny message & cork possible), or no messages at all,
- and either excess bandwidth or best-effort or higher message waiting at
- client; in this case, we try to solicit more */
- GNUNET_log (
- GNUNET_ERROR_TYPE_DEBUG,
- "Soliciting messages (excess %d, maxpc %d, message size %u, deadline %s)\n",
- excess,
- maxpc,
- (unsigned int) msize,
- GNUNET_STRINGS_relative_time_to_string (
- GNUNET_TIME_absolute_get_remaining (
- min_deadline),
- GNUNET_YES));
- solicit_messages (session, msize);
- if (msize > 0)
- {
- /* if there is data to send, just not yet, make sure we do transmit
- * it once the deadline is reached */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Corking until %s\n",
- GNUNET_STRINGS_relative_time_to_string (
- GNUNET_TIME_absolute_get_remaining (min_deadline),
- GNUNET_YES));
- if (NULL != session->cork_task)
- GNUNET_SCHEDULER_cancel (session->cork_task);
- session->cork_task =
- GNUNET_SCHEDULER_add_at (min_deadline, &pop_cork_task, session);
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Queue empty, waiting for solicitations\n");
- }
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Building combined plaintext buffer to transmit message!\n");
- /* create plaintext buffer of all messages (that fit), encrypt and
- transmit */
- {
- static unsigned long long total_bytes;
- static unsigned int total_msgs;
- char pbuf[msize]; /* plaintext */
- size_t used;
- used = 0;
- while ((NULL != (pos = session->sme_head)) && (used + pos->size <= msize))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Adding message of type %d (%d/%d) to payload for %s\n",
- ntohs (((const struct GNUNET_MessageHeader *) &pos[1])->type),
- pos->is_typemap,
- pos->is_typemap_confirm,
- GNUNET_i2s (session->peer));
- GNUNET_memcpy (&pbuf[used], &pos[1], pos->size);
- used += pos->size;
- GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, pos);
- GNUNET_free (pos);
- }
- /* compute average payload size */
- total_bytes += used;
- total_msgs++;
- if (0 == total_msgs)
- {
- /* 2^32 messages, wrap around... */
- total_msgs = 1;
- total_bytes = used;
- }
- GNUNET_STATISTICS_set (GSC_stats,
- "# avg payload per encrypted message",
- total_bytes / total_msgs,
- GNUNET_NO);
- /* now actually transmit... */
- GSC_KX_encrypt_and_transmit (session->kx, pbuf, used);
- }
- }
- /**
- * Send an updated typemap message to the neighbour now,
- * and restart typemap transmissions.
- *
- * @param cls the message
- * @param key neighbour's identity
- * @param value `struct Neighbour` of the target
- * @return always #GNUNET_OK
- */
- static int
- do_restart_typemap_message (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
- {
- const struct GNUNET_MessageHeader *hdr = cls;
- struct Session *session = value;
- struct SessionMessageEntry *sme;
- uint16_t size;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Restarting sending TYPEMAP to %s\n",
- GNUNET_i2s (session->peer));
- size = ntohs (hdr->size);
- for (sme = session->sme_head; NULL != sme; sme = sme->next)
- {
- if (GNUNET_YES == sme->is_typemap)
- {
- GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, sme);
- GNUNET_free (sme);
- break;
- }
- }
- sme = GNUNET_malloc (sizeof(struct SessionMessageEntry) + size);
- sme->is_typemap = GNUNET_YES;
- GNUNET_memcpy (&sme[1], hdr, size);
- sme->size = size;
- sme->priority = GNUNET_MQ_PRIO_CRITICAL_CONTROL;
- GNUNET_CONTAINER_DLL_insert (session->sme_head, session->sme_tail, sme);
- try_transmission (session);
- start_typemap_task (session);
- return GNUNET_OK;
- }
- /**
- * Broadcast an updated typemap message to all neighbours.
- * Restarts the retransmissions until the typemaps are confirmed.
- *
- * @param msg message to transmit
- */
- void
- GSC_SESSIONS_broadcast_typemap (const struct GNUNET_MessageHeader *msg)
- {
- if (NULL == sessions)
- return;
- GNUNET_CONTAINER_multipeermap_iterate (sessions,
- &do_restart_typemap_message,
- (void *) msg);
- }
- /**
- * Traffic is being solicited for the given peer. This means that the
- * message queue on the transport-level (NEIGHBOURS subsystem) is now
- * empty and it is now OK to transmit another (non-control) message.
- *
- * @param pid identity of peer ready to receive data
- */
- void
- GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid)
- {
- struct Session *session;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transport solicits for %s\n",
- GNUNET_i2s (pid));
- session = find_session (pid);
- if (NULL == session)
- return;
- try_transmission (session);
- }
- /**
- * Transmit a message to a particular peer.
- *
- * @param car original request that was queued and then solicited;
- * this handle will now be 'owned' by the SESSIONS subsystem
- * @param msg message to transmit
- * @param priority how important is this message
- */
- void
- GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
- const struct GNUNET_MessageHeader *msg,
- enum GNUNET_MQ_PriorityPreferences priority)
- {
- struct Session *session;
- struct SessionMessageEntry *sme;
- struct SessionMessageEntry *pos;
- size_t msize;
- session = find_session (&car->target);
- if (NULL == session)
- return;
- msize = ntohs (msg->size);
- sme = GNUNET_malloc (sizeof(struct SessionMessageEntry) + msize);
- GNUNET_memcpy (&sme[1], msg, msize);
- sme->size = msize;
- sme->priority = priority;
- if (0 != (GNUNET_MQ_PREF_CORK_ALLOWED & priority))
- {
- sme->deadline =
- GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Mesage corked, delaying transmission\n");
- }
- pos = session->sme_head;
- while ((NULL != pos) && (pos->priority >= sme->priority))
- pos = pos->next;
- if (NULL == pos)
- GNUNET_CONTAINER_DLL_insert_tail (session->sme_head,
- session->sme_tail,
- sme);
- else
- GNUNET_CONTAINER_DLL_insert_after (session->sme_head,
- session->sme_tail,
- pos->prev,
- sme);
- try_transmission (session);
- }
- /**
- * We have received a typemap message from a peer, update ours.
- * Notifies clients about the session.
- *
- * @param peer peer this is about
- * @param msg typemap update message
- */
- void
- GSC_SESSIONS_set_typemap (const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *msg)
- {
- struct Session *session;
- struct GSC_TypeMap *nmap;
- struct SessionMessageEntry *sme;
- struct TypeMapConfirmationMessage *tmc;
- nmap = GSC_TYPEMAP_get_from_message (msg);
- if (NULL == nmap)
- {
- GNUNET_break_op (0);
- return; /* malformed */
- }
- session = find_session (peer);
- if (NULL == session)
- {
- GSC_TYPEMAP_destroy (nmap);
- GNUNET_break (0);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received TYPEMAP from %s\n",
- GNUNET_i2s (session->peer));
- for (sme = session->sme_head; NULL != sme; sme = sme->next)
- {
- if (GNUNET_YES == sme->is_typemap_confirm)
- {
- GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, sme);
- GNUNET_free (sme);
- break;
- }
- }
- sme = GNUNET_malloc (sizeof(struct SessionMessageEntry)
- + sizeof(struct TypeMapConfirmationMessage));
- sme->deadline = GNUNET_TIME_absolute_get ();
- sme->size = sizeof(struct TypeMapConfirmationMessage);
- sme->priority = GNUNET_MQ_PRIO_CRITICAL_CONTROL;
- sme->is_typemap_confirm = GNUNET_YES;
- tmc = (struct TypeMapConfirmationMessage *) &sme[1];
- tmc->header.size = htons (sizeof(struct TypeMapConfirmationMessage));
- tmc->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP);
- tmc->reserved = htonl (0);
- GSC_TYPEMAP_hash (nmap, &tmc->tm_hash);
- GNUNET_CONTAINER_DLL_insert (session->sme_head, session->sme_tail, sme);
- try_transmission (session);
- GSC_CLIENTS_notify_clients_about_neighbour (peer, session->tmap, nmap);
- GSC_TYPEMAP_destroy (session->tmap);
- session->tmap = nmap;
- }
- /**
- * The given peer send a message of the specified type. Make sure the
- * respective bit is set in its type-map and that clients are notified
- * about the session.
- *
- * @param peer peer this is about
- * @param type type of the message
- */
- void
- GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer,
- uint16_t type)
- {
- struct Session *session;
- struct GSC_TypeMap *nmap;
- if (0 == memcmp (peer, &GSC_my_identity, sizeof(struct GNUNET_PeerIdentity)))
- return;
- session = find_session (peer);
- GNUNET_assert (NULL != session);
- if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap, &type, 1))
- return; /* already in it */
- nmap = GSC_TYPEMAP_extend (session->tmap, &type, 1);
- GSC_CLIENTS_notify_clients_about_neighbour (peer, session->tmap, nmap);
- GSC_TYPEMAP_destroy (session->tmap);
- session->tmap = nmap;
- }
- /**
- * Initialize sessions subsystem.
- */
- void
- GSC_SESSIONS_init ()
- {
- sessions = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
- }
- /**
- * Helper function for #GSC_SESSIONS_done() to free all
- * active sessions.
- *
- * @param cls NULL
- * @param key identity of the connected peer
- * @param value the `struct Session` for the peer
- * @return #GNUNET_OK (continue to iterate)
- */
- static int
- free_session_helper (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
- {
- /* struct Session *session = value; */
- GSC_SESSIONS_end (key);
- return GNUNET_OK;
- }
- /**
- * Shutdown sessions subsystem.
- */
- void
- GSC_SESSIONS_done ()
- {
- if (NULL != sessions)
- {
- GNUNET_CONTAINER_multipeermap_iterate (sessions,
- &free_session_helper,
- NULL);
- GNUNET_CONTAINER_multipeermap_destroy (sessions);
- sessions = NULL;
- }
- }
- /* end of gnunet-service-core_sessions.c */
|