123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590 |
- /*
- This file is part of GNUnet
- Copyright (C) 2009, 2011 GNUnet e.V.
- GNUnet is free software: you can redistribute it and/or modify it
- under the terms of the GNU Affero General Public License as published
- by the Free Software Foundation, either version 3 of the License,
- or (at your option) any later version.
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- SPDX-License-Identifier: AGPL3.0-or-later
- */
- /**
- * @file src/fragmentation/defragmentation.c
- * @brief library to help defragment messages
- * @author Christian Grothoff
- */
- #include "platform.h"
- #include "gnunet_fragmentation_lib.h"
- #include "fragmentation.h"
- /**
- * Timestamps for fragments.
- */
- struct FragTimes
- {
- /**
- * The time the fragment was received.
- */
- struct GNUNET_TIME_Absolute time;
- /**
- * Number of the bit for the fragment (in [0,..,63]).
- */
- unsigned int bit;
- };
- /**
- * Information we keep for one message that is being assembled. Note
- * that we keep the context around even after the assembly is done to
- * handle 'stray' messages that are received 'late'. A message
- * context is ONLY discarded when the queue gets too big.
- */
- struct MessageContext
- {
- /**
- * This is a DLL.
- */
- struct MessageContext *next;
- /**
- * This is a DLL.
- */
- struct MessageContext *prev;
- /**
- * Associated defragmentation context.
- */
- struct GNUNET_DEFRAGMENT_Context *dc;
- /**
- * Pointer to the assembled message, allocated at the
- * end of this struct.
- */
- const struct GNUNET_MessageHeader *msg;
- /**
- * Last time we received any update for this message
- * (least-recently updated message will be discarded
- * if we hit the queue size).
- */
- struct GNUNET_TIME_Absolute last_update;
- /**
- * Task scheduled for transmitting the next ACK to the
- * other peer.
- */
- struct GNUNET_SCHEDULER_Task * ack_task;
- /**
- * When did we receive which fragment? Used to calculate
- * the time we should send the ACK.
- */
- struct FragTimes frag_times[64];
- /**
- * Which fragments have we gotten yet? bits that are 1
- * indicate missing fragments.
- */
- uint64_t bits;
- /**
- * Unique ID for this message.
- */
- uint32_t fragment_id;
- /**
- * Which 'bit' did the last fragment we received correspond to?
- */
- unsigned int last_bit;
- /**
- * For the current ACK round, which is the first relevant
- * offset in @e frag_times?
- */
- unsigned int frag_times_start_offset;
- /**
- * Which offset whould we write the next frag value into
- * in the @e frag_times array? All smaller entries are valid.
- */
- unsigned int frag_times_write_offset;
- /**
- * Total size of the message that we are assembling.
- */
- uint16_t total_size;
- /**
- * Was the last fragment we got a duplicate?
- */
- int16_t last_duplicate;
- };
- /**
- * Defragmentation context (one per connection).
- */
- struct GNUNET_DEFRAGMENT_Context
- {
- /**
- * For statistics.
- */
- struct GNUNET_STATISTICS_Handle *stats;
- /**
- * Head of list of messages we're defragmenting.
- */
- struct MessageContext *head;
- /**
- * Tail of list of messages we're defragmenting.
- */
- struct MessageContext *tail;
- /**
- * Closure for @e proc and @e ackp.
- */
- void *cls;
- /**
- * Function to call with defragmented messages.
- */
- GNUNET_FRAGMENT_MessageProcessor proc;
- /**
- * Function to call with acknowledgements.
- */
- GNUNET_DEFRAGMENT_AckProcessor ackp;
- /**
- * Running average of the latency (delay between messages) for this
- * connection.
- */
- struct GNUNET_TIME_Relative latency;
- /**
- * num_msgs how many fragmented messages
- * to we defragment at most at the same time?
- */
- unsigned int num_msgs;
- /**
- * Current number of messages in the 'struct MessageContext'
- * DLL (smaller or equal to 'num_msgs').
- */
- unsigned int list_size;
- /**
- * Maximum message size for each fragment.
- */
- uint16_t mtu;
- };
- /**
- * Create a defragmentation context.
- *
- * @param stats statistics context
- * @param mtu the maximum message size for each fragment
- * @param num_msgs how many fragmented messages
- * to we defragment at most at the same time?
- * @param cls closure for @a proc and @a ackp
- * @param proc function to call with defragmented messages
- * @param ackp function to call with acknowledgements (to send
- * back to the other side)
- * @return the defragmentation context
- */
- struct GNUNET_DEFRAGMENT_Context *
- GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
- uint16_t mtu, unsigned int num_msgs,
- void *cls,
- GNUNET_FRAGMENT_MessageProcessor proc,
- GNUNET_DEFRAGMENT_AckProcessor ackp)
- {
- struct GNUNET_DEFRAGMENT_Context *dc;
- dc = GNUNET_new (struct GNUNET_DEFRAGMENT_Context);
- dc->stats = stats;
- dc->cls = cls;
- dc->proc = proc;
- dc->ackp = ackp;
- dc->num_msgs = num_msgs;
- dc->mtu = mtu;
- dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
- return dc;
- }
- /**
- * Destroy the given defragmentation context.
- *
- * @param dc defragmentation context
- */
- void
- GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
- {
- struct MessageContext *mc;
- while (NULL != (mc = dc->head))
- {
- GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
- dc->list_size--;
- if (NULL != mc->ack_task)
- {
- GNUNET_SCHEDULER_cancel (mc->ack_task);
- mc->ack_task = NULL;
- }
- GNUNET_free (mc);
- }
- GNUNET_assert (0 == dc->list_size);
- GNUNET_free (dc);
- }
- /**
- * Send acknowledgement to the other peer now.
- *
- * @param cls the message context
- */
- static void
- send_ack (void *cls)
- {
- struct MessageContext *mc = cls;
- struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
- struct FragmentAcknowledgement fa;
- mc->ack_task = NULL;
- fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
- fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
- fa.fragment_id = htonl (mc->fragment_id);
- fa.bits = GNUNET_htonll (mc->bits);
- GNUNET_STATISTICS_update (mc->dc->stats,
- _("# acknowledgements sent for fragment"),
- 1,
- GNUNET_NO);
- mc->last_duplicate = GNUNET_NO; /* clear flag */
- dc->ackp (dc->cls,
- mc->fragment_id,
- &fa.header);
- }
- /**
- * This function is from the GNU Scientific Library, linear/fit.c,
- * Copyright (C) 2000 Brian Gough
- */
- static void
- gsl_fit_mul (const double *x, const size_t xstride, const double *y,
- const size_t ystride, const size_t n, double *c1, double *cov_11,
- double *sumsq)
- {
- double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
- size_t i;
- for (i = 0; i < n; i++)
- {
- m_x += (x[i * xstride] - m_x) / (i + 1.0);
- m_y += (y[i * ystride] - m_y) / (i + 1.0);
- }
- for (i = 0; i < n; i++)
- {
- const double dx = x[i * xstride] - m_x;
- const double dy = y[i * ystride] - m_y;
- m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
- m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
- }
- /* In terms of y = b x */
- {
- double s2 = 0, d2 = 0;
- double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);
- *c1 = b;
- /* Compute chi^2 = \sum (y_i - b * x_i)^2 */
- for (i = 0; i < n; i++)
- {
- const double dx = x[i * xstride] - m_x;
- const double dy = y[i * ystride] - m_y;
- const double d = (m_y - b * m_x) + dy - b * dx;
- d2 += d * d;
- }
- s2 = d2 / (n - 1.0); /* chisq per degree of freedom */
- *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));
- *sumsq = d2;
- }
- }
- /**
- * Estimate the latency between messages based on the most recent
- * message time stamps.
- *
- * @param mc context with time stamps
- * @return average delay between time stamps (based on least-squares fit)
- */
- static struct GNUNET_TIME_Relative
- estimate_latency (struct MessageContext *mc)
- {
- struct FragTimes *first;
- size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
- double x[total];
- double y[total];
- size_t i;
- double c1;
- double cov11;
- double sumsq;
- struct GNUNET_TIME_Relative ret;
- first = &mc->frag_times[mc->frag_times_start_offset];
- GNUNET_assert (total > 1);
- for (i = 0; i < total; i++)
- {
- x[i] = (double) i;
- y[i] = (double) (first[i].time.abs_value_us - first[0].time.abs_value_us);
- }
- gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
- c1 += sqrt (sumsq); /* add 1 std dev */
- ret.rel_value_us = (uint64_t) c1;
- if (0 == ret.rel_value_us)
- ret = GNUNET_TIME_UNIT_MICROSECONDS; /* always at least 1 */
- return ret;
- }
- /**
- * Discard the message context that was inactive for the longest time.
- *
- * @param dc defragmentation context
- */
- static void
- discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
- {
- struct MessageContext *old;
- struct MessageContext *pos;
- old = NULL;
- pos = dc->head;
- while (NULL != pos)
- {
- if ((old == NULL) ||
- (old->last_update.abs_value_us > pos->last_update.abs_value_us))
- old = pos;
- pos = pos->next;
- }
- GNUNET_assert (NULL != old);
- GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
- dc->list_size--;
- if (NULL != old->ack_task)
- {
- GNUNET_SCHEDULER_cancel (old->ack_task);
- old->ack_task = NULL;
- }
- GNUNET_free (old);
- }
- /**
- * We have received a fragment. Process it.
- *
- * @param dc the context
- * @param msg the message that was received
- * @return #GNUNET_OK on success,
- * #GNUNET_NO if this was a duplicate,
- * #GNUNET_SYSERR on error
- */
- int
- GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
- const struct GNUNET_MessageHeader *msg)
- {
- struct MessageContext *mc;
- const struct FragmentHeader *fh;
- uint16_t msize;
- uint16_t foff;
- uint32_t fid;
- char *mbuf;
- unsigned int bit;
- struct GNUNET_TIME_Absolute now;
- struct GNUNET_TIME_Relative delay;
- unsigned int bc;
- unsigned int b;
- unsigned int n;
- unsigned int num_fragments;
- int duplicate;
- int last;
- if (ntohs (msg->size) < sizeof (struct FragmentHeader))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (ntohs (msg->size) > dc->mtu)
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- fh = (const struct FragmentHeader *) msg;
- msize = ntohs (fh->total_size);
- if (msize < sizeof (struct GNUNET_MessageHeader))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- fid = ntohl (fh->fragment_id);
- foff = ntohs (fh->offset);
- if (foff >= msize)
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader))))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- GNUNET_STATISTICS_update (dc->stats,
- _("# fragments received"),
- 1,
- GNUNET_NO);
- num_fragments = (ntohs (msg->size) + dc->mtu - sizeof (struct FragmentHeader)-1) / (dc->mtu - sizeof (struct FragmentHeader));
- last = 0;
- for (mc = dc->head; NULL != mc; mc = mc->next)
- if (mc->fragment_id > fid)
- last++;
- mc = dc->head;
- while ((NULL != mc) && (fid != mc->fragment_id))
- mc = mc->next;
- bit = foff / (dc->mtu - sizeof (struct FragmentHeader));
- if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) -
- sizeof (struct FragmentHeader) > msize)
- {
- /* payload extends past total message size */
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if ((NULL != mc) && (msize != mc->total_size))
- {
- /* inconsistent message size */
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- now = GNUNET_TIME_absolute_get ();
- if (NULL == mc)
- {
- mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
- mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
- mc->dc = dc;
- mc->total_size = msize;
- mc->fragment_id = fid;
- mc->last_update = now;
- n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu -
- sizeof (struct
- FragmentHeader));
- if (n == 64)
- mc->bits = UINT64_MAX; /* set all 64 bit */
- else
- mc->bits = (1LLU << n) - 1; /* set lowest 'bits' bit */
- if (dc->list_size >= dc->num_msgs)
- discard_oldest_mc (dc);
- GNUNET_CONTAINER_DLL_insert (dc->head,
- dc->tail,
- mc);
- dc->list_size++;
- }
- /* copy data to 'mc' */
- if (0 != (mc->bits & (1LLU << bit)))
- {
- mc->bits -= 1LLU << bit;
- mbuf = (char *) &mc[1];
- GNUNET_memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1],
- ntohs (msg->size) - sizeof (struct FragmentHeader));
- mc->last_update = now;
- if (bit < mc->last_bit)
- mc->frag_times_start_offset = mc->frag_times_write_offset;
- mc->last_bit = bit;
- mc->frag_times[mc->frag_times_write_offset].time = now;
- mc->frag_times[mc->frag_times_write_offset].bit = bit;
- mc->frag_times_write_offset++;
- duplicate = GNUNET_NO;
- }
- else
- {
- duplicate = GNUNET_YES;
- GNUNET_STATISTICS_update (dc->stats,
- _("# duplicate fragments received"),
- 1,
- GNUNET_NO);
- }
- /* count number of missing fragments after the current one */
- bc = 0;
- for (b = bit; b < 64; b++)
- if (0 != (mc->bits & (1LLU << b)))
- bc++;
- else
- bc = 0;
- /* notify about complete message */
- if ( (GNUNET_NO == duplicate) &&
- (0 == mc->bits) )
- {
- GNUNET_STATISTICS_update (dc->stats,
- _("# messages defragmented"),
- 1,
- GNUNET_NO);
- /* message complete, notify! */
- dc->proc (dc->cls, mc->msg);
- }
- /* send ACK */
- if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
- {
- dc->latency = estimate_latency (mc);
- }
- delay = GNUNET_TIME_relative_saturating_multiply (dc->latency,
- bc + 1);
- if ( (last + fid == num_fragments) ||
- (0 == mc->bits) ||
- (GNUNET_YES == duplicate) )
- {
- /* message complete or duplicate or last missing fragment in
- linear sequence; ACK now! */
- delay = GNUNET_TIME_UNIT_ZERO;
- }
- if (NULL != mc->ack_task)
- GNUNET_SCHEDULER_cancel (mc->ack_task);
- mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
- &send_ack,
- mc);
- if (GNUNET_YES == duplicate)
- {
- mc->last_duplicate = GNUNET_YES;
- return GNUNET_NO;
- }
- return GNUNET_YES;
- }
- /* end of defragmentation.c */
|