123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737 |
- /* Copyright StrongLoop, Inc. All rights reserved.
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to
- * deal in the Software without restriction, including without limitation the
- * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- * IN THE SOFTWARE.
- */
- #include "defs.h"
- #include <errno.h>
- #include <stdlib.h>
- #include <string.h>
- /* A connection is modeled as an abstraction on top of two simple state
- * machines, one for reading and one for writing. Either state machine
- * is, when active, in one of three states: busy, done or stop; the fourth
- * and final state, dead, is an end state and only relevant when shutting
- * down the connection. A short overview:
- *
- * busy done stop
- * ----------|---------------------------|--------------------|------|
- * readable | waiting for incoming data | have incoming data | idle |
- * writable | busy writing out data | completed write | idle |
- *
- * We could remove the done state from the writable state machine. For our
- * purposes, it's functionally equivalent to the stop state.
- *
- * When the connection with upstream has been established, the client_ctx
- * moves into a state where incoming data from the client is sent upstream
- * and vice versa, incoming data from upstream is sent to the client. In
- * other words, we're just piping data back and forth. See conn_cycle()
- * for details.
- *
- * An interesting deviation from libuv's I/O model is that reads are discrete
- * rather than continuous events. In layman's terms, when a read operation
- * completes, the connection stops reading until further notice.
- *
- * The rationale for this approach is that we have to wait until the data
- * has been sent out again before we can reuse the read buffer.
- *
- * It also pleasingly unifies with the request model that libuv uses for
- * writes and everything else; libuv may switch to a request model for
- * reads in the future.
- */
- enum conn_state {
- c_busy, /* Busy; waiting for incoming data or for a write to complete. */
- c_done, /* Done; read incoming data or write finished. */
- c_stop, /* Stopped. */
- c_dead
- };
- /* Session states. */
- enum sess_state {
- s_handshake, /* Wait for client handshake. */
- s_handshake_auth, /* Wait for client authentication data. */
- s_req_start, /* Start waiting for request data. */
- s_req_parse, /* Wait for request data. */
- s_req_lookup, /* Wait for upstream hostname DNS lookup to complete. */
- s_req_connect, /* Wait for uv_tcp_connect() to complete. */
- s_proxy_start, /* Connected. Start piping data. */
- s_proxy, /* Connected. Pipe data back and forth. */
- s_kill, /* Tear down session. */
- s_almost_dead_0, /* Waiting for finalizers to complete. */
- s_almost_dead_1, /* Waiting for finalizers to complete. */
- s_almost_dead_2, /* Waiting for finalizers to complete. */
- s_almost_dead_3, /* Waiting for finalizers to complete. */
- s_almost_dead_4, /* Waiting for finalizers to complete. */
- s_dead /* Dead. Safe to free now. */
- };
- static void do_next(client_ctx *cx);
- static int do_handshake(client_ctx *cx);
- static int do_handshake_auth(client_ctx *cx);
- static int do_req_start(client_ctx *cx);
- static int do_req_parse(client_ctx *cx);
- static int do_req_lookup(client_ctx *cx);
- static int do_req_connect_start(client_ctx *cx);
- static int do_req_connect(client_ctx *cx);
- static int do_proxy_start(client_ctx *cx);
- static int do_proxy(client_ctx *cx);
- static int do_kill(client_ctx *cx);
- static int do_almost_dead(client_ctx *cx);
- static int conn_cycle(const char *who, conn *a, conn *b);
- static void conn_timer_reset(conn *c);
- static void conn_timer_expire(uv_timer_t *handle, int status);
- static void conn_getaddrinfo(conn *c, const char *hostname);
- static void conn_getaddrinfo_done(uv_getaddrinfo_t *req,
- int status,
- struct addrinfo *ai);
- static int conn_connect(conn *c);
- static void conn_connect_done(uv_connect_t *req, int status);
- static void conn_read(conn *c);
- static void conn_read_done(uv_stream_t *handle,
- ssize_t nread,
- const uv_buf_t *buf);
- static void conn_alloc(uv_handle_t *handle, size_t size, uv_buf_t *buf);
- static void conn_write(conn *c, const void *data, unsigned int len);
- static void conn_write_done(uv_write_t *req, int status);
- static void conn_close(conn *c);
- static void conn_close_done(uv_handle_t *handle);
- /* |incoming| has been initialized by server.c when this is called. */
- void client_finish_init(server_ctx *sx, client_ctx *cx) {
- conn *incoming;
- conn *outgoing;
- cx->sx = sx;
- cx->state = s_handshake;
- s5_init(&cx->parser);
- incoming = &cx->incoming;
- incoming->client = cx;
- incoming->result = 0;
- incoming->rdstate = c_stop;
- incoming->wrstate = c_stop;
- incoming->idle_timeout = sx->idle_timeout;
- CHECK(0 == uv_timer_init(sx->loop, &incoming->timer_handle));
- outgoing = &cx->outgoing;
- outgoing->client = cx;
- outgoing->result = 0;
- outgoing->rdstate = c_stop;
- outgoing->wrstate = c_stop;
- outgoing->idle_timeout = sx->idle_timeout;
- CHECK(0 == uv_tcp_init(cx->sx->loop, &outgoing->handle.tcp));
- CHECK(0 == uv_timer_init(cx->sx->loop, &outgoing->timer_handle));
- /* Wait for the initial packet. */
- conn_read(incoming);
- }
- /* This is the core state machine that drives the client <-> upstream proxy.
- * We move through the initial handshake and authentication steps first and
- * end up (if all goes well) in the proxy state where we're just proxying
- * data between the client and upstream.
- */
- static void do_next(client_ctx *cx) {
- int new_state;
- ASSERT(cx->state != s_dead);
- switch (cx->state) {
- case s_handshake:
- new_state = do_handshake(cx);
- break;
- case s_handshake_auth:
- new_state = do_handshake_auth(cx);
- break;
- case s_req_start:
- new_state = do_req_start(cx);
- break;
- case s_req_parse:
- new_state = do_req_parse(cx);
- break;
- case s_req_lookup:
- new_state = do_req_lookup(cx);
- break;
- case s_req_connect:
- new_state = do_req_connect(cx);
- break;
- case s_proxy_start:
- new_state = do_proxy_start(cx);
- break;
- case s_proxy:
- new_state = do_proxy(cx);
- break;
- case s_kill:
- new_state = do_kill(cx);
- break;
- case s_almost_dead_0:
- case s_almost_dead_1:
- case s_almost_dead_2:
- case s_almost_dead_3:
- case s_almost_dead_4:
- new_state = do_almost_dead(cx);
- break;
- default:
- UNREACHABLE();
- }
- cx->state = new_state;
- if (cx->state == s_dead) {
- if (DEBUG_CHECKS) {
- memset(cx, -1, sizeof(*cx));
- }
- free(cx);
- }
- }
- static int do_handshake(client_ctx *cx) {
- unsigned int methods;
- conn *incoming;
- s5_ctx *parser;
- uint8_t *data;
- size_t size;
- int err;
- parser = &cx->parser;
- incoming = &cx->incoming;
- ASSERT(incoming->rdstate == c_done);
- ASSERT(incoming->wrstate == c_stop);
- incoming->rdstate = c_stop;
- if (incoming->result < 0) {
- pr_err("read error: %s", uv_strerror(incoming->result));
- return do_kill(cx);
- }
- data = (uint8_t *) incoming->t.buf;
- size = (size_t) incoming->result;
- err = s5_parse(parser, &data, &size);
- if (err == s5_ok) {
- conn_read(incoming);
- return s_handshake; /* Need more data. */
- }
- if (size != 0) {
- /* Could allow a round-trip saving shortcut here if the requested auth
- * method is S5_AUTH_NONE (provided unauthenticated traffic is allowed.)
- * Requires client support however.
- */
- pr_err("junk in handshake");
- return do_kill(cx);
- }
- if (err != s5_auth_select) {
- pr_err("handshake error: %s", s5_strerror(err));
- return do_kill(cx);
- }
- methods = s5_auth_methods(parser);
- if ((methods & S5_AUTH_NONE) && can_auth_none(cx->sx, cx)) {
- s5_select_auth(parser, S5_AUTH_NONE);
- conn_write(incoming, "\5\0", 2); /* No auth required. */
- return s_req_start;
- }
- if ((methods & S5_AUTH_PASSWD) && can_auth_passwd(cx->sx, cx)) {
- /* TODO(bnoordhuis) Implement username/password auth. */
- }
- conn_write(incoming, "\5\377", 2); /* No acceptable auth. */
- return s_kill;
- }
- /* TODO(bnoordhuis) Implement username/password auth. */
- static int do_handshake_auth(client_ctx *cx) {
- UNREACHABLE();
- return do_kill(cx);
- }
- static int do_req_start(client_ctx *cx) {
- conn *incoming;
- incoming = &cx->incoming;
- ASSERT(incoming->rdstate == c_stop);
- ASSERT(incoming->wrstate == c_done);
- incoming->wrstate = c_stop;
- if (incoming->result < 0) {
- pr_err("write error: %s", uv_strerror(incoming->result));
- return do_kill(cx);
- }
- conn_read(incoming);
- return s_req_parse;
- }
- static int do_req_parse(client_ctx *cx) {
- conn *incoming;
- conn *outgoing;
- s5_ctx *parser;
- uint8_t *data;
- size_t size;
- int err;
- parser = &cx->parser;
- incoming = &cx->incoming;
- outgoing = &cx->outgoing;
- ASSERT(incoming->rdstate == c_done);
- ASSERT(incoming->wrstate == c_stop);
- ASSERT(outgoing->rdstate == c_stop);
- ASSERT(outgoing->wrstate == c_stop);
- incoming->rdstate = c_stop;
- if (incoming->result < 0) {
- pr_err("read error: %s", uv_strerror(incoming->result));
- return do_kill(cx);
- }
- data = (uint8_t *) incoming->t.buf;
- size = (size_t) incoming->result;
- err = s5_parse(parser, &data, &size);
- if (err == s5_ok) {
- conn_read(incoming);
- return s_req_parse; /* Need more data. */
- }
- if (size != 0) {
- pr_err("junk in request %u", (unsigned) size);
- return do_kill(cx);
- }
- if (err != s5_exec_cmd) {
- pr_err("request error: %s", s5_strerror(err));
- return do_kill(cx);
- }
- if (parser->cmd == s5_cmd_tcp_bind) {
- /* Not supported but relatively straightforward to implement. */
- pr_warn("BIND requests are not supported.");
- return do_kill(cx);
- }
- if (parser->cmd == s5_cmd_udp_assoc) {
- /* Not supported. Might be hard to implement because libuv has no
- * functionality for detecting the MTU size which the RFC mandates.
- */
- pr_warn("UDP ASSOC requests are not supported.");
- return do_kill(cx);
- }
- ASSERT(parser->cmd == s5_cmd_tcp_connect);
- if (parser->atyp == s5_atyp_host) {
- conn_getaddrinfo(outgoing, (const char *) parser->daddr);
- return s_req_lookup;
- }
- if (parser->atyp == s5_atyp_ipv4) {
- memset(&outgoing->t.addr4, 0, sizeof(outgoing->t.addr4));
- outgoing->t.addr4.sin_family = AF_INET;
- outgoing->t.addr4.sin_port = htons(parser->dport);
- memcpy(&outgoing->t.addr4.sin_addr,
- parser->daddr,
- sizeof(outgoing->t.addr4.sin_addr));
- } else if (parser->atyp == s5_atyp_ipv6) {
- memset(&outgoing->t.addr6, 0, sizeof(outgoing->t.addr6));
- outgoing->t.addr6.sin6_family = AF_INET6;
- outgoing->t.addr6.sin6_port = htons(parser->dport);
- memcpy(&outgoing->t.addr6.sin6_addr,
- parser->daddr,
- sizeof(outgoing->t.addr6.sin6_addr));
- } else {
- UNREACHABLE();
- }
- return do_req_connect_start(cx);
- }
- static int do_req_lookup(client_ctx *cx) {
- s5_ctx *parser;
- conn *incoming;
- conn *outgoing;
- parser = &cx->parser;
- incoming = &cx->incoming;
- outgoing = &cx->outgoing;
- ASSERT(incoming->rdstate == c_stop);
- ASSERT(incoming->wrstate == c_stop);
- ASSERT(outgoing->rdstate == c_stop);
- ASSERT(outgoing->wrstate == c_stop);
- if (outgoing->result < 0) {
- /* TODO(bnoordhuis) Escape control characters in parser->daddr. */
- pr_err("lookup error for \"%s\": %s",
- parser->daddr,
- uv_strerror(outgoing->result));
- /* Send back a 'Host unreachable' reply. */
- conn_write(incoming, "\5\4\0\1\0\0\0\0\0\0", 10);
- return s_kill;
- }
- /* Don't make assumptions about the offset of sin_port/sin6_port. */
- switch (outgoing->t.addr.sa_family) {
- case AF_INET:
- outgoing->t.addr4.sin_port = htons(parser->dport);
- break;
- case AF_INET6:
- outgoing->t.addr6.sin6_port = htons(parser->dport);
- break;
- default:
- UNREACHABLE();
- }
- return do_req_connect_start(cx);
- }
- /* Assumes that cx->outgoing.t.sa contains a valid AF_INET/AF_INET6 address. */
- static int do_req_connect_start(client_ctx *cx) {
- conn *incoming;
- conn *outgoing;
- int err;
- incoming = &cx->incoming;
- outgoing = &cx->outgoing;
- ASSERT(incoming->rdstate == c_stop);
- ASSERT(incoming->wrstate == c_stop);
- ASSERT(outgoing->rdstate == c_stop);
- ASSERT(outgoing->wrstate == c_stop);
- if (!can_access(cx->sx, cx, &outgoing->t.addr)) {
- pr_warn("connection not allowed by ruleset");
- /* Send a 'Connection not allowed by ruleset' reply. */
- conn_write(incoming, "\5\2\0\1\0\0\0\0\0\0", 10);
- return s_kill;
- }
- err = conn_connect(outgoing);
- if (err != 0) {
- pr_err("connect error: %s\n", uv_strerror(err));
- return do_kill(cx);
- }
- return s_req_connect;
- }
- static int do_req_connect(client_ctx *cx) {
- const struct sockaddr_in6 *in6;
- const struct sockaddr_in *in;
- char addr_storage[sizeof(*in6)];
- conn *incoming;
- conn *outgoing;
- uint8_t *buf;
- int addrlen;
- incoming = &cx->incoming;
- outgoing = &cx->outgoing;
- ASSERT(incoming->rdstate == c_stop);
- ASSERT(incoming->wrstate == c_stop);
- ASSERT(outgoing->rdstate == c_stop);
- ASSERT(outgoing->wrstate == c_stop);
- /* Build and send the reply. Not very pretty but gets the job done. */
- buf = (uint8_t *) incoming->t.buf;
- if (outgoing->result == 0) {
- /* The RFC mandates that the SOCKS server must include the local port
- * and address in the reply. So that's what we do.
- */
- addrlen = sizeof(addr_storage);
- CHECK(0 == uv_tcp_getsockname(&outgoing->handle.tcp,
- (struct sockaddr *) addr_storage,
- &addrlen));
- buf[0] = 5; /* Version. */
- buf[1] = 0; /* Success. */
- buf[2] = 0; /* Reserved. */
- if (addrlen == sizeof(*in)) {
- buf[3] = 1; /* IPv4. */
- in = (const struct sockaddr_in *) &addr_storage;
- memcpy(buf + 4, &in->sin_addr, 4);
- memcpy(buf + 8, &in->sin_port, 2);
- conn_write(incoming, buf, 10);
- } else if (addrlen == sizeof(*in6)) {
- buf[3] = 4; /* IPv6. */
- in6 = (const struct sockaddr_in6 *) &addr_storage;
- memcpy(buf + 4, &in6->sin6_addr, 16);
- memcpy(buf + 20, &in6->sin6_port, 2);
- conn_write(incoming, buf, 22);
- } else {
- UNREACHABLE();
- }
- return s_proxy_start;
- } else {
- pr_err("upstream connection error: %s\n", uv_strerror(outgoing->result));
- /* Send a 'Connection refused' reply. */
- conn_write(incoming, "\5\5\0\1\0\0\0\0\0\0", 10);
- return s_kill;
- }
- UNREACHABLE();
- return s_kill;
- }
- static int do_proxy_start(client_ctx *cx) {
- conn *incoming;
- conn *outgoing;
- incoming = &cx->incoming;
- outgoing = &cx->outgoing;
- ASSERT(incoming->rdstate == c_stop);
- ASSERT(incoming->wrstate == c_done);
- ASSERT(outgoing->rdstate == c_stop);
- ASSERT(outgoing->wrstate == c_stop);
- incoming->wrstate = c_stop;
- if (incoming->result < 0) {
- pr_err("write error: %s", uv_strerror(incoming->result));
- return do_kill(cx);
- }
- conn_read(incoming);
- conn_read(outgoing);
- return s_proxy;
- }
- /* Proxy incoming data back and forth. */
- static int do_proxy(client_ctx *cx) {
- if (conn_cycle("client", &cx->incoming, &cx->outgoing)) {
- return do_kill(cx);
- }
- if (conn_cycle("upstream", &cx->outgoing, &cx->incoming)) {
- return do_kill(cx);
- }
- return s_proxy;
- }
- static int do_kill(client_ctx *cx) {
- int new_state;
- if (cx->state >= s_almost_dead_0) {
- return cx->state;
- }
- /* Try to cancel the request. The callback still runs but if the
- * cancellation succeeded, it gets called with status=UV_ECANCELED.
- */
- new_state = s_almost_dead_1;
- if (cx->state == s_req_lookup) {
- new_state = s_almost_dead_0;
- uv_cancel(&cx->outgoing.t.req);
- }
- conn_close(&cx->incoming);
- conn_close(&cx->outgoing);
- return new_state;
- }
- static int do_almost_dead(client_ctx *cx) {
- ASSERT(cx->state >= s_almost_dead_0);
- return cx->state + 1; /* Another finalizer completed. */
- }
- static int conn_cycle(const char *who, conn *a, conn *b) {
- if (a->result < 0) {
- if (a->result != UV_EOF) {
- pr_err("%s error: %s", who, uv_strerror(a->result));
- }
- return -1;
- }
- if (b->result < 0) {
- return -1;
- }
- if (a->wrstate == c_done) {
- a->wrstate = c_stop;
- }
- /* The logic is as follows: read when we don't write and write when we don't
- * read. That gives us back-pressure handling for free because if the peer
- * sends data faster than we consume it, TCP congestion control kicks in.
- */
- if (a->wrstate == c_stop) {
- if (b->rdstate == c_stop) {
- conn_read(b);
- } else if (b->rdstate == c_done) {
- conn_write(a, b->t.buf, b->result);
- b->rdstate = c_stop; /* Triggers the call to conn_read() above. */
- }
- }
- return 0;
- }
- static void conn_timer_reset(conn *c) {
- CHECK(0 == uv_timer_start(&c->timer_handle,
- conn_timer_expire,
- c->idle_timeout,
- 0));
- }
- static void conn_timer_expire(uv_timer_t *handle, int status) {
- conn *c;
- CHECK(0 == status);
- c = CONTAINER_OF(handle, conn, timer_handle);
- c->result = UV_ETIMEDOUT;
- do_next(c->client);
- }
- static void conn_getaddrinfo(conn *c, const char *hostname) {
- struct addrinfo hints;
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_protocol = IPPROTO_TCP;
- CHECK(0 == uv_getaddrinfo(c->client->sx->loop,
- &c->t.addrinfo_req,
- conn_getaddrinfo_done,
- hostname,
- NULL,
- &hints));
- conn_timer_reset(c);
- }
- static void conn_getaddrinfo_done(uv_getaddrinfo_t *req,
- int status,
- struct addrinfo *ai) {
- conn *c;
- c = CONTAINER_OF(req, conn, t.addrinfo_req);
- c->result = status;
- if (status == 0) {
- /* FIXME(bnoordhuis) Should try all addresses. */
- if (ai->ai_family == AF_INET) {
- c->t.addr4 = *(const struct sockaddr_in *) ai->ai_addr;
- } else if (ai->ai_family == AF_INET6) {
- c->t.addr6 = *(const struct sockaddr_in6 *) ai->ai_addr;
- } else {
- UNREACHABLE();
- }
- }
- uv_freeaddrinfo(ai);
- do_next(c->client);
- }
- /* Assumes that c->t.sa contains a valid AF_INET or AF_INET6 address. */
- static int conn_connect(conn *c) {
- ASSERT(c->t.addr.sa_family == AF_INET ||
- c->t.addr.sa_family == AF_INET6);
- conn_timer_reset(c);
- return uv_tcp_connect(&c->t.connect_req,
- &c->handle.tcp,
- &c->t.addr,
- conn_connect_done);
- }
- static void conn_connect_done(uv_connect_t *req, int status) {
- conn *c;
- if (status == UV_ECANCELED) {
- return; /* Handle has been closed. */
- }
- c = CONTAINER_OF(req, conn, t.connect_req);
- c->result = status;
- do_next(c->client);
- }
- static void conn_read(conn *c) {
- ASSERT(c->rdstate == c_stop);
- CHECK(0 == uv_read_start(&c->handle.stream, conn_alloc, conn_read_done));
- c->rdstate = c_busy;
- conn_timer_reset(c);
- }
- static void conn_read_done(uv_stream_t *handle,
- ssize_t nread,
- const uv_buf_t *buf) {
- conn *c;
- c = CONTAINER_OF(handle, conn, handle);
- ASSERT(c->t.buf == buf->base);
- ASSERT(c->rdstate == c_busy);
- c->rdstate = c_done;
- c->result = nread;
- uv_read_stop(&c->handle.stream);
- do_next(c->client);
- }
- static void conn_alloc(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
- conn *c;
- c = CONTAINER_OF(handle, conn, handle);
- ASSERT(c->rdstate == c_busy);
- buf->base = c->t.buf;
- buf->len = sizeof(c->t.buf);
- }
- static void conn_write(conn *c, const void *data, unsigned int len) {
- uv_buf_t buf;
- ASSERT(c->wrstate == c_stop || c->wrstate == c_done);
- c->wrstate = c_busy;
- /* It's okay to cast away constness here, uv_write() won't modify the
- * memory.
- */
- buf.base = (char *) data;
- buf.len = len;
- CHECK(0 == uv_write(&c->write_req,
- &c->handle.stream,
- &buf,
- 1,
- conn_write_done));
- conn_timer_reset(c);
- }
- static void conn_write_done(uv_write_t *req, int status) {
- conn *c;
- if (status == UV_ECANCELED) {
- return; /* Handle has been closed. */
- }
- c = CONTAINER_OF(req, conn, write_req);
- ASSERT(c->wrstate == c_busy);
- c->wrstate = c_done;
- c->result = status;
- do_next(c->client);
- }
- static void conn_close(conn *c) {
- ASSERT(c->rdstate != c_dead);
- ASSERT(c->wrstate != c_dead);
- c->rdstate = c_dead;
- c->wrstate = c_dead;
- c->timer_handle.data = c;
- c->handle.handle.data = c;
- uv_close(&c->handle.handle, conn_close_done);
- uv_close((uv_handle_t *) &c->timer_handle, conn_close_done);
- }
- static void conn_close_done(uv_handle_t *handle) {
- conn *c;
- c = handle->data;
- do_next(c->client);
- }
|