|
@@ -134,6 +134,7 @@ struct cf_h2_ctx {
|
|
|
BIT(conn_closed);
|
|
|
BIT(goaway);
|
|
|
BIT(enable_push);
|
|
|
+ BIT(nw_out_blocked);
|
|
|
};
|
|
|
|
|
|
/* How to access `call_data` from a cf_h2 filter */
|
|
@@ -176,6 +177,7 @@ struct stream_ctx {
|
|
|
struct bufq sendbuf; /* request buffer */
|
|
|
struct dynhds resp_trailers; /* response trailer fields */
|
|
|
size_t resp_hds_len; /* amount of response header bytes in recvbuf */
|
|
|
+ size_t upload_blocked_len;
|
|
|
curl_off_t upload_left; /* number of request bytes left to upload */
|
|
|
|
|
|
char **push_headers; /* allocated array */
|
|
@@ -211,9 +213,12 @@ static void drain_stream(struct Curl_cfilter *cf,
|
|
|
|
|
|
(void)cf;
|
|
|
bits = CURL_CSELECT_IN;
|
|
|
- if(!stream->send_closed && stream->upload_left)
|
|
|
+ if(!stream->send_closed &&
|
|
|
+ (stream->upload_left || stream->upload_blocked_len))
|
|
|
bits |= CURL_CSELECT_OUT;
|
|
|
if(data->state.dselect_bits != bits) {
|
|
|
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%d] DRAIN dselect_bits=%x",
|
|
|
+ stream->id, bits));
|
|
|
data->state.dselect_bits = bits;
|
|
|
Curl_expire(data, 0, EXPIRE_RUN_NOW);
|
|
|
}
|
|
@@ -647,13 +652,17 @@ static CURLcode nw_out_flush(struct Curl_cfilter *cf,
|
|
|
if(Curl_bufq_is_empty(&ctx->outbufq))
|
|
|
return CURLE_OK;
|
|
|
|
|
|
- DEBUGF(LOG_CF(data, cf, "h2 conn flush %zu bytes",
|
|
|
- Curl_bufq_len(&ctx->outbufq)));
|
|
|
nwritten = Curl_bufq_pass(&ctx->outbufq, nw_out_writer, cf, &result);
|
|
|
- if(nwritten < 0 && result != CURLE_AGAIN) {
|
|
|
+ if(nwritten < 0) {
|
|
|
+ if(result == CURLE_AGAIN) {
|
|
|
+ DEBUGF(LOG_CF(data, cf, "flush nw send buffer(%zu) -> EAGAIN",
|
|
|
+ Curl_bufq_len(&ctx->outbufq)));
|
|
|
+ ctx->nw_out_blocked = 1;
|
|
|
+ }
|
|
|
return result;
|
|
|
}
|
|
|
- return CURLE_OK;
|
|
|
+ DEBUGF(LOG_CF(data, cf, "nw send buffer flushed"));
|
|
|
+ return Curl_bufq_is_empty(&ctx->outbufq)? CURLE_OK: CURLE_AGAIN;
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -679,15 +688,17 @@ static ssize_t send_callback(nghttp2_session *h2,
|
|
|
nw_out_writer, cf, &result);
|
|
|
if(nwritten < 0) {
|
|
|
if(result == CURLE_AGAIN) {
|
|
|
+ ctx->nw_out_blocked = 1;
|
|
|
return NGHTTP2_ERR_WOULDBLOCK;
|
|
|
}
|
|
|
failf(data, "Failed sending HTTP2 data");
|
|
|
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
|
|
}
|
|
|
|
|
|
- if(!nwritten)
|
|
|
+ if(!nwritten) {
|
|
|
+ ctx->nw_out_blocked = 1;
|
|
|
return NGHTTP2_ERR_WOULDBLOCK;
|
|
|
-
|
|
|
+ }
|
|
|
return nwritten;
|
|
|
}
|
|
|
|
|
@@ -1693,7 +1704,8 @@ static CURLcode h2_progress_egress(struct Curl_cfilter *cf,
|
|
|
goto out;
|
|
|
}
|
|
|
|
|
|
- while(!rv && nghttp2_session_want_write(ctx->h2))
|
|
|
+ ctx->nw_out_blocked = 0;
|
|
|
+ while(!rv && !ctx->nw_out_blocked && nghttp2_session_want_write(ctx->h2))
|
|
|
rv = nghttp2_session_send(ctx->h2);
|
|
|
|
|
|
out:
|
|
@@ -1854,7 +1866,7 @@ static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
|
|
|
|
|
|
out:
|
|
|
result = h2_progress_egress(cf, data);
|
|
|
- if(result) {
|
|
|
+ if(result && result != CURLE_AGAIN) {
|
|
|
*err = result;
|
|
|
nread = -1;
|
|
|
}
|
|
@@ -2012,17 +2024,13 @@ out:
|
|
|
static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
|
|
|
const void *buf, size_t len, CURLcode *err)
|
|
|
{
|
|
|
- /*
|
|
|
- * Currently, we send request in this function, but this function is also
|
|
|
- * used to send request body. It would be nice to add dedicated function for
|
|
|
- * request.
|
|
|
- */
|
|
|
struct cf_h2_ctx *ctx = cf->ctx;
|
|
|
struct stream_ctx *stream = H2_STREAM_CTX(data);
|
|
|
struct cf_call_data save;
|
|
|
int rv;
|
|
|
ssize_t nwritten;
|
|
|
CURLcode result;
|
|
|
+ int blocked = 0;
|
|
|
|
|
|
CF_DATA_SAVE(save, cf, data);
|
|
|
|
|
@@ -2037,18 +2045,34 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
|
|
|
nwritten = http2_handle_stream_close(cf, data, stream, err);
|
|
|
goto out;
|
|
|
}
|
|
|
- /* If stream_id != -1, we have dispatched request HEADERS, and now
|
|
|
- are going to send or sending request body in DATA frame */
|
|
|
- nwritten = Curl_bufq_write(&stream->sendbuf, buf, len, err);
|
|
|
- if(nwritten < 0) {
|
|
|
- if(*err != CURLE_AGAIN)
|
|
|
+ else if(stream->upload_blocked_len) {
|
|
|
+ /* the data in `buf` has alread been submitted or added to the
|
|
|
+ * buffers, but have been EAGAINed on the last invocation. */
|
|
|
+ DEBUGASSERT(len >= stream->upload_blocked_len);
|
|
|
+ if(len < stream->upload_blocked_len) {
|
|
|
+ /* Did we get called again with a smaller `len`? This should not
|
|
|
+ * happend. We are not prepared to handle that. */
|
|
|
+ failf(data, "HTTP/2 send again with decreased length");
|
|
|
+ *err = CURLE_HTTP2;
|
|
|
+ nwritten = -1;
|
|
|
goto out;
|
|
|
- nwritten = 0;
|
|
|
+ }
|
|
|
+ nwritten = (ssize_t)stream->upload_blocked_len;
|
|
|
+ stream->upload_blocked_len = 0;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ /* If stream_id != -1, we have dispatched request HEADERS, and now
|
|
|
+ are going to send or sending request body in DATA frame */
|
|
|
+ nwritten = Curl_bufq_write(&stream->sendbuf, buf, len, err);
|
|
|
+ if(nwritten < 0) {
|
|
|
+ if(*err != CURLE_AGAIN)
|
|
|
+ goto out;
|
|
|
+ nwritten = 0;
|
|
|
+ }
|
|
|
}
|
|
|
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] bufq_write(len=%zu) -> %zd, %d",
|
|
|
- stream->id, len, nwritten, *err));
|
|
|
|
|
|
if(!Curl_bufq_is_empty(&stream->sendbuf)) {
|
|
|
+ /* req body data is buffered, resume the potentially suspended stream */
|
|
|
rv = nghttp2_session_resume_data(ctx->h2, stream->id);
|
|
|
if(nghttp2_is_fatal(rv)) {
|
|
|
*err = CURLE_SEND_ERROR;
|
|
@@ -2056,105 +2080,93 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
|
|
|
goto out;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- result = h2_progress_ingress(cf, data);
|
|
|
- if(result) {
|
|
|
- *err = result;
|
|
|
- nwritten = -1;
|
|
|
- goto out;
|
|
|
- }
|
|
|
-
|
|
|
- result = h2_progress_egress(cf, data);
|
|
|
- if(result) {
|
|
|
- *err = result;
|
|
|
- nwritten = -1;
|
|
|
- goto out;
|
|
|
- }
|
|
|
-
|
|
|
- if(should_close_session(ctx)) {
|
|
|
- if(stream->closed) {
|
|
|
- nwritten = http2_handle_stream_close(cf, data, stream, err);
|
|
|
- }
|
|
|
- else {
|
|
|
- DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
|
|
|
- *err = CURLE_HTTP2;
|
|
|
- nwritten = -1;
|
|
|
- }
|
|
|
- goto out;
|
|
|
- }
|
|
|
-
|
|
|
- if(!nwritten) {
|
|
|
- size_t rwin = nghttp2_session_get_stream_remote_window_size(ctx->h2,
|
|
|
- stream->id);
|
|
|
- DEBUGF(LOG_CF(data, cf, "[h2sid=%d] cf_send: win %u/%zu",
|
|
|
- stream->id,
|
|
|
- nghttp2_session_get_remote_window_size(ctx->h2), rwin));
|
|
|
- if(rwin == 0) {
|
|
|
- /* We cannot upload more as the stream's remote window size
|
|
|
- * is 0. We need to receive WIN_UPDATEs before we can continue.
|
|
|
- */
|
|
|
- data->req.keepon |= KEEP_SEND_HOLD;
|
|
|
- DEBUGF(LOG_CF(data, cf, "[h2sid=%d] holding send as remote flow "
|
|
|
- "window is exhausted", stream->id));
|
|
|
- }
|
|
|
- nwritten = -1;
|
|
|
- *err = CURLE_AGAIN;
|
|
|
- }
|
|
|
- /* handled writing BODY for open stream. */
|
|
|
- goto out;
|
|
|
}
|
|
|
else {
|
|
|
nwritten = h2_submit(&stream, cf, data, buf, len, err);
|
|
|
if(nwritten < 0) {
|
|
|
goto out;
|
|
|
}
|
|
|
+ DEBUGASSERT(stream);
|
|
|
+ }
|
|
|
|
|
|
- result = h2_progress_ingress(cf, data);
|
|
|
- if(result) {
|
|
|
- *err = result;
|
|
|
- nwritten = -1;
|
|
|
- goto out;
|
|
|
+ /* Call the nghttp2 send loop and flush to write ALL buffered data,
|
|
|
+ * headers and/or request body completely out to the network */
|
|
|
+ result = h2_progress_egress(cf, data);
|
|
|
+ if(result == CURLE_AGAIN) {
|
|
|
+ blocked = 1;
|
|
|
+ }
|
|
|
+ else if(result) {
|
|
|
+ *err = result;
|
|
|
+ nwritten = -1;
|
|
|
+ goto out;
|
|
|
+ }
|
|
|
+ else if(!Curl_bufq_is_empty(&stream->sendbuf)) {
|
|
|
+ /* although we wrote everything that nghttp2 wants to send now,
|
|
|
+ * there is data left in our stream send buffer unwritten. This may
|
|
|
+ * be due to the stream's HTTP/2 flow window being exhausted. */
|
|
|
+ blocked = 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ if(blocked) {
|
|
|
+ /* Unable to send all data, due to connection blocked or H2 window
|
|
|
+ * exhaustion. Data is left in our stream buffer, or nghttp2's internal
|
|
|
+ * frame buffer or our network out buffer. */
|
|
|
+ size_t rwin = nghttp2_session_get_stream_remote_window_size(ctx->h2,
|
|
|
+ stream->id);
|
|
|
+ if(rwin == 0) {
|
|
|
+ /* H2 flow window exhaustion. We need to HOLD upload until we get
|
|
|
+ * a WINDOW_UPDATE from the server. */
|
|
|
+ data->req.keepon |= KEEP_SEND_HOLD;
|
|
|
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%d] holding send as remote flow "
|
|
|
+ "window is exhausted", stream->id));
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Whatever the cause, we need to return CURL_EAGAIN for this call.
|
|
|
+ * We have unwritten state that needs us being invoked again and EAGAIN
|
|
|
+ * is the only way to ensure that. */
|
|
|
+ stream->upload_blocked_len = nwritten;
|
|
|
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%d] cf_send(len=%zu) BLOCK: win %u/%zu "
|
|
|
+ "blocked_len=%zu",
|
|
|
+ stream->id, len,
|
|
|
+ nghttp2_session_get_remote_window_size(ctx->h2), rwin,
|
|
|
+ nwritten));
|
|
|
+ *err = CURLE_AGAIN;
|
|
|
+ nwritten = -1;
|
|
|
+ goto out;
|
|
|
+ }
|
|
|
+ else if(should_close_session(ctx)) {
|
|
|
+ /* nghttp2 thinks this session is done. If the stream has not been
|
|
|
+ * closed, this is an error state for out transfer */
|
|
|
+ if(stream->closed) {
|
|
|
+ nwritten = http2_handle_stream_close(cf, data, stream, err);
|
|
|
}
|
|
|
-
|
|
|
- result = h2_progress_egress(cf, data);
|
|
|
- if(result) {
|
|
|
- *err = result;
|
|
|
+ else {
|
|
|
+ DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
|
|
|
+ *err = CURLE_HTTP2;
|
|
|
nwritten = -1;
|
|
|
- goto out;
|
|
|
- }
|
|
|
-
|
|
|
- if(should_close_session(ctx)) {
|
|
|
- if(stream->closed) {
|
|
|
- nwritten = http2_handle_stream_close(cf, data, stream, err);
|
|
|
- }
|
|
|
- else {
|
|
|
- DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
|
|
|
- *err = CURLE_HTTP2;
|
|
|
- nwritten = -1;
|
|
|
- }
|
|
|
- goto out;
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
out:
|
|
|
if(stream) {
|
|
|
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] cf_send(len=%zu) -> %zd, %d, "
|
|
|
- "buffered=%zu, upload_left=%zu, stream-window=%d, "
|
|
|
- "connection-window=%d",
|
|
|
+ "upload_left=%" CURL_FORMAT_CURL_OFF_T ", "
|
|
|
+ "h2 windows %d-%d (stream-conn), "
|
|
|
+ "buffers %zu-%zu (stream-conn)",
|
|
|
stream->id, len, nwritten, *err,
|
|
|
- Curl_bufq_len(&stream->sendbuf),
|
|
|
(ssize_t)stream->upload_left,
|
|
|
nghttp2_session_get_stream_remote_window_size(
|
|
|
ctx->h2, stream->id),
|
|
|
- nghttp2_session_get_remote_window_size(ctx->h2)));
|
|
|
- drain_stream(cf, data, stream);
|
|
|
+ nghttp2_session_get_remote_window_size(ctx->h2),
|
|
|
+ Curl_bufq_len(&stream->sendbuf),
|
|
|
+ Curl_bufq_len(&ctx->outbufq)));
|
|
|
}
|
|
|
else {
|
|
|
DEBUGF(LOG_CF(data, cf, "cf_send(len=%zu) -> %zd, %d, "
|
|
|
- "connection-window=%d",
|
|
|
+ "connection-window=%d, nw_send_buffer(%zu)",
|
|
|
len, nwritten, *err,
|
|
|
- nghttp2_session_get_remote_window_size(ctx->h2)));
|
|
|
+ nghttp2_session_get_remote_window_size(ctx->h2),
|
|
|
+ Curl_bufq_len(&ctx->outbufq)));
|
|
|
}
|
|
|
CF_DATA_RESTORE(cf, save);
|
|
|
return nwritten;
|
|
@@ -2273,7 +2285,6 @@ static CURLcode http2_data_pause(struct Curl_cfilter *cf,
|
|
|
DEBUGASSERT(data);
|
|
|
if(ctx && ctx->h2 && stream) {
|
|
|
uint32_t window = pause? 0 : stream->local_window_size;
|
|
|
- CURLcode result;
|
|
|
|
|
|
int rv = nghttp2_session_set_local_window_size(ctx->h2,
|
|
|
NGHTTP2_FLAG_NONE,
|
|
@@ -2288,10 +2299,8 @@ static CURLcode http2_data_pause(struct Curl_cfilter *cf,
|
|
|
if(!pause)
|
|
|
drain_stream(cf, data, stream);
|
|
|
|
|
|
- /* make sure the window update gets sent */
|
|
|
- result = h2_progress_egress(cf, data);
|
|
|
- if(result)
|
|
|
- return result;
|
|
|
+ /* attempt to send the window update */
|
|
|
+ (void)h2_progress_egress(cf, data);
|
|
|
|
|
|
if(!pause) {
|
|
|
/* Unpausing a h2 transfer, requires it to be run again. The server
|