quic_sstream.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. /*
  2. * Copyright 2022-2023 The OpenSSL Project Authors. All Rights Reserved.
  3. *
  4. * Licensed under the Apache License 2.0 (the "License"). You may not use
  5. * this file except in compliance with the License. You can obtain a copy
  6. * in the file LICENSE in the source distribution or at
  7. * https://www.openssl.org/source/license.html
  8. */
  9. #include "internal/quic_stream.h"
  10. #include "internal/uint_set.h"
  11. #include "internal/common.h"
  12. #include "internal/ring_buf.h"
  13. /*
  14. * ==================================================================
  15. * QUIC Send Stream
  16. */
  17. struct quic_sstream_st {
  18. struct ring_buf ring_buf;
  19. /*
  20. * Any logical byte in the stream is in one of these states:
  21. *
  22. * - NEW: The byte has not yet been transmitted, or has been lost and is
  23. * in need of retransmission.
  24. *
  25. * - IN_FLIGHT: The byte has been transmitted but is awaiting
  26. * acknowledgement. We continue to store the data in case we return
  27. * to the NEW state.
  28. *
  29. * - ACKED: The byte has been acknowledged and we can cease storing it.
  30. * We do not necessarily cull it immediately, so there may be a delay
  31. * between reaching the ACKED state and the buffer space actually being
  32. * recycled.
  33. *
  34. * A logical byte in the stream is
  35. *
  36. * - in the NEW state if it is in new_set;
  37. * - is in the ACKED state if it is in acked_set
  38. * (and may or may not have been culled);
  39. * - is in the IN_FLIGHT state otherwise.
  40. *
  41. * Invariant: No logical byte is ever in both new_set and acked_set.
  42. */
  43. UINT_SET new_set, acked_set;
  44. /*
  45. * The current size of the stream is ring_buf.head_offset. If
  46. * have_final_size is true, this is also the final size of the stream.
  47. */
  48. unsigned int have_final_size : 1;
  49. unsigned int sent_final_size : 1;
  50. unsigned int acked_final_size : 1;
  51. unsigned int cleanse : 1;
  52. };
  53. static void qss_cull(QUIC_SSTREAM *qss);
  54. QUIC_SSTREAM *ossl_quic_sstream_new(size_t init_buf_size)
  55. {
  56. QUIC_SSTREAM *qss;
  57. qss = OPENSSL_zalloc(sizeof(QUIC_SSTREAM));
  58. if (qss == NULL)
  59. return NULL;
  60. ring_buf_init(&qss->ring_buf);
  61. if (!ring_buf_resize(&qss->ring_buf, init_buf_size, 0)) {
  62. ring_buf_destroy(&qss->ring_buf, 0);
  63. OPENSSL_free(qss);
  64. return NULL;
  65. }
  66. ossl_uint_set_init(&qss->new_set);
  67. ossl_uint_set_init(&qss->acked_set);
  68. return qss;
  69. }
  70. void ossl_quic_sstream_free(QUIC_SSTREAM *qss)
  71. {
  72. if (qss == NULL)
  73. return;
  74. ossl_uint_set_destroy(&qss->new_set);
  75. ossl_uint_set_destroy(&qss->acked_set);
  76. ring_buf_destroy(&qss->ring_buf, qss->cleanse);
  77. OPENSSL_free(qss);
  78. }
  79. int ossl_quic_sstream_get_stream_frame(QUIC_SSTREAM *qss,
  80. size_t skip,
  81. OSSL_QUIC_FRAME_STREAM *hdr,
  82. OSSL_QTX_IOVEC *iov,
  83. size_t *num_iov)
  84. {
  85. size_t num_iov_ = 0, src_len = 0, total_len = 0, i;
  86. uint64_t max_len;
  87. const unsigned char *src = NULL;
  88. UINT_SET_ITEM *range = ossl_list_uint_set_head(&qss->new_set);
  89. if (*num_iov < 2)
  90. return 0;
  91. for (i = 0; i < skip && range != NULL; ++i)
  92. range = ossl_list_uint_set_next(range);
  93. if (range == NULL) {
  94. if (i < skip)
  95. /* Don't return FIN for infinitely increasing skip */
  96. return 0;
  97. /* No new bytes to send, but we might have a FIN */
  98. if (!qss->have_final_size || qss->sent_final_size)
  99. return 0;
  100. hdr->offset = qss->ring_buf.head_offset;
  101. hdr->len = 0;
  102. hdr->is_fin = 1;
  103. *num_iov = 0;
  104. return 1;
  105. }
  106. /*
  107. * We can only send a contiguous range of logical bytes in a single
  108. * stream frame, so limit ourselves to the range of the first set entry.
  109. *
  110. * Set entries never have 'adjacent' entries so we don't have to worry
  111. * about them here.
  112. */
  113. max_len = range->range.end - range->range.start + 1;
  114. for (i = 0;; ++i) {
  115. if (total_len >= max_len)
  116. break;
  117. if (!ring_buf_get_buf_at(&qss->ring_buf,
  118. range->range.start + total_len,
  119. &src, &src_len))
  120. return 0;
  121. if (src_len == 0)
  122. break;
  123. assert(i < 2);
  124. if (total_len + src_len > max_len)
  125. src_len = (size_t)(max_len - total_len);
  126. iov[num_iov_].buf = src;
  127. iov[num_iov_].buf_len = src_len;
  128. total_len += src_len;
  129. ++num_iov_;
  130. }
  131. hdr->offset = range->range.start;
  132. hdr->len = total_len;
  133. hdr->is_fin = qss->have_final_size
  134. && hdr->offset + hdr->len == qss->ring_buf.head_offset;
  135. *num_iov = num_iov_;
  136. return 1;
  137. }
  138. int ossl_quic_sstream_has_pending(QUIC_SSTREAM *qss)
  139. {
  140. OSSL_QUIC_FRAME_STREAM shdr;
  141. OSSL_QTX_IOVEC iov[2];
  142. size_t num_iov = OSSL_NELEM(iov);
  143. return ossl_quic_sstream_get_stream_frame(qss, 0, &shdr, iov, &num_iov);
  144. }
  145. uint64_t ossl_quic_sstream_get_cur_size(QUIC_SSTREAM *qss)
  146. {
  147. return qss->ring_buf.head_offset;
  148. }
  149. int ossl_quic_sstream_mark_transmitted(QUIC_SSTREAM *qss,
  150. uint64_t start,
  151. uint64_t end)
  152. {
  153. UINT_RANGE r;
  154. r.start = start;
  155. r.end = end;
  156. if (!ossl_uint_set_remove(&qss->new_set, &r))
  157. return 0;
  158. return 1;
  159. }
  160. int ossl_quic_sstream_mark_transmitted_fin(QUIC_SSTREAM *qss,
  161. uint64_t final_size)
  162. {
  163. /*
  164. * We do not really need final_size since we already know the size of the
  165. * stream, but this serves as a sanity check.
  166. */
  167. if (!qss->have_final_size || final_size != qss->ring_buf.head_offset)
  168. return 0;
  169. qss->sent_final_size = 1;
  170. return 1;
  171. }
  172. int ossl_quic_sstream_mark_lost(QUIC_SSTREAM *qss,
  173. uint64_t start,
  174. uint64_t end)
  175. {
  176. UINT_RANGE r;
  177. r.start = start;
  178. r.end = end;
  179. /*
  180. * We lost a range of stream data bytes, so reinsert them into the new set,
  181. * so that they are returned once more by ossl_quic_sstream_get_stream_frame.
  182. */
  183. if (!ossl_uint_set_insert(&qss->new_set, &r))
  184. return 0;
  185. return 1;
  186. }
  187. int ossl_quic_sstream_mark_lost_fin(QUIC_SSTREAM *qss)
  188. {
  189. if (qss->acked_final_size)
  190. /* Does not make sense to lose a FIN after it has been ACKed */
  191. return 0;
  192. /* FIN was lost, so we need to transmit it again. */
  193. qss->sent_final_size = 0;
  194. return 1;
  195. }
  196. int ossl_quic_sstream_mark_acked(QUIC_SSTREAM *qss,
  197. uint64_t start,
  198. uint64_t end)
  199. {
  200. UINT_RANGE r;
  201. r.start = start;
  202. r.end = end;
  203. if (!ossl_uint_set_insert(&qss->acked_set, &r))
  204. return 0;
  205. qss_cull(qss);
  206. return 1;
  207. }
  208. int ossl_quic_sstream_mark_acked_fin(QUIC_SSTREAM *qss)
  209. {
  210. if (!qss->have_final_size)
  211. /* Cannot ack final size before we have a final size */
  212. return 0;
  213. qss->acked_final_size = 1;
  214. return 1;
  215. }
  216. void ossl_quic_sstream_fin(QUIC_SSTREAM *qss)
  217. {
  218. if (qss->have_final_size)
  219. return;
  220. qss->have_final_size = 1;
  221. }
  222. int ossl_quic_sstream_get_final_size(QUIC_SSTREAM *qss, uint64_t *final_size)
  223. {
  224. if (!qss->have_final_size)
  225. return 0;
  226. if (final_size != NULL)
  227. *final_size = qss->ring_buf.head_offset;
  228. return 1;
  229. }
  230. int ossl_quic_sstream_append(QUIC_SSTREAM *qss,
  231. const unsigned char *buf,
  232. size_t buf_len,
  233. size_t *consumed)
  234. {
  235. size_t l, consumed_ = 0;
  236. UINT_RANGE r;
  237. struct ring_buf old_ring_buf = qss->ring_buf;
  238. if (qss->have_final_size) {
  239. *consumed = 0;
  240. return 0;
  241. }
  242. /*
  243. * Note: It is assumed that ossl_quic_sstream_append will be called during a
  244. * call to e.g. SSL_write and this function is therefore designed to support
  245. * such semantics. In particular, the buffer pointed to by buf is only
  246. * assumed to be valid for the duration of this call, therefore we must copy
  247. * the data here. We will later copy-and-encrypt the data during packet
  248. * encryption, so this is a two-copy design. Supporting a one-copy design in
  249. * the future will require applications to use a different kind of API.
  250. * Supporting such changes in future will require corresponding enhancements
  251. * to this code.
  252. */
  253. while (buf_len > 0) {
  254. l = ring_buf_push(&qss->ring_buf, buf, buf_len);
  255. if (l == 0)
  256. break;
  257. buf += l;
  258. buf_len -= l;
  259. consumed_ += l;
  260. }
  261. if (consumed_ > 0) {
  262. r.start = old_ring_buf.head_offset;
  263. r.end = r.start + consumed_ - 1;
  264. assert(r.end + 1 == qss->ring_buf.head_offset);
  265. if (!ossl_uint_set_insert(&qss->new_set, &r)) {
  266. qss->ring_buf = old_ring_buf;
  267. *consumed = 0;
  268. return 0;
  269. }
  270. }
  271. *consumed = consumed_;
  272. return 1;
  273. }
  274. static void qss_cull(QUIC_SSTREAM *qss)
  275. {
  276. UINT_SET_ITEM *h = ossl_list_uint_set_head(&qss->acked_set);
  277. /*
  278. * Potentially cull data from our ring buffer. This can happen once data has
  279. * been ACKed and we know we are never going to have to transmit it again.
  280. *
  281. * Since we use a ring buffer design for simplicity, we cannot cull byte n +
  282. * k (for k > 0) from the ring buffer until byte n has also been culled.
  283. * This means if parts of the stream get acknowledged out of order we might
  284. * keep around some data we technically don't need to for a while. The
  285. * impact of this is likely to be small and limited to quite a short
  286. * duration, and doesn't justify the use of a more complex design.
  287. */
  288. /*
  289. * We only need to check the first range entry in the integer set because we
  290. * can only cull contiguous areas at the start of the ring buffer anyway.
  291. */
  292. if (h != NULL)
  293. ring_buf_cpop_range(&qss->ring_buf, h->range.start, h->range.end,
  294. qss->cleanse);
  295. }
  296. int ossl_quic_sstream_set_buffer_size(QUIC_SSTREAM *qss, size_t num_bytes)
  297. {
  298. return ring_buf_resize(&qss->ring_buf, num_bytes, qss->cleanse);
  299. }
  300. size_t ossl_quic_sstream_get_buffer_size(QUIC_SSTREAM *qss)
  301. {
  302. return qss->ring_buf.alloc;
  303. }
  304. size_t ossl_quic_sstream_get_buffer_used(QUIC_SSTREAM *qss)
  305. {
  306. return ring_buf_used(&qss->ring_buf);
  307. }
  308. size_t ossl_quic_sstream_get_buffer_avail(QUIC_SSTREAM *qss)
  309. {
  310. return ring_buf_avail(&qss->ring_buf);
  311. }
  312. int ossl_quic_sstream_is_totally_acked(QUIC_SSTREAM *qss)
  313. {
  314. UINT_RANGE r;
  315. uint64_t cur_size;
  316. if (qss->have_final_size && !qss->acked_final_size)
  317. return 0;
  318. if (ossl_quic_sstream_get_cur_size(qss) == 0)
  319. return 1;
  320. if (ossl_list_uint_set_num(&qss->acked_set) != 1)
  321. return 0;
  322. r = ossl_list_uint_set_head(&qss->acked_set)->range;
  323. cur_size = qss->ring_buf.head_offset;
  324. /*
  325. * The invariants of UINT_SET guarantee a single list element if we have a
  326. * single contiguous range, which is what we should have if everything has
  327. * been acked.
  328. */
  329. assert(r.end + 1 <= cur_size);
  330. return r.start == 0 && r.end + 1 == cur_size;
  331. }
  332. void ossl_quic_sstream_adjust_iov(size_t len,
  333. OSSL_QTX_IOVEC *iov,
  334. size_t num_iov)
  335. {
  336. size_t running = 0, i, iovlen;
  337. for (i = 0, running = 0; i < num_iov; ++i) {
  338. iovlen = iov[i].buf_len;
  339. if (running >= len)
  340. iov[i].buf_len = 0;
  341. else if (running + iovlen > len)
  342. iov[i].buf_len = len - running;
  343. running += iovlen;
  344. }
  345. }
  346. void ossl_quic_sstream_set_cleanse(QUIC_SSTREAM *qss, int cleanse)
  347. {
  348. qss->cleanse = cleanse;
  349. }