tcp_server_mst_legacy.c 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2010 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file util/server_mst.c
  18. * @brief convenience functions for handling inbound message buffers
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet_util_lib.h"
  /* Alignment, in bytes, that an offset or address must satisfy before a
   * message is parsed in place: 8 unless HAVE_UNALIGNED_64_ACCESS is set
   * (presumably by the build configuration -- confirm), in which case 4. */
  #if ! HAVE_UNALIGNED_64_ACCESS
  #define ALIGN_FACTOR 8
  #else
  #define ALIGN_FACTOR 4
  #endif
  28. /**
  29. * Handle to a message stream tokenizer.
  30. */
  31. struct GNUNET_SERVER_MessageStreamTokenizer
  32. {
  33. /**
  34. * Function to call on completed messages.
  35. */
  36. GNUNET_SERVER_MessageTokenizerCallback cb;
  37. /**
  38. * Closure for @e cb.
  39. */
  40. void *cb_cls;
  41. /**
  42. * Size of the buffer (starting at @e hdr).
  43. */
  44. size_t curr_buf;
  45. /**
  46. * How many bytes in buffer have we already processed?
  47. */
  48. size_t off;
  49. /**
  50. * How many bytes in buffer are valid right now?
  51. */
  52. size_t pos;
  53. /**
  54. * Beginning of the buffer. Typed like this to force alignment.
  55. */
  56. struct GNUNET_MessageHeader *hdr;
  57. };
  58. /**
  59. * Create a message stream tokenizer.
  60. *
  61. * @param cb function to call on completed messages
  62. * @param cb_cls closure for @a cb
  63. * @return handle to tokenizer
  64. */
  65. struct GNUNET_SERVER_MessageStreamTokenizer *
  66. GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb,
  67. void *cb_cls)
  68. {
  69. struct GNUNET_SERVER_MessageStreamTokenizer *ret;
  70. ret = GNUNET_new (struct GNUNET_SERVER_MessageStreamTokenizer);
  71. ret->hdr = GNUNET_malloc (GNUNET_MIN_MESSAGE_SIZE);
  72. ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE;
  73. ret->cb = cb;
  74. ret->cb_cls = cb_cls;
  75. return ret;
  76. }
  77. /**
  78. * Add incoming data to the receive buffer and call the
  79. * callback for all complete messages.
  80. *
  81. * @param mst tokenizer to use
  82. * @param client_identity ID of client for which this is a buffer
  83. * @param buf input data to add
  84. * @param size number of bytes in @a buf
  85. * @param purge should any excess bytes in the buffer be discarded
  86. * (i.e. for packet-based services like UDP)
  87. * @param one_shot only call callback once, keep rest of message in buffer
  88. * @return #GNUNET_OK if we are done processing (need more data)
  89. * #GNUNET_NO if @a one_shot was set and we have another message ready
  90. * #GNUNET_SYSERR if the data stream is corrupt
  91. */
  92. int
  93. GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst,
  94. void *client_identity,
  95. const char *buf, size_t size,
  96. int purge, int one_shot)
  97. {
  98. const struct GNUNET_MessageHeader *hdr;
  99. size_t delta;
  100. uint16_t want;
  101. char *ibuf;
  102. int need_align;
  103. unsigned long offset;
  104. int ret;
  105. GNUNET_assert (mst->off <= mst->pos);
  106. GNUNET_assert (mst->pos <= mst->curr_buf);
  107. LOG (GNUNET_ERROR_TYPE_DEBUG,
  108. "Server-mst receives %u bytes with %u bytes already in private buffer\n",
  109. (unsigned int) size, (unsigned int) (mst->pos - mst->off));
  110. ret = GNUNET_OK;
  111. ibuf = (char *) mst->hdr;
  112. while (mst->pos > 0)
  113. {
  114. do_align:
  115. GNUNET_assert (mst->pos >= mst->off);
  116. if ((mst->curr_buf - mst->off < sizeof(struct GNUNET_MessageHeader)) ||
  117. (0 != (mst->off % ALIGN_FACTOR)))
  118. {
  119. /* need to align or need more space */
  120. mst->pos -= mst->off;
  121. memmove (ibuf, &ibuf[mst->off], mst->pos);
  122. mst->off = 0;
  123. }
  124. if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
  125. {
  126. delta =
  127. GNUNET_MIN (sizeof(struct GNUNET_MessageHeader)
  128. - (mst->pos - mst->off), size);
  129. GNUNET_memcpy (&ibuf[mst->pos], buf, delta);
  130. mst->pos += delta;
  131. buf += delta;
  132. size -= delta;
  133. }
  134. if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
  135. {
  136. if (purge)
  137. {
  138. mst->off = 0;
  139. mst->pos = 0;
  140. }
  141. return GNUNET_OK;
  142. }
  143. hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
  144. want = ntohs (hdr->size);
  145. if (want < sizeof(struct GNUNET_MessageHeader))
  146. {
  147. GNUNET_break_op (0);
  148. return GNUNET_SYSERR;
  149. }
  150. if ((mst->curr_buf - mst->off < want) &&
  151. (mst->off > 0))
  152. {
  153. /* can get more space by moving */
  154. mst->pos -= mst->off;
  155. memmove (ibuf, &ibuf[mst->off], mst->pos);
  156. mst->off = 0;
  157. }
  158. if (mst->curr_buf < want)
  159. {
  160. /* need to get more space by growing buffer */
  161. GNUNET_assert (0 == mst->off);
  162. mst->hdr = GNUNET_realloc (mst->hdr, want);
  163. ibuf = (char *) mst->hdr;
  164. mst->curr_buf = want;
  165. }
  166. hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
  167. if (mst->pos - mst->off < want)
  168. {
  169. delta = GNUNET_MIN (want - (mst->pos - mst->off), size);
  170. GNUNET_assert (mst->pos + delta <= mst->curr_buf);
  171. GNUNET_memcpy (&ibuf[mst->pos], buf, delta);
  172. mst->pos += delta;
  173. buf += delta;
  174. size -= delta;
  175. }
  176. if (mst->pos - mst->off < want)
  177. {
  178. if (purge)
  179. {
  180. mst->off = 0;
  181. mst->pos = 0;
  182. }
  183. return GNUNET_OK;
  184. }
  185. if (one_shot == GNUNET_SYSERR)
  186. {
  187. /* cannot call callback again, but return value saying that
  188. * we have another full message in the buffer */
  189. ret = GNUNET_NO;
  190. goto copy;
  191. }
  192. if (one_shot == GNUNET_YES)
  193. one_shot = GNUNET_SYSERR;
  194. mst->off += want;
  195. if (GNUNET_SYSERR == mst->cb (mst->cb_cls, client_identity, hdr))
  196. return GNUNET_SYSERR;
  197. if (mst->off == mst->pos)
  198. {
  199. /* reset to beginning of buffer, it's free right now! */
  200. mst->off = 0;
  201. mst->pos = 0;
  202. }
  203. }
  204. GNUNET_assert (0 == mst->pos);
  205. while (size > 0)
  206. {
  207. LOG (GNUNET_ERROR_TYPE_DEBUG,
  208. "Server-mst has %u bytes left in inbound buffer\n",
  209. (unsigned int) size);
  210. if (size < sizeof(struct GNUNET_MessageHeader))
  211. break;
  212. offset = (unsigned long) buf;
  213. need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
  214. if (GNUNET_NO == need_align)
  215. {
  216. /* can try to do zero-copy and process directly from original buffer */
  217. hdr = (const struct GNUNET_MessageHeader *) buf;
  218. want = ntohs (hdr->size);
  219. if (want < sizeof(struct GNUNET_MessageHeader))
  220. {
  221. GNUNET_break_op (0);
  222. mst->off = 0;
  223. return GNUNET_SYSERR;
  224. }
  225. if (size < want)
  226. break; /* or not: buffer incomplete, so copy to private buffer... */
  227. if (one_shot == GNUNET_SYSERR)
  228. {
  229. /* cannot call callback again, but return value saying that
  230. * we have another full message in the buffer */
  231. ret = GNUNET_NO;
  232. goto copy;
  233. }
  234. if (one_shot == GNUNET_YES)
  235. one_shot = GNUNET_SYSERR;
  236. if (GNUNET_SYSERR == mst->cb (mst->cb_cls, client_identity, hdr))
  237. return GNUNET_SYSERR;
  238. buf += want;
  239. size -= want;
  240. }
  241. else
  242. {
  243. /* need to copy to private buffer to align;
  244. * yes, we go a bit more spagetti than usual here */
  245. goto do_align;
  246. }
  247. }
  248. copy:
  249. if ((size > 0) && (! purge))
  250. {
  251. if (size + mst->pos > mst->curr_buf)
  252. {
  253. mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos);
  254. ibuf = (char *) mst->hdr;
  255. mst->curr_buf = size + mst->pos;
  256. }
  257. GNUNET_assert (size + mst->pos <= mst->curr_buf);
  258. GNUNET_memcpy (&ibuf[mst->pos], buf, size);
  259. mst->pos += size;
  260. }
  261. if (purge)
  262. {
  263. mst->off = 0;
  264. mst->pos = 0;
  265. }
  266. LOG (GNUNET_ERROR_TYPE_DEBUG,
  267. "Server-mst leaves %u bytes in private buffer\n",
  268. (unsigned int) (mst->pos - mst->off));
  269. return ret;
  270. }
  271. /**
  272. * Destroys a tokenizer.
  273. *
  274. * @param mst tokenizer to destroy
  275. */
  276. void
  277. GNUNET_SERVER_mst_destroy (struct GNUNET_SERVER_MessageStreamTokenizer *mst)
  278. {
  279. GNUNET_free (mst->hdr);
  280. GNUNET_free (mst);
  281. }
  282. /* end of server_mst.c */