mst.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2010, 2016, 2017 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file util/mst.c
  18. * @brief convenience functions for handling inbound message buffers
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet_util_lib.h"
  23. #if HAVE_UNALIGNED_64_ACCESS
  24. #define ALIGN_FACTOR 4
  25. #else
  26. #define ALIGN_FACTOR 8
  27. #endif
  28. #define LOG(kind, ...) GNUNET_log_from (kind, "util-mst", __VA_ARGS__)
  29. /**
  30. * Handle to a message stream tokenizer.
  31. */
  32. struct GNUNET_MessageStreamTokenizer
  33. {
  34. /**
  35. * Function to call on completed messages.
  36. */
  37. GNUNET_MessageTokenizerCallback cb;
  38. /**
  39. * Closure for @e cb.
  40. */
  41. void *cb_cls;
  42. /**
  43. * Size of the buffer (starting at @e hdr).
  44. */
  45. size_t curr_buf;
  46. /**
  47. * How many bytes in buffer have we already processed?
  48. */
  49. size_t off;
  50. /**
  51. * How many bytes in buffer are valid right now?
  52. */
  53. size_t pos;
  54. /**
  55. * Beginning of the buffer. Typed like this to force alignment.
  56. */
  57. struct GNUNET_MessageHeader *hdr;
  58. };
  59. /**
  60. * Create a message stream tokenizer.
  61. *
  62. * @param cb function to call on completed messages
  63. * @param cb_cls closure for @a cb
  64. * @return handle to tokenizer
  65. */
  66. struct GNUNET_MessageStreamTokenizer *
  67. GNUNET_MST_create (GNUNET_MessageTokenizerCallback cb,
  68. void *cb_cls)
  69. {
  70. struct GNUNET_MessageStreamTokenizer *ret;
  71. ret = GNUNET_new (struct GNUNET_MessageStreamTokenizer);
  72. ret->hdr = GNUNET_malloc (GNUNET_MIN_MESSAGE_SIZE);
  73. ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE;
  74. ret->cb = cb;
  75. ret->cb_cls = cb_cls;
  76. return ret;
  77. }
  78. /**
  79. * Add incoming data to the receive buffer and call the
  80. * callback for all complete messages.
  81. *
  82. * @param mst tokenizer to use
  83. * @param buf input data to add
  84. * @param size number of bytes in @a buf
  85. * @param purge should any excess bytes in the buffer be discarded
  86. * (i.e. for packet-based services like UDP)
  87. * @param one_shot only call callback once, keep rest of message in buffer
  88. * @return #GNUNET_OK if we are done processing (need more data)
  89. * #GNUNET_NO if @a one_shot was set and we have another message ready
  90. * #GNUNET_SYSERR if the data stream is corrupt
  91. */
  92. int
  93. GNUNET_MST_from_buffer (struct GNUNET_MessageStreamTokenizer *mst,
  94. const char *buf,
  95. size_t size,
  96. int purge,
  97. int one_shot)
  98. {
  99. const struct GNUNET_MessageHeader *hdr;
  100. size_t delta;
  101. uint16_t want;
  102. char *ibuf;
  103. int need_align;
  104. unsigned long offset;
  105. int ret;
  106. int cbret;
  107. GNUNET_assert (mst->off <= mst->pos);
  108. GNUNET_assert (mst->pos <= mst->curr_buf);
  109. LOG (GNUNET_ERROR_TYPE_DEBUG,
  110. "MST receives %u bytes with %u bytes already in private buffer\n",
  111. (unsigned int) size,
  112. (unsigned int) (mst->pos - mst->off));
  113. ret = GNUNET_OK;
  114. ibuf = (char *) mst->hdr;
  115. while (mst->pos > 0)
  116. {
  117. do_align:
  118. GNUNET_assert (mst->pos >= mst->off);
  119. if ((mst->curr_buf - mst->off < sizeof(struct GNUNET_MessageHeader)) ||
  120. (0 != (mst->off % ALIGN_FACTOR)))
  121. {
  122. /* need to align or need more space */
  123. mst->pos -= mst->off;
  124. memmove (ibuf,
  125. &ibuf[mst->off],
  126. mst->pos);
  127. mst->off = 0;
  128. }
  129. if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
  130. {
  131. delta
  132. = GNUNET_MIN (sizeof(struct GNUNET_MessageHeader)
  133. - (mst->pos - mst->off),
  134. size);
  135. GNUNET_memcpy (&ibuf[mst->pos],
  136. buf,
  137. delta);
  138. mst->pos += delta;
  139. buf += delta;
  140. size -= delta;
  141. }
  142. if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
  143. {
  144. if (purge)
  145. {
  146. mst->off = 0;
  147. mst->pos = 0;
  148. }
  149. return GNUNET_OK;
  150. }
  151. hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
  152. want = ntohs (hdr->size);
  153. if (want < sizeof(struct GNUNET_MessageHeader))
  154. {
  155. GNUNET_break_op (0);
  156. return GNUNET_SYSERR;
  157. }
  158. if ((mst->curr_buf - mst->off < want) &&
  159. (mst->off > 0))
  160. {
  161. /* can get more space by moving */
  162. mst->pos -= mst->off;
  163. memmove (ibuf,
  164. &ibuf[mst->off],
  165. mst->pos);
  166. mst->off = 0;
  167. }
  168. if (mst->curr_buf < want)
  169. {
  170. /* need to get more space by growing buffer */
  171. GNUNET_assert (0 == mst->off);
  172. mst->hdr = GNUNET_realloc (mst->hdr,
  173. want);
  174. ibuf = (char *) mst->hdr;
  175. mst->curr_buf = want;
  176. }
  177. hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
  178. if (mst->pos - mst->off < want)
  179. {
  180. delta = GNUNET_MIN (want - (mst->pos - mst->off),
  181. size);
  182. GNUNET_assert (mst->pos + delta <= mst->curr_buf);
  183. GNUNET_memcpy (&ibuf[mst->pos],
  184. buf,
  185. delta);
  186. mst->pos += delta;
  187. buf += delta;
  188. size -= delta;
  189. }
  190. if (mst->pos - mst->off < want)
  191. {
  192. if (purge)
  193. {
  194. mst->off = 0;
  195. mst->pos = 0;
  196. }
  197. return GNUNET_OK;
  198. }
  199. if (one_shot == GNUNET_SYSERR)
  200. {
  201. /* cannot call callback again, but return value saying that
  202. * we have another full message in the buffer */
  203. ret = GNUNET_NO;
  204. goto copy;
  205. }
  206. if (one_shot == GNUNET_YES)
  207. one_shot = GNUNET_SYSERR;
  208. mst->off += want;
  209. if (GNUNET_OK !=
  210. (cbret = mst->cb (mst->cb_cls,
  211. hdr)))
  212. {
  213. if (GNUNET_SYSERR == cbret)
  214. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  215. "Failure processing message of type %u and size %u\n",
  216. ntohs (hdr->type),
  217. ntohs (hdr->size));
  218. return GNUNET_SYSERR;
  219. }
  220. if (mst->off == mst->pos)
  221. {
  222. /* reset to beginning of buffer, it's free right now! */
  223. mst->off = 0;
  224. mst->pos = 0;
  225. }
  226. }
  227. GNUNET_assert (0 == mst->pos);
  228. while (size > 0)
  229. {
  230. LOG (GNUNET_ERROR_TYPE_DEBUG,
  231. "Server-mst has %u bytes left in inbound buffer\n",
  232. (unsigned int) size);
  233. if (size < sizeof(struct GNUNET_MessageHeader))
  234. break;
  235. offset = (unsigned long) buf;
  236. need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
  237. if (GNUNET_NO == need_align)
  238. {
  239. /* can try to do zero-copy and process directly from original buffer */
  240. hdr = (const struct GNUNET_MessageHeader *) buf;
  241. want = ntohs (hdr->size);
  242. if (want < sizeof(struct GNUNET_MessageHeader))
  243. {
  244. GNUNET_break_op (0);
  245. mst->off = 0;
  246. return GNUNET_SYSERR;
  247. }
  248. if (size < want)
  249. break; /* or not: buffer incomplete, so copy to private buffer... */
  250. if (one_shot == GNUNET_SYSERR)
  251. {
  252. /* cannot call callback again, but return value saying that
  253. * we have another full message in the buffer */
  254. ret = GNUNET_NO;
  255. goto copy;
  256. }
  257. if (one_shot == GNUNET_YES)
  258. one_shot = GNUNET_SYSERR;
  259. if (GNUNET_OK !=
  260. (cbret = mst->cb (mst->cb_cls,
  261. hdr)))
  262. {
  263. if (GNUNET_SYSERR == cbret)
  264. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  265. "Failure processing message of type %u and size %u\n",
  266. ntohs (hdr->type),
  267. ntohs (hdr->size));
  268. return GNUNET_SYSERR;
  269. }
  270. buf += want;
  271. size -= want;
  272. }
  273. else
  274. {
  275. /* need to copy to private buffer to align;
  276. * yes, we go a bit more spaghetti than usual here */
  277. goto do_align;
  278. }
  279. }
  280. copy:
  281. if ((size > 0) && (! purge))
  282. {
  283. if (size + mst->pos > mst->curr_buf)
  284. {
  285. mst->hdr = GNUNET_realloc (mst->hdr,
  286. size + mst->pos);
  287. ibuf = (char *) mst->hdr;
  288. mst->curr_buf = size + mst->pos;
  289. }
  290. GNUNET_assert (size + mst->pos <= mst->curr_buf);
  291. GNUNET_memcpy (&ibuf[mst->pos],
  292. buf,
  293. size);
  294. mst->pos += size;
  295. }
  296. if (purge)
  297. {
  298. mst->off = 0;
  299. mst->pos = 0;
  300. }
  301. LOG (GNUNET_ERROR_TYPE_DEBUG,
  302. "Server-mst leaves %u bytes in private buffer\n",
  303. (unsigned int) (mst->pos - mst->off));
  304. return ret;
  305. }
  306. /**
  307. * Add incoming data to the receive buffer and call the
  308. * callback for all complete messages.
  309. *
  310. * @param mst tokenizer to use
  311. * @param buf input data to add
  312. * @param size number of bytes in @a buf
  313. * @param purge should any excess bytes in the buffer be discarded
  314. * (i.e. for packet-based services like UDP)
  315. * @param one_shot only call callback once, keep rest of message in buffer
  316. * @return #GNUNET_OK if we are done processing (need more data)
  317. * #GNUNET_NO if one_shot was set and we have another message ready
  318. * #GNUNET_SYSERR if the data stream is corrupt
  319. */
  320. int
  321. GNUNET_MST_read (struct GNUNET_MessageStreamTokenizer *mst,
  322. struct GNUNET_NETWORK_Handle *sock,
  323. int purge,
  324. int one_shot)
  325. {
  326. ssize_t ret;
  327. size_t left;
  328. char *buf;
  329. left = mst->curr_buf - mst->pos;
  330. buf = (char *) mst->hdr;
  331. ret = GNUNET_NETWORK_socket_recv (sock,
  332. &buf[mst->pos],
  333. left);
  334. if (-1 == ret)
  335. {
  336. if ((EAGAIN == errno) ||
  337. (EINTR == errno))
  338. return GNUNET_OK;
  339. GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO,
  340. "recv");
  341. return GNUNET_SYSERR;
  342. }
  343. if (0 == ret)
  344. {
  345. /* other side closed connection, treat as error */
  346. return GNUNET_SYSERR;
  347. }
  348. mst->pos += ret;
  349. return GNUNET_MST_from_buffer (mst,
  350. NULL,
  351. 0,
  352. purge,
  353. one_shot);
  354. }
  355. /**
  356. * Obtain the next message from the @a mst, assuming that
  357. * there are more unprocessed messages in the internal buffer
  358. * of the @a mst.
  359. *
  360. * @param mst tokenizer to use
  361. * @param one_shot only call callback once, keep rest of message in buffer
  362. * @return #GNUNET_OK if we are done processing (need more data)
  363. * #GNUNET_NO if one_shot was set and we have another message ready
  364. * #GNUNET_SYSERR if the data stream is corrupt
  365. */
  366. int
  367. GNUNET_MST_next (struct GNUNET_MessageStreamTokenizer *mst,
  368. int one_shot)
  369. {
  370. return GNUNET_MST_from_buffer (mst,
  371. NULL,
  372. 0,
  373. GNUNET_NO,
  374. one_shot);
  375. }
  376. /**
  377. * Destroys a tokenizer.
  378. *
  379. * @param mst tokenizer to destroy
  380. */
  381. void
  382. GNUNET_MST_destroy (struct GNUNET_MessageStreamTokenizer *mst)
  383. {
  384. GNUNET_free (mst->hdr);
  385. GNUNET_free (mst);
  386. }
  387. /* end of server_mst.c */