defragmentation.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. /*
  2. This file is part of GNUnet
  3. Copyright (C) 2009, 2011 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file src/fragmentation/defragmentation.c
  18. * @brief library to help defragment messages
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet_fragmentation_lib.h"
  23. #include "fragmentation.h"
  24. /**
  25. * Timestamps for fragments.
  26. */
  27. struct FragTimes
  28. {
  29. /**
  30. * The time the fragment was received.
  31. */
  32. struct GNUNET_TIME_Absolute time;
  33. /**
  34. * Number of the bit for the fragment (in [0,..,63]).
  35. */
  36. unsigned int bit;
  37. };
  38. /**
  39. * Information we keep for one message that is being assembled. Note
  40. * that we keep the context around even after the assembly is done to
  41. * handle 'stray' messages that are received 'late'. A message
  42. * context is ONLY discarded when the queue gets too big.
  43. */
  44. struct MessageContext
  45. {
  46. /**
  47. * This is a DLL.
  48. */
  49. struct MessageContext *next;
  50. /**
  51. * This is a DLL.
  52. */
  53. struct MessageContext *prev;
  54. /**
  55. * Associated defragmentation context.
  56. */
  57. struct GNUNET_DEFRAGMENT_Context *dc;
  58. /**
  59. * Pointer to the assembled message, allocated at the
  60. * end of this struct.
  61. */
  62. const struct GNUNET_MessageHeader *msg;
  63. /**
  64. * Last time we received any update for this message
  65. * (least-recently updated message will be discarded
  66. * if we hit the queue size).
  67. */
  68. struct GNUNET_TIME_Absolute last_update;
  69. /**
  70. * Task scheduled for transmitting the next ACK to the
  71. * other peer.
  72. */
  73. struct GNUNET_SCHEDULER_Task * ack_task;
  74. /**
  75. * When did we receive which fragment? Used to calculate
  76. * the time we should send the ACK.
  77. */
  78. struct FragTimes frag_times[64];
  79. /**
  80. * Which fragments have we gotten yet? bits that are 1
  81. * indicate missing fragments.
  82. */
  83. uint64_t bits;
  84. /**
  85. * Unique ID for this message.
  86. */
  87. uint32_t fragment_id;
  88. /**
  89. * Which 'bit' did the last fragment we received correspond to?
  90. */
  91. unsigned int last_bit;
  92. /**
  93. * For the current ACK round, which is the first relevant
  94. * offset in @e frag_times?
  95. */
  96. unsigned int frag_times_start_offset;
  97. /**
  98. * Which offset whould we write the next frag value into
  99. * in the @e frag_times array? All smaller entries are valid.
  100. */
  101. unsigned int frag_times_write_offset;
  102. /**
  103. * Total size of the message that we are assembling.
  104. */
  105. uint16_t total_size;
  106. /**
  107. * Was the last fragment we got a duplicate?
  108. */
  109. int16_t last_duplicate;
  110. };
  111. /**
  112. * Defragmentation context (one per connection).
  113. */
  114. struct GNUNET_DEFRAGMENT_Context
  115. {
  116. /**
  117. * For statistics.
  118. */
  119. struct GNUNET_STATISTICS_Handle *stats;
  120. /**
  121. * Head of list of messages we're defragmenting.
  122. */
  123. struct MessageContext *head;
  124. /**
  125. * Tail of list of messages we're defragmenting.
  126. */
  127. struct MessageContext *tail;
  128. /**
  129. * Closure for @e proc and @e ackp.
  130. */
  131. void *cls;
  132. /**
  133. * Function to call with defragmented messages.
  134. */
  135. GNUNET_FRAGMENT_MessageProcessor proc;
  136. /**
  137. * Function to call with acknowledgements.
  138. */
  139. GNUNET_DEFRAGMENT_AckProcessor ackp;
  140. /**
  141. * Running average of the latency (delay between messages) for this
  142. * connection.
  143. */
  144. struct GNUNET_TIME_Relative latency;
  145. /**
  146. * num_msgs how many fragmented messages
  147. * to we defragment at most at the same time?
  148. */
  149. unsigned int num_msgs;
  150. /**
  151. * Current number of messages in the 'struct MessageContext'
  152. * DLL (smaller or equal to 'num_msgs').
  153. */
  154. unsigned int list_size;
  155. /**
  156. * Maximum message size for each fragment.
  157. */
  158. uint16_t mtu;
  159. };
  160. /**
  161. * Create a defragmentation context.
  162. *
  163. * @param stats statistics context
  164. * @param mtu the maximum message size for each fragment
  165. * @param num_msgs how many fragmented messages
  166. * to we defragment at most at the same time?
  167. * @param cls closure for @a proc and @a ackp
  168. * @param proc function to call with defragmented messages
  169. * @param ackp function to call with acknowledgements (to send
  170. * back to the other side)
  171. * @return the defragmentation context
  172. */
  173. struct GNUNET_DEFRAGMENT_Context *
  174. GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
  175. uint16_t mtu, unsigned int num_msgs,
  176. void *cls,
  177. GNUNET_FRAGMENT_MessageProcessor proc,
  178. GNUNET_DEFRAGMENT_AckProcessor ackp)
  179. {
  180. struct GNUNET_DEFRAGMENT_Context *dc;
  181. dc = GNUNET_new (struct GNUNET_DEFRAGMENT_Context);
  182. dc->stats = stats;
  183. dc->cls = cls;
  184. dc->proc = proc;
  185. dc->ackp = ackp;
  186. dc->num_msgs = num_msgs;
  187. dc->mtu = mtu;
  188. dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
  189. return dc;
  190. }
  191. /**
  192. * Destroy the given defragmentation context.
  193. *
  194. * @param dc defragmentation context
  195. */
  196. void
  197. GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
  198. {
  199. struct MessageContext *mc;
  200. while (NULL != (mc = dc->head))
  201. {
  202. GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
  203. dc->list_size--;
  204. if (NULL != mc->ack_task)
  205. {
  206. GNUNET_SCHEDULER_cancel (mc->ack_task);
  207. mc->ack_task = NULL;
  208. }
  209. GNUNET_free (mc);
  210. }
  211. GNUNET_assert (0 == dc->list_size);
  212. GNUNET_free (dc);
  213. }
  214. /**
  215. * Send acknowledgement to the other peer now.
  216. *
  217. * @param cls the message context
  218. */
  219. static void
  220. send_ack (void *cls)
  221. {
  222. struct MessageContext *mc = cls;
  223. struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
  224. struct FragmentAcknowledgement fa;
  225. mc->ack_task = NULL;
  226. fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
  227. fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
  228. fa.fragment_id = htonl (mc->fragment_id);
  229. fa.bits = GNUNET_htonll (mc->bits);
  230. GNUNET_STATISTICS_update (mc->dc->stats,
  231. _("# acknowledgements sent for fragment"),
  232. 1,
  233. GNUNET_NO);
  234. mc->last_duplicate = GNUNET_NO; /* clear flag */
  235. dc->ackp (dc->cls,
  236. mc->fragment_id,
  237. &fa.header);
  238. }
  239. /**
  240. * This function is from the GNU Scientific Library, linear/fit.c,
  241. * Copyright (C) 2000 Brian Gough
  242. */
  243. static void
  244. gsl_fit_mul (const double *x, const size_t xstride, const double *y,
  245. const size_t ystride, const size_t n, double *c1, double *cov_11,
  246. double *sumsq)
  247. {
  248. double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
  249. size_t i;
  250. for (i = 0; i < n; i++)
  251. {
  252. m_x += (x[i * xstride] - m_x) / (i + 1.0);
  253. m_y += (y[i * ystride] - m_y) / (i + 1.0);
  254. }
  255. for (i = 0; i < n; i++)
  256. {
  257. const double dx = x[i * xstride] - m_x;
  258. const double dy = y[i * ystride] - m_y;
  259. m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
  260. m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
  261. }
  262. /* In terms of y = b x */
  263. {
  264. double s2 = 0, d2 = 0;
  265. double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);
  266. *c1 = b;
  267. /* Compute chi^2 = \sum (y_i - b * x_i)^2 */
  268. for (i = 0; i < n; i++)
  269. {
  270. const double dx = x[i * xstride] - m_x;
  271. const double dy = y[i * ystride] - m_y;
  272. const double d = (m_y - b * m_x) + dy - b * dx;
  273. d2 += d * d;
  274. }
  275. s2 = d2 / (n - 1.0); /* chisq per degree of freedom */
  276. *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));
  277. *sumsq = d2;
  278. }
  279. }
  280. /**
  281. * Estimate the latency between messages based on the most recent
  282. * message time stamps.
  283. *
  284. * @param mc context with time stamps
  285. * @return average delay between time stamps (based on least-squares fit)
  286. */
  287. static struct GNUNET_TIME_Relative
  288. estimate_latency (struct MessageContext *mc)
  289. {
  290. struct FragTimes *first;
  291. size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
  292. double x[total];
  293. double y[total];
  294. size_t i;
  295. double c1;
  296. double cov11;
  297. double sumsq;
  298. struct GNUNET_TIME_Relative ret;
  299. first = &mc->frag_times[mc->frag_times_start_offset];
  300. GNUNET_assert (total > 1);
  301. for (i = 0; i < total; i++)
  302. {
  303. x[i] = (double) i;
  304. y[i] = (double) (first[i].time.abs_value_us - first[0].time.abs_value_us);
  305. }
  306. gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
  307. c1 += sqrt (sumsq); /* add 1 std dev */
  308. ret.rel_value_us = (uint64_t) c1;
  309. if (0 == ret.rel_value_us)
  310. ret = GNUNET_TIME_UNIT_MICROSECONDS; /* always at least 1 */
  311. return ret;
  312. }
  313. /**
  314. * Discard the message context that was inactive for the longest time.
  315. *
  316. * @param dc defragmentation context
  317. */
  318. static void
  319. discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
  320. {
  321. struct MessageContext *old;
  322. struct MessageContext *pos;
  323. old = NULL;
  324. pos = dc->head;
  325. while (NULL != pos)
  326. {
  327. if ((old == NULL) ||
  328. (old->last_update.abs_value_us > pos->last_update.abs_value_us))
  329. old = pos;
  330. pos = pos->next;
  331. }
  332. GNUNET_assert (NULL != old);
  333. GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
  334. dc->list_size--;
  335. if (NULL != old->ack_task)
  336. {
  337. GNUNET_SCHEDULER_cancel (old->ack_task);
  338. old->ack_task = NULL;
  339. }
  340. GNUNET_free (old);
  341. }
  342. /**
  343. * We have received a fragment. Process it.
  344. *
  345. * @param dc the context
  346. * @param msg the message that was received
  347. * @return #GNUNET_OK on success,
  348. * #GNUNET_NO if this was a duplicate,
  349. * #GNUNET_SYSERR on error
  350. */
  351. int
  352. GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
  353. const struct GNUNET_MessageHeader *msg)
  354. {
  355. struct MessageContext *mc;
  356. const struct FragmentHeader *fh;
  357. uint16_t msize;
  358. uint16_t foff;
  359. uint32_t fid;
  360. char *mbuf;
  361. unsigned int bit;
  362. struct GNUNET_TIME_Absolute now;
  363. struct GNUNET_TIME_Relative delay;
  364. unsigned int bc;
  365. unsigned int b;
  366. unsigned int n;
  367. unsigned int num_fragments;
  368. int duplicate;
  369. int last;
  370. if (ntohs (msg->size) < sizeof (struct FragmentHeader))
  371. {
  372. GNUNET_break_op (0);
  373. return GNUNET_SYSERR;
  374. }
  375. if (ntohs (msg->size) > dc->mtu)
  376. {
  377. GNUNET_break_op (0);
  378. return GNUNET_SYSERR;
  379. }
  380. fh = (const struct FragmentHeader *) msg;
  381. msize = ntohs (fh->total_size);
  382. if (msize < sizeof (struct GNUNET_MessageHeader))
  383. {
  384. GNUNET_break_op (0);
  385. return GNUNET_SYSERR;
  386. }
  387. fid = ntohl (fh->fragment_id);
  388. foff = ntohs (fh->offset);
  389. if (foff >= msize)
  390. {
  391. GNUNET_break_op (0);
  392. return GNUNET_SYSERR;
  393. }
  394. if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader))))
  395. {
  396. GNUNET_break_op (0);
  397. return GNUNET_SYSERR;
  398. }
  399. GNUNET_STATISTICS_update (dc->stats,
  400. _("# fragments received"),
  401. 1,
  402. GNUNET_NO);
  403. num_fragments = (ntohs (msg->size) + dc->mtu - sizeof (struct FragmentHeader)-1) / (dc->mtu - sizeof (struct FragmentHeader));
  404. last = 0;
  405. for (mc = dc->head; NULL != mc; mc = mc->next)
  406. if (mc->fragment_id > fid)
  407. last++;
  408. mc = dc->head;
  409. while ((NULL != mc) && (fid != mc->fragment_id))
  410. mc = mc->next;
  411. bit = foff / (dc->mtu - sizeof (struct FragmentHeader));
  412. if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) -
  413. sizeof (struct FragmentHeader) > msize)
  414. {
  415. /* payload extends past total message size */
  416. GNUNET_break_op (0);
  417. return GNUNET_SYSERR;
  418. }
  419. if ((NULL != mc) && (msize != mc->total_size))
  420. {
  421. /* inconsistent message size */
  422. GNUNET_break_op (0);
  423. return GNUNET_SYSERR;
  424. }
  425. now = GNUNET_TIME_absolute_get ();
  426. if (NULL == mc)
  427. {
  428. mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
  429. mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
  430. mc->dc = dc;
  431. mc->total_size = msize;
  432. mc->fragment_id = fid;
  433. mc->last_update = now;
  434. n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu -
  435. sizeof (struct
  436. FragmentHeader));
  437. if (n == 64)
  438. mc->bits = UINT64_MAX; /* set all 64 bit */
  439. else
  440. mc->bits = (1LLU << n) - 1; /* set lowest 'bits' bit */
  441. if (dc->list_size >= dc->num_msgs)
  442. discard_oldest_mc (dc);
  443. GNUNET_CONTAINER_DLL_insert (dc->head,
  444. dc->tail,
  445. mc);
  446. dc->list_size++;
  447. }
  448. /* copy data to 'mc' */
  449. if (0 != (mc->bits & (1LLU << bit)))
  450. {
  451. mc->bits -= 1LLU << bit;
  452. mbuf = (char *) &mc[1];
  453. GNUNET_memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1],
  454. ntohs (msg->size) - sizeof (struct FragmentHeader));
  455. mc->last_update = now;
  456. if (bit < mc->last_bit)
  457. mc->frag_times_start_offset = mc->frag_times_write_offset;
  458. mc->last_bit = bit;
  459. mc->frag_times[mc->frag_times_write_offset].time = now;
  460. mc->frag_times[mc->frag_times_write_offset].bit = bit;
  461. mc->frag_times_write_offset++;
  462. duplicate = GNUNET_NO;
  463. }
  464. else
  465. {
  466. duplicate = GNUNET_YES;
  467. GNUNET_STATISTICS_update (dc->stats,
  468. _("# duplicate fragments received"),
  469. 1,
  470. GNUNET_NO);
  471. }
  472. /* count number of missing fragments after the current one */
  473. bc = 0;
  474. for (b = bit; b < 64; b++)
  475. if (0 != (mc->bits & (1LLU << b)))
  476. bc++;
  477. else
  478. bc = 0;
  479. /* notify about complete message */
  480. if ( (GNUNET_NO == duplicate) &&
  481. (0 == mc->bits) )
  482. {
  483. GNUNET_STATISTICS_update (dc->stats,
  484. _("# messages defragmented"),
  485. 1,
  486. GNUNET_NO);
  487. /* message complete, notify! */
  488. dc->proc (dc->cls, mc->msg);
  489. }
  490. /* send ACK */
  491. if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
  492. {
  493. dc->latency = estimate_latency (mc);
  494. }
  495. delay = GNUNET_TIME_relative_saturating_multiply (dc->latency,
  496. bc + 1);
  497. if ( (last + fid == num_fragments) ||
  498. (0 == mc->bits) ||
  499. (GNUNET_YES == duplicate) )
  500. {
  501. /* message complete or duplicate or last missing fragment in
  502. linear sequence; ACK now! */
  503. delay = GNUNET_TIME_UNIT_ZERO;
  504. }
  505. if (NULL != mc->ack_task)
  506. GNUNET_SCHEDULER_cancel (mc->ack_task);
  507. mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
  508. &send_ack,
  509. mc);
  510. if (GNUNET_YES == duplicate)
  511. {
  512. mc->last_duplicate = GNUNET_YES;
  513. return GNUNET_NO;
  514. }
  515. return GNUNET_YES;
  516. }
  517. /* end of defragmentation.c */