123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525 |
- /*
- This file is part of GNUnet
- Copyright (C) 2009-2013 GNUnet e.V.
- GNUnet is free software: you can redistribute it and/or modify it
- under the terms of the GNU Affero General Public License as published
- by the Free Software Foundation, either version 3 of the License,
- or (at your option) any later version.
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- SPDX-License-Identifier: AGPL3.0-or-later
- */
- /**
- * @file src/fragmentation/fragmentation.c
- * @brief library to help fragment messages
- * @author Christian Grothoff
- */
- #include "platform.h"
- #include "gnunet_fragmentation_lib.h"
- #include "gnunet_protocols.h"
- #include "fragmentation.h"
- /**
- * Absolute minimum delay we impose between sending and expecting ACK to arrive.
- */
- #define MIN_ACK_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1)
- /**
- * Fragmentation context.
- */
- struct GNUNET_FRAGMENT_Context
- {
- /**
- * Statistics to use.
- */
- struct GNUNET_STATISTICS_Handle *stats;
- /**
- * Tracker for flow control.
- */
- struct GNUNET_BANDWIDTH_Tracker *tracker;
- /**
- * Current expected delay for ACKs.
- */
- struct GNUNET_TIME_Relative ack_delay;
- /**
- * Current expected delay between messages.
- */
- struct GNUNET_TIME_Relative msg_delay;
- /**
- * Next allowed transmission time.
- */
- struct GNUNET_TIME_Absolute delay_until;
- /**
- * Time we transmitted the last message of the last round.
- */
- struct GNUNET_TIME_Absolute last_round;
- /**
- * Message to fragment (allocated at the end of this struct).
- */
- const struct GNUNET_MessageHeader *msg;
- /**
- * Function to call for transmissions.
- */
- GNUNET_FRAGMENT_MessageProcessor proc;
- /**
- * Closure for @e proc.
- */
- void *proc_cls;
- /**
- * Bitfield, set to 1 for each unacknowledged fragment.
- */
- uint64_t acks;
- /**
- * Bitfield with all possible bits for @e acks (used to mask the
- * ack we get back).
- */
- uint64_t acks_mask;
- /**
- * Task performing work for the fragmenter.
- */
- struct GNUNET_SCHEDULER_Task *task;
- /**
- * Our fragmentation ID. (chosen at random)
- */
- uint32_t fragment_id;
- /**
- * Round-robin selector for the next transmission.
- */
- unsigned int next_transmission;
- /**
- * How many rounds of transmission have we completed so far?
- */
- unsigned int num_rounds;
- /**
- * How many transmission have we completed in this round?
- */
- unsigned int num_transmissions;
- /**
- * #GNUNET_YES if we called @e proc and are now waiting for #GNUNET_FRAGMENT_context_transmission_done()
- */
- int8_t proc_busy;
- /**
- * #GNUNET_YES if we are waiting for an ACK.
- */
- int8_t wack;
- /**
- * Target fragment size.
- */
- uint16_t mtu;
- };
- /**
- * Convert an ACK message to a printable format suitable for logging.
- *
- * @param ack message to print
- * @return ack in human-readable format
- */
- const char *
- GNUNET_FRAGMENT_print_ack (const struct GNUNET_MessageHeader *ack)
- {
- static char buf[128];
- const struct FragmentAcknowledgement *fa;
- if (sizeof (struct FragmentAcknowledgement) !=
- htons (ack->size))
- return "<malformed ack>";
- fa = (const struct FragmentAcknowledgement *) ack;
- GNUNET_snprintf (buf,
- sizeof (buf),
- "%u-%llX",
- ntohl (fa->fragment_id),
- GNUNET_ntohll (fa->bits));
- return buf;
- }
- /**
- * Transmit the next fragment to the other peer.
- *
- * @param cls the `struct GNUNET_FRAGMENT_Context`
- */
- static void
- transmit_next (void *cls)
- {
- struct GNUNET_FRAGMENT_Context *fc = cls;
- char msg[fc->mtu];
- const char *mbuf;
- struct FragmentHeader *fh;
- struct GNUNET_TIME_Relative delay;
- unsigned int bit;
- size_t size;
- size_t fsize;
- int wrap;
- fc->task = NULL;
- GNUNET_assert (GNUNET_NO == fc->proc_busy);
- if (0 == fc->acks)
- return; /* all done */
- /* calculate delay */
- wrap = 0;
- while (0 == (fc->acks & (1LLU << fc->next_transmission)))
- {
- fc->next_transmission = (fc->next_transmission + 1) % 64;
- wrap |= (0 == fc->next_transmission);
- }
- bit = fc->next_transmission;
- size = ntohs (fc->msg->size);
- if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
- fsize =
- (size % (fc->mtu - sizeof (struct FragmentHeader))) +
- sizeof (struct FragmentHeader);
- else
- fsize = fc->mtu;
- if (NULL != fc->tracker)
- delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
- fsize);
- else
- delay = GNUNET_TIME_UNIT_ZERO;
- if (delay.rel_value_us > 0)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Fragmentation logic delays transmission of next fragment by %s\n",
- GNUNET_STRINGS_relative_time_to_string (delay,
- GNUNET_YES));
- fc->task = GNUNET_SCHEDULER_add_delayed (delay,
- &transmit_next,
- fc);
- return;
- }
- fc->next_transmission = (fc->next_transmission + 1) % 64;
- wrap |= (0 == fc->next_transmission);
- while (0 == (fc->acks & (1LLU << fc->next_transmission)))
- {
- fc->next_transmission = (fc->next_transmission + 1) % 64;
- wrap |= (0 == fc->next_transmission);
- }
- /* assemble fragmentation message */
- mbuf = (const char *) &fc[1];
- fh = (struct FragmentHeader *) msg;
- fh->header.size = htons (fsize);
- fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
- fh->fragment_id = htonl (fc->fragment_id);
- fh->total_size = fc->msg->size; /* already in big-endian */
- fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
- GNUNET_memcpy (&fh[1], &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
- fsize - sizeof (struct FragmentHeader));
- if (NULL != fc->tracker)
- GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
- GNUNET_STATISTICS_update (fc->stats,
- _("# fragments transmitted"),
- 1,
- GNUNET_NO);
- if (0 != fc->last_round.abs_value_us)
- GNUNET_STATISTICS_update (fc->stats,
- _("# fragments retransmitted"),
- 1,
- GNUNET_NO);
- /* select next message to calculate delay */
- bit = fc->next_transmission;
- size = ntohs (fc->msg->size);
- if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
- fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
- else
- fsize = fc->mtu;
- if (NULL != fc->tracker)
- delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
- fsize);
- else
- delay = GNUNET_TIME_UNIT_ZERO;
- if (fc->num_rounds < 64)
- delay = GNUNET_TIME_relative_max (delay,
- GNUNET_TIME_relative_saturating_multiply
- (fc->msg_delay,
- (1ULL << fc->num_rounds)));
- else
- delay = GNUNET_TIME_UNIT_FOREVER_REL;
- if (wrap)
- {
- /* full round transmitted wait 2x delay for ACK before going again */
- fc->num_rounds++;
- delay = GNUNET_TIME_relative_saturating_multiply (fc->ack_delay, 2);
- /* never use zero, need some time for ACK always */
- delay = GNUNET_TIME_relative_max (MIN_ACK_DELAY, delay);
- fc->wack = GNUNET_YES;
- fc->last_round = GNUNET_TIME_absolute_get ();
- GNUNET_STATISTICS_update (fc->stats,
- _("# fragments wrap arounds"),
- 1,
- GNUNET_NO);
- }
- fc->proc_busy = GNUNET_YES;
- fc->delay_until = GNUNET_TIME_relative_to_absolute (delay);
- fc->num_transmissions++;
- fc->proc (fc->proc_cls,
- &fh->header);
- }
- /**
- * Create a fragmentation context for the given message.
- * Fragments the message into fragments of size @a mtu or
- * less. Calls @a proc on each un-acknowledged fragment,
- * using both the expected @a msg_delay between messages and
- * acknowledgements and the given @a tracker to guide the
- * frequency of calls to @a proc.
- *
- * @param stats statistics context
- * @param mtu the maximum message size for each fragment
- * @param tracker bandwidth tracker to use for flow control (can be NULL)
- * @param msg_delay initial delay to insert between fragment transmissions
- * based on previous messages
- * @param ack_delay expected delay between fragment transmission
- * and ACK based on previous messages
- * @param msg the message to fragment
- * @param proc function to call for each fragment to transmit
- * @param proc_cls closure for @a proc
- * @return the fragmentation context
- */
- struct GNUNET_FRAGMENT_Context *
- GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
- uint16_t mtu,
- struct GNUNET_BANDWIDTH_Tracker *tracker,
- struct GNUNET_TIME_Relative msg_delay,
- struct GNUNET_TIME_Relative ack_delay,
- const struct GNUNET_MessageHeader *msg,
- GNUNET_FRAGMENT_MessageProcessor proc,
- void *proc_cls)
- {
- struct GNUNET_FRAGMENT_Context *fc;
- size_t size;
- uint64_t bits;
- GNUNET_STATISTICS_update (stats,
- _("# messages fragmented"),
- 1,
- GNUNET_NO);
- GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
- size = ntohs (msg->size);
- GNUNET_STATISTICS_update (stats,
- _("# total size of fragmented messages"),
- size, GNUNET_NO);
- GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
- fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
- fc->stats = stats;
- fc->mtu = mtu;
- fc->tracker = tracker;
- fc->ack_delay = ack_delay;
- fc->msg_delay = msg_delay;
- fc->msg = (const struct GNUNET_MessageHeader *) &fc[1];
- fc->proc = proc;
- fc->proc_cls = proc_cls;
- fc->fragment_id =
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- UINT32_MAX);
- GNUNET_memcpy (&fc[1], msg, size);
- bits =
- (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu -
- sizeof (struct
- FragmentHeader));
- GNUNET_assert (bits <= 64);
- if (bits == 64)
- fc->acks_mask = UINT64_MAX; /* set all 64 bit */
- else
- fc->acks_mask = (1LLU << bits) - 1; /* set lowest 'bits' bit */
- fc->acks = fc->acks_mask;
- fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
- return fc;
- }
- /**
- * Continuation to call from the 'proc' function after the fragment
- * has been transmitted (and hence the next fragment can now be
- * given to proc).
- *
- * @param fc fragmentation context
- */
- void
- GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
- {
- GNUNET_assert (fc->proc_busy == GNUNET_YES);
- fc->proc_busy = GNUNET_NO;
- GNUNET_assert (fc->task == NULL);
- fc->task =
- GNUNET_SCHEDULER_add_at (fc->delay_until,
- &transmit_next,
- fc);
- }
- /**
- * Process an acknowledgement message we got from the other
- * side (to control re-transmits).
- *
- * @param fc fragmentation context
- * @param msg acknowledgement message we received
- * @return #GNUNET_OK if this ack completes the work of the 'fc'
- * (all fragments have been received);
- * #GNUNET_NO if more messages are pending
- * #GNUNET_SYSERR if this ack is not valid for this fc
- */
- int
- GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
- const struct GNUNET_MessageHeader *msg)
- {
- const struct FragmentAcknowledgement *fa;
- uint64_t abits;
- struct GNUNET_TIME_Relative ndelay;
- unsigned int ack_cnt;
- unsigned int snd_cnt;
- unsigned int i;
- if (sizeof (struct FragmentAcknowledgement) != ntohs (msg->size))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- fa = (const struct FragmentAcknowledgement *) msg;
- if (ntohl (fa->fragment_id) != fc->fragment_id)
- return GNUNET_SYSERR; /* not our ACK */
- abits = GNUNET_ntohll (fa->bits);
- if ( (GNUNET_YES == fc->wack) &&
- (0 != fc->num_transmissions) )
- {
- /* normal ACK, can update running average of delay... */
- fc->wack = GNUNET_NO;
- ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
- fc->ack_delay.rel_value_us =
- (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->ack_delay.rel_value_us) / 4;
- /* calculate ratio msg sent vs. msg acked */
- ack_cnt = 0;
- snd_cnt = 0;
- for (i=0;i<64;i++)
- {
- if (1 == (fc->acks_mask & (1ULL << i)))
- {
- snd_cnt++;
- if (0 == (abits & (1ULL << i)))
- ack_cnt++;
- }
- }
- if (0 == ack_cnt)
- {
- /* complete loss */
- fc->msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
- snd_cnt);
- }
- else if (snd_cnt > ack_cnt)
- {
- /* some loss, slow down proportionally */
- fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt) / snd_cnt);
- }
- else if (snd_cnt == ack_cnt)
- {
- fc->msg_delay.rel_value_us =
- (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->msg_delay.rel_value_us) / 5;
- }
- fc->num_transmissions = 0;
- fc->msg_delay = GNUNET_TIME_relative_min (fc->msg_delay,
- GNUNET_TIME_UNIT_SECONDS);
- fc->ack_delay = GNUNET_TIME_relative_min (fc->ack_delay,
- GNUNET_TIME_UNIT_SECONDS);
- }
- GNUNET_STATISTICS_update (fc->stats,
- _("# fragment acknowledgements received"),
- 1,
- GNUNET_NO);
- if (abits != (fc->acks & abits))
- {
- /* ID collission or message reordering, count! This should be rare! */
- GNUNET_STATISTICS_update (fc->stats,
- _("# bits removed from fragmentation ACKs"), 1,
- GNUNET_NO);
- }
- fc->acks = abits & fc->acks_mask;
- if (0 != fc->acks)
- {
- /* more to transmit, do so right now (if tracker permits...) */
- if (fc->task != NULL)
- {
- /* schedule next transmission now, no point in waiting... */
- GNUNET_SCHEDULER_cancel (fc->task);
- fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, fc);
- }
- else
- {
- /* only case where there is no task should be if we're waiting
- * for the right to transmit again (proc_busy set to YES) */
- GNUNET_assert (GNUNET_YES == fc->proc_busy);
- }
- return GNUNET_NO;
- }
- /* all done */
- GNUNET_STATISTICS_update (fc->stats,
- _("# fragmentation transmissions completed"),
- 1,
- GNUNET_NO);
- if (NULL != fc->task)
- {
- GNUNET_SCHEDULER_cancel (fc->task);
- fc->task = NULL;
- }
- return GNUNET_OK;
- }
- /**
- * Destroy the given fragmentation context (stop calling 'proc', free
- * resources).
- *
- * @param fc fragmentation context
- * @param msg_delay where to store average delay between individual message transmissions the
- * last message (OUT only)
- * @param ack_delay where to store average delay between transmission and ACK for the
- * last message, set to FOREVER if the message was not fully transmitted (OUT only)
- */
- void
- GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc,
- struct GNUNET_TIME_Relative *msg_delay,
- struct GNUNET_TIME_Relative *ack_delay)
- {
- if (fc->task != NULL)
- GNUNET_SCHEDULER_cancel (fc->task);
- if (NULL != ack_delay)
- *ack_delay = fc->ack_delay;
- if (NULL != msg_delay)
- *msg_delay = GNUNET_TIME_relative_saturating_multiply (fc->msg_delay,
- fc->num_rounds);
- GNUNET_free (fc);
- }
- /* end of fragmentation.c */
|