defragmentation.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. /*
  2. This file is part of GNUnet
  3. (C) 2009, 2011 Christian Grothoff (and other contributing authors)
  4. GNUnet is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published
  6. by the Free Software Foundation; either version 3, or (at your
  7. option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with GNUnet; see the file COPYING. If not, write to the
  14. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  15. Boston, MA 02111-1307, USA.
  16. */
  17. /**
  18. * @file src/fragmentation/defragmentation.c
  19. * @brief library to help defragment messages
  20. * @author Christian Grothoff
  21. */
  22. #include "platform.h"
  23. #include "gnunet_fragmentation_lib.h"
  24. #include "fragmentation.h"
  25. /**
  26. * Timestamps for fragments.
  27. */
  28. struct FragTimes
  29. {
  30. /**
  31. * The time the fragment was received.
  32. */
  33. struct GNUNET_TIME_Absolute time;
  34. /**
  35. * Number of the bit for the fragment (in [0,..,63]).
  36. */
  37. unsigned int bit;
  38. };
  39. /**
  40. * Information we keep for one message that is being assembled. Note
  41. * that we keep the context around even after the assembly is done to
  42. * handle 'stray' messages that are received 'late'. A message
  43. * context is ONLY discarded when the queue gets too big.
  44. */
  45. struct MessageContext
  46. {
  47. /**
  48. * This is a DLL.
  49. */
  50. struct MessageContext *next;
  51. /**
  52. * This is a DLL.
  53. */
  54. struct MessageContext *prev;
  55. /**
  56. * Associated defragmentation context.
  57. */
  58. struct GNUNET_DEFRAGMENT_Context *dc;
  59. /**
  60. * Pointer to the assembled message, allocated at the
  61. * end of this struct.
  62. */
  63. const struct GNUNET_MessageHeader *msg;
  64. /**
  65. * Last time we received any update for this message
  66. * (least-recently updated message will be discarded
  67. * if we hit the queue size).
  68. */
  69. struct GNUNET_TIME_Absolute last_update;
  70. /**
  71. * Task scheduled for transmitting the next ACK to the
  72. * other peer.
  73. */
  74. GNUNET_SCHEDULER_TaskIdentifier ack_task;
  75. /**
  76. * When did we receive which fragment? Used to calculate
  77. * the time we should send the ACK.
  78. */
  79. struct FragTimes frag_times[64];
  80. /**
  81. * Which fragments have we gotten yet? bits that are 1
  82. * indicate missing fragments.
  83. */
  84. uint64_t bits;
  85. /**
  86. * Unique ID for this message.
  87. */
  88. uint32_t fragment_id;
  89. /**
  90. * Which 'bit' did the last fragment we received correspond to?
  91. */
  92. unsigned int last_bit;
  93. /**
  94. * For the current ACK round, which is the first relevant
  95. * offset in 'frag_times'?
  96. */
  97. unsigned int frag_times_start_offset;
  98. /**
  99. * Which offset whould we write the next frag value into
  100. * in the 'frag_times' array? All smaller entries are valid.
  101. */
  102. unsigned int frag_times_write_offset;
  103. /**
  104. * Total size of the message that we are assembling.
  105. */
  106. uint16_t total_size;
  107. };
  108. /**
  109. * Defragmentation context (one per connection).
  110. */
  111. struct GNUNET_DEFRAGMENT_Context
  112. {
  113. /**
  114. * For statistics.
  115. */
  116. struct GNUNET_STATISTICS_Handle *stats;
  117. /**
  118. * Head of list of messages we're defragmenting.
  119. */
  120. struct MessageContext *head;
  121. /**
  122. * Tail of list of messages we're defragmenting.
  123. */
  124. struct MessageContext *tail;
  125. /**
  126. * Closure for 'proc' and 'ackp'.
  127. */
  128. void *cls;
  129. /**
  130. * Function to call with defragmented messages.
  131. */
  132. GNUNET_FRAGMENT_MessageProcessor proc;
  133. /**
  134. * Function to call with acknowledgements.
  135. */
  136. GNUNET_DEFRAGMENT_AckProcessor ackp;
  137. /**
  138. * Running average of the latency (delay between messages) for this
  139. * connection.
  140. */
  141. struct GNUNET_TIME_Relative latency;
  142. /**
  143. * num_msgs how many fragmented messages
  144. * to we defragment at most at the same time?
  145. */
  146. unsigned int num_msgs;
  147. /**
  148. * Current number of messages in the 'struct MessageContext'
  149. * DLL (smaller or equal to 'num_msgs').
  150. */
  151. unsigned int list_size;
  152. /**
  153. * Maximum message size for each fragment.
  154. */
  155. uint16_t mtu;
  156. };
  157. /**
  158. * Create a defragmentation context.
  159. *
  160. * @param stats statistics context
  161. * @param mtu the maximum message size for each fragment
  162. * @param num_msgs how many fragmented messages
  163. * to we defragment at most at the same time?
  164. * @param cls closure for proc and ackp
  165. * @param proc function to call with defragmented messages
  166. * @param ackp function to call with acknowledgements (to send
  167. * back to the other side)
  168. * @return the defragmentation context
  169. */
  170. struct GNUNET_DEFRAGMENT_Context *
  171. GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
  172. uint16_t mtu, unsigned int num_msgs,
  173. void *cls,
  174. GNUNET_FRAGMENT_MessageProcessor proc,
  175. GNUNET_DEFRAGMENT_AckProcessor ackp)
  176. {
  177. struct GNUNET_DEFRAGMENT_Context *dc;
  178. dc = GNUNET_new (struct GNUNET_DEFRAGMENT_Context);
  179. dc->stats = stats;
  180. dc->cls = cls;
  181. dc->proc = proc;
  182. dc->ackp = ackp;
  183. dc->num_msgs = num_msgs;
  184. dc->mtu = mtu;
  185. dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
  186. return dc;
  187. }
  188. /**
  189. * Destroy the given defragmentation context.
  190. *
  191. * @param dc defragmentation context
  192. */
  193. void
  194. GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
  195. {
  196. struct MessageContext *mc;
  197. while (NULL != (mc = dc->head))
  198. {
  199. GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
  200. dc->list_size--;
  201. if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
  202. {
  203. GNUNET_SCHEDULER_cancel (mc->ack_task);
  204. mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
  205. }
  206. GNUNET_free (mc);
  207. }
  208. GNUNET_assert (0 == dc->list_size);
  209. GNUNET_free (dc);
  210. }
  211. /**
  212. * Send acknowledgement to the other peer now.
  213. *
  214. * @param cls the message context
  215. * @param tc the scheduler context
  216. */
  217. static void
  218. send_ack (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  219. {
  220. struct MessageContext *mc = cls;
  221. struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
  222. struct FragmentAcknowledgement fa;
  223. mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
  224. fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
  225. fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
  226. fa.fragment_id = htonl (mc->fragment_id);
  227. fa.bits = GNUNET_htonll (mc->bits);
  228. GNUNET_STATISTICS_update (mc->dc->stats,
  229. _("# acknowledgements sent for fragment"), 1,
  230. GNUNET_NO);
  231. dc->ackp (dc->cls, mc->fragment_id, &fa.header);
  232. }
  /**
   * Least-squares fit of the model y = b * x (a line through the
   * origin).
   *
   * This function is from the GNU Scientific Library, linear/fit.c,
   * (C) 2000 Brian Gough
   *
   * @param x x values; element i is read at x[i * xstride]
   * @param xstride stride between consecutive x values
   * @param y y values; element i is read at y[i * ystride]
   * @param ystride stride between consecutive y values
   * @param n number of data points
   * @param c1 set to the fitted coefficient b
   * @param cov_11 set to the variance estimate for b (divides by
   *               n - 1, so it is not finite for n == 1)
   * @param sumsq set to the sum of squared residuals (y_i - b * x_i)^2
   */
  static void
  gsl_fit_mul (const double *x, const size_t xstride, const double *y,
               const size_t ystride, const size_t n, double *c1, double *cov_11,
               double *sumsq)
  {
    double mean_x = 0;
    double mean_y = 0;
    double var_x = 0;
    double cov_xy = 0;
    double chi2 = 0;
    double slope;
    double per_dof;
    size_t k;

    /* first pass: running means of x and y */
    for (k = 0; k < n; k++)
    {
      mean_x += (x[k * xstride] - mean_x) / (k + 1.0);
      mean_y += (y[k * ystride] - mean_y) / (k + 1.0);
    }

    /* second pass: running mean of squared / mixed deviations */
    for (k = 0; k < n; k++)
    {
      const double rx = x[k * xstride] - mean_x;
      const double ry = y[k * ystride] - mean_y;

      var_x += (rx * rx - var_x) / (k + 1.0);
      cov_xy += (rx * ry - cov_xy) / (k + 1.0);
    }

    /* In terms of y = b x */
    slope = (mean_x * mean_y + cov_xy) / (mean_x * mean_x + var_x);
    *c1 = slope;

    /* Compute chi^2 = \sum (y_i - b * x_i)^2 */
    for (k = 0; k < n; k++)
    {
      const double rx = x[k * xstride] - mean_x;
      const double ry = y[k * ystride] - mean_y;
      const double resid = (mean_y - slope * mean_x) + ry - slope * rx;

      chi2 += resid * resid;
    }

    per_dof = chi2 / (n - 1.0);   /* chisq per degree of freedom */
    *cov_11 = per_dof * 1.0 / (n * (mean_x * mean_x + var_x));
    *sumsq = chi2;
  }
  274. /**
  275. * Estimate the latency between messages based on the most recent
  276. * message time stamps.
  277. *
  278. * @param mc context with time stamps
  279. * @return average delay between time stamps (based on least-squares fit)
  280. */
  281. static struct GNUNET_TIME_Relative
  282. estimate_latency (struct MessageContext *mc)
  283. {
  284. struct FragTimes *first;
  285. size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
  286. double x[total];
  287. double y[total];
  288. size_t i;
  289. double c1;
  290. double cov11;
  291. double sumsq;
  292. struct GNUNET_TIME_Relative ret;
  293. first = &mc->frag_times[mc->frag_times_start_offset];
  294. GNUNET_assert (total > 1);
  295. for (i = 0; i < total; i++)
  296. {
  297. x[i] = (double) i;
  298. y[i] = (double) (first[i].time.abs_value_us - first[0].time.abs_value_us);
  299. }
  300. gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
  301. c1 += sqrt (sumsq); /* add 1 std dev */
  302. ret.rel_value_us = (uint64_t) c1;
  303. if (0 == ret.rel_value_us)
  304. ret = GNUNET_TIME_UNIT_MICROSECONDS; /* always at least 1 */
  305. return ret;
  306. }
  307. /**
  308. * Discard the message context that was inactive for the longest time.
  309. *
  310. * @param dc defragmentation context
  311. */
  312. static void
  313. discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
  314. {
  315. struct MessageContext *old;
  316. struct MessageContext *pos;
  317. old = NULL;
  318. pos = dc->head;
  319. while (NULL != pos)
  320. {
  321. if ((old == NULL) ||
  322. (old->last_update.abs_value_us > pos->last_update.abs_value_us))
  323. old = pos;
  324. pos = pos->next;
  325. }
  326. GNUNET_assert (NULL != old);
  327. GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
  328. dc->list_size--;
  329. if (GNUNET_SCHEDULER_NO_TASK != old->ack_task)
  330. {
  331. GNUNET_SCHEDULER_cancel (old->ack_task);
  332. old->ack_task = GNUNET_SCHEDULER_NO_TASK;
  333. }
  334. GNUNET_free (old);
  335. }
  336. /**
  337. * We have received a fragment. Process it.
  338. *
  339. * @param dc the context
  340. * @param msg the message that was received
  341. * @return GNUNET_OK on success, GNUNET_NO if this was a duplicate, GNUNET_SYSERR on error
  342. */
  343. int
  344. GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
  345. const struct GNUNET_MessageHeader *msg)
  346. {
  347. struct MessageContext *mc;
  348. const struct FragmentHeader *fh;
  349. uint16_t msize;
  350. uint16_t foff;
  351. uint32_t fid;
  352. char *mbuf;
  353. unsigned int bit;
  354. struct GNUNET_TIME_Absolute now;
  355. struct GNUNET_TIME_Relative delay;
  356. unsigned int bc;
  357. unsigned int b;
  358. unsigned int n;
  359. unsigned int num_fragments;
  360. int duplicate;
  361. int last;
  362. if (ntohs (msg->size) < sizeof (struct FragmentHeader))
  363. {
  364. GNUNET_break_op (0);
  365. return GNUNET_SYSERR;
  366. }
  367. if (ntohs (msg->size) > dc->mtu)
  368. {
  369. GNUNET_break_op (0);
  370. return GNUNET_SYSERR;
  371. }
  372. fh = (const struct FragmentHeader *) msg;
  373. msize = ntohs (fh->total_size);
  374. if (msize < sizeof (struct GNUNET_MessageHeader))
  375. {
  376. GNUNET_break_op (0);
  377. return GNUNET_SYSERR;
  378. }
  379. fid = ntohl (fh->fragment_id);
  380. foff = ntohs (fh->offset);
  381. if (foff >= msize)
  382. {
  383. GNUNET_break_op (0);
  384. return GNUNET_SYSERR;
  385. }
  386. if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader))))
  387. {
  388. GNUNET_break_op (0);
  389. return GNUNET_SYSERR;
  390. }
  391. GNUNET_STATISTICS_update (dc->stats, _("# fragments received"), 1, GNUNET_NO);
  392. num_fragments = (ntohs (msg->size) + dc->mtu - sizeof (struct FragmentHeader)-1) / (dc->mtu - sizeof (struct FragmentHeader));
  393. last = 0;
  394. for (mc = dc->head; NULL != mc; mc = mc->next)
  395. if (mc->fragment_id > fid)
  396. last++;
  397. mc = dc->head;
  398. while ((NULL != mc) && (fid != mc->fragment_id))
  399. mc = mc->next;
  400. bit = foff / (dc->mtu - sizeof (struct FragmentHeader));
  401. if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size) -
  402. sizeof (struct FragmentHeader) > msize)
  403. {
  404. /* payload extends past total message size */
  405. GNUNET_break_op (0);
  406. return GNUNET_SYSERR;
  407. }
  408. if ((NULL != mc) && (msize != mc->total_size))
  409. {
  410. /* inconsistent message size */
  411. GNUNET_break_op (0);
  412. return GNUNET_SYSERR;
  413. }
  414. now = GNUNET_TIME_absolute_get ();
  415. if (NULL == mc)
  416. {
  417. mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
  418. mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
  419. mc->dc = dc;
  420. mc->total_size = msize;
  421. mc->fragment_id = fid;
  422. mc->last_update = now;
  423. n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu -
  424. sizeof (struct
  425. FragmentHeader));
  426. if (n == 64)
  427. mc->bits = UINT64_MAX; /* set all 64 bit */
  428. else
  429. mc->bits = (1LL << n) - 1; /* set lowest 'bits' bit */
  430. if (dc->list_size >= dc->num_msgs)
  431. discard_oldest_mc (dc);
  432. GNUNET_CONTAINER_DLL_insert (dc->head, dc->tail, mc);
  433. dc->list_size++;
  434. }
  435. /* copy data to 'mc' */
  436. if (0 != (mc->bits & (1LL << bit)))
  437. {
  438. mc->bits -= 1LL << bit;
  439. mbuf = (char *) &mc[1];
  440. memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))], &fh[1],
  441. ntohs (msg->size) - sizeof (struct FragmentHeader));
  442. mc->last_update = now;
  443. if (bit < mc->last_bit)
  444. mc->frag_times_start_offset = mc->frag_times_write_offset;
  445. mc->last_bit = bit;
  446. mc->frag_times[mc->frag_times_write_offset].time = now;
  447. mc->frag_times[mc->frag_times_write_offset].bit = bit;
  448. mc->frag_times_write_offset++;
  449. duplicate = GNUNET_NO;
  450. }
  451. else
  452. {
  453. duplicate = GNUNET_YES;
  454. GNUNET_STATISTICS_update (dc->stats, _("# duplicate fragments received"), 1,
  455. GNUNET_NO);
  456. }
  457. /* count number of missing fragments */
  458. bc = 0;
  459. for (b = 0; b < 64; b++)
  460. if (0 != (mc->bits & (1LL << b)))
  461. bc++;
  462. /* notify about complete message */
  463. if ((duplicate == GNUNET_NO) && (0 == mc->bits))
  464. {
  465. GNUNET_STATISTICS_update (dc->stats, _("# messages defragmented"), 1,
  466. GNUNET_NO);
  467. /* message complete, notify! */
  468. dc->proc (dc->cls, mc->msg);
  469. }
  470. /* send ACK */
  471. if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
  472. {
  473. dc->latency = estimate_latency (mc);
  474. }
  475. delay = GNUNET_TIME_relative_multiply (dc->latency, bc + 1);
  476. if ( (last + fid == num_fragments) ||
  477. (0 == mc->bits) ||
  478. (GNUNET_YES == duplicate))
  479. {
  480. /* message complete or duplicate or last missing fragment in
  481. linear sequence; ACK now! */
  482. delay = GNUNET_TIME_UNIT_ZERO;
  483. }
  484. if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
  485. GNUNET_SCHEDULER_cancel (mc->ack_task);
  486. mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay, &send_ack, mc);
  487. if (duplicate == GNUNET_YES)
  488. return GNUNET_NO;
  489. return GNUNET_YES;
  490. }
  491. /* end of defragmentation.c */