client.c 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737
  1. /* Copyright StrongLoop, Inc. All rights reserved.
  2. *
  3. * Permission is hereby granted, free of charge, to any person obtaining a copy
  4. * of this software and associated documentation files (the "Software"), to
  5. * deal in the Software without restriction, including without limitation the
  6. * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  7. * sell copies of the Software, and to permit persons to whom the Software is
  8. * furnished to do so, subject to the following conditions:
  9. *
  10. * The above copyright notice and this permission notice shall be included in
  11. * all copies or substantial portions of the Software.
  12. *
  13. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  18. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  19. * IN THE SOFTWARE.
  20. */
  21. #include "defs.h"
  22. #include <errno.h>
  23. #include <stdlib.h>
  24. #include <string.h>
  25. /* A connection is modeled as an abstraction on top of two simple state
  26. * machines, one for reading and one for writing. Either state machine
  27. * is, when active, in one of three states: busy, done or stop; the fourth
  28. * and final state, dead, is an end state and only relevant when shutting
  29. * down the connection. A short overview:
  30. *
   *                          busy                  done           stop
   *  ----------|---------------------------|--------------------|------|
   *  readable  | waiting for incoming data | have incoming data | idle |
   *  writable  | busy writing out data     | completed write    | idle |
  35. *
  36. * We could remove the done state from the writable state machine. For our
  37. * purposes, it's functionally equivalent to the stop state.
  38. *
  39. * When the connection with upstream has been established, the client_ctx
  40. * moves into a state where incoming data from the client is sent upstream
  41. * and vice versa, incoming data from upstream is sent to the client. In
  42. * other words, we're just piping data back and forth. See conn_cycle()
  43. * for details.
  44. *
  45. * An interesting deviation from libuv's I/O model is that reads are discrete
  46. * rather than continuous events. In layman's terms, when a read operation
  47. * completes, the connection stops reading until further notice.
  48. *
  49. * The rationale for this approach is that we have to wait until the data
  50. * has been sent out again before we can reuse the read buffer.
  51. *
  52. * It also pleasingly unifies with the request model that libuv uses for
  53. * writes and everything else; libuv may switch to a request model for
  54. * reads in the future.
  55. */
  /* State of one direction (read or write) of a connection. */
  enum conn_state {
    c_busy = 0,  /* In flight: awaiting incoming data or write completion. */
    c_done,      /* Finished: incoming data was read or the write completed. */
    c_stop,      /* Idle; nothing in flight. */
    c_dead       /* End state; set by conn_close() while tearing down. */
  };
  /* States of the per-client session state machine driven by do_next().
   * The s_almost_dead_* states count outstanding finalizers: each completed
   * finalizer advances the session by one state until s_dead is reached.
   */
  enum sess_state {
    s_handshake = 0,   /* Awaiting the client handshake. */
    s_handshake_auth,  /* Awaiting client authentication data. */
    s_req_start,       /* About to start waiting for request data. */
    s_req_parse,       /* Awaiting request data. */
    s_req_lookup,      /* Awaiting completion of the upstream DNS lookup. */
    s_req_connect,     /* Awaiting completion of uv_tcp_connect(). */
    s_proxy_start,     /* Connected; about to start piping data. */
    s_proxy,           /* Connected; piping data in both directions. */
    s_kill,            /* Session is to be torn down. */
    s_almost_dead_0,   /* Finalizers still pending. */
    s_almost_dead_1,   /* Finalizers still pending. */
    s_almost_dead_2,   /* Finalizers still pending. */
    s_almost_dead_3,   /* Finalizers still pending. */
    s_almost_dead_4,   /* Finalizers still pending. */
    s_dead             /* All finalizers ran; the context may be freed. */
  };
  /* Session state machine. do_next() runs the handler that belongs to the
   * current state and stores the handler's return value as the new state.
   */
  static void do_next(client_ctx *cx);
  static int do_handshake(client_ctx *cx);
  static int do_handshake_auth(client_ctx *cx);
  static int do_req_start(client_ctx *cx);
  static int do_req_parse(client_ctx *cx);
  static int do_req_lookup(client_ctx *cx);
  static int do_req_connect_start(client_ctx *cx);
  static int do_req_connect(client_ctx *cx);
  static int do_proxy_start(client_ctx *cx);
  static int do_proxy(client_ctx *cx);
  static int do_kill(client_ctx *cx);
  static int do_almost_dead(client_ctx *cx);

  /* Data pump between the two connections of a session. */
  static int conn_cycle(const char *who, conn *a, conn *b);

  /* Idle timer. */
  static void conn_timer_reset(conn *c);
  static void conn_timer_expire(uv_timer_t *handle, int status);

  /* Name resolution and upstream connect. */
  static void conn_getaddrinfo(conn *c, const char *hostname);
  static void conn_getaddrinfo_done(uv_getaddrinfo_t *req, int status, struct addrinfo *ai);
  static int conn_connect(conn *c);
  static void conn_connect_done(uv_connect_t *req, int status);

  /* Reading. */
  static void conn_read(conn *c);
  static void conn_read_done(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf);
  static void conn_alloc(uv_handle_t *handle, size_t size, uv_buf_t *buf);

  /* Writing. */
  static void conn_write(conn *c, const void *data, unsigned int len);
  static void conn_write_done(uv_write_t *req, int status);

  /* Teardown. */
  static void conn_close(conn *c);
  static void conn_close_done(uv_handle_t *handle);
  110. /* |incoming| has been initialized by server.c when this is called. */
  111. void client_finish_init(server_ctx *sx, client_ctx *cx) {
  112. conn *incoming;
  113. conn *outgoing;
  114. cx->sx = sx;
  115. cx->state = s_handshake;
  116. s5_init(&cx->parser);
  117. incoming = &cx->incoming;
  118. incoming->client = cx;
  119. incoming->result = 0;
  120. incoming->rdstate = c_stop;
  121. incoming->wrstate = c_stop;
  122. incoming->idle_timeout = sx->idle_timeout;
  123. CHECK(0 == uv_timer_init(sx->loop, &incoming->timer_handle));
  124. outgoing = &cx->outgoing;
  125. outgoing->client = cx;
  126. outgoing->result = 0;
  127. outgoing->rdstate = c_stop;
  128. outgoing->wrstate = c_stop;
  129. outgoing->idle_timeout = sx->idle_timeout;
  130. CHECK(0 == uv_tcp_init(cx->sx->loop, &outgoing->handle.tcp));
  131. CHECK(0 == uv_timer_init(cx->sx->loop, &outgoing->timer_handle));
  132. /* Wait for the initial packet. */
  133. conn_read(incoming);
  134. }
  135. /* This is the core state machine that drives the client <-> upstream proxy.
  136. * We move through the initial handshake and authentication steps first and
  137. * end up (if all goes well) in the proxy state where we're just proxying
  138. * data between the client and upstream.
  139. */
  140. static void do_next(client_ctx *cx) {
  141. int new_state;
  142. ASSERT(cx->state != s_dead);
  143. switch (cx->state) {
  144. case s_handshake:
  145. new_state = do_handshake(cx);
  146. break;
  147. case s_handshake_auth:
  148. new_state = do_handshake_auth(cx);
  149. break;
  150. case s_req_start:
  151. new_state = do_req_start(cx);
  152. break;
  153. case s_req_parse:
  154. new_state = do_req_parse(cx);
  155. break;
  156. case s_req_lookup:
  157. new_state = do_req_lookup(cx);
  158. break;
  159. case s_req_connect:
  160. new_state = do_req_connect(cx);
  161. break;
  162. case s_proxy_start:
  163. new_state = do_proxy_start(cx);
  164. break;
  165. case s_proxy:
  166. new_state = do_proxy(cx);
  167. break;
  168. case s_kill:
  169. new_state = do_kill(cx);
  170. break;
  171. case s_almost_dead_0:
  172. case s_almost_dead_1:
  173. case s_almost_dead_2:
  174. case s_almost_dead_3:
  175. case s_almost_dead_4:
  176. new_state = do_almost_dead(cx);
  177. break;
  178. default:
  179. UNREACHABLE();
  180. }
  181. cx->state = new_state;
  182. if (cx->state == s_dead) {
  183. if (DEBUG_CHECKS) {
  184. memset(cx, -1, sizeof(*cx));
  185. }
  186. free(cx);
  187. }
  188. }
  189. static int do_handshake(client_ctx *cx) {
  190. unsigned int methods;
  191. conn *incoming;
  192. s5_ctx *parser;
  193. uint8_t *data;
  194. size_t size;
  195. int err;
  196. parser = &cx->parser;
  197. incoming = &cx->incoming;
  198. ASSERT(incoming->rdstate == c_done);
  199. ASSERT(incoming->wrstate == c_stop);
  200. incoming->rdstate = c_stop;
  201. if (incoming->result < 0) {
  202. pr_err("read error: %s", uv_strerror(incoming->result));
  203. return do_kill(cx);
  204. }
  205. data = (uint8_t *) incoming->t.buf;
  206. size = (size_t) incoming->result;
  207. err = s5_parse(parser, &data, &size);
  208. if (err == s5_ok) {
  209. conn_read(incoming);
  210. return s_handshake; /* Need more data. */
  211. }
  212. if (size != 0) {
  213. /* Could allow a round-trip saving shortcut here if the requested auth
  214. * method is S5_AUTH_NONE (provided unauthenticated traffic is allowed.)
  215. * Requires client support however.
  216. */
  217. pr_err("junk in handshake");
  218. return do_kill(cx);
  219. }
  220. if (err != s5_auth_select) {
  221. pr_err("handshake error: %s", s5_strerror(err));
  222. return do_kill(cx);
  223. }
  224. methods = s5_auth_methods(parser);
  225. if ((methods & S5_AUTH_NONE) && can_auth_none(cx->sx, cx)) {
  226. s5_select_auth(parser, S5_AUTH_NONE);
  227. conn_write(incoming, "\5\0", 2); /* No auth required. */
  228. return s_req_start;
  229. }
  230. if ((methods & S5_AUTH_PASSWD) && can_auth_passwd(cx->sx, cx)) {
  231. /* TODO(bnoordhuis) Implement username/password auth. */
  232. }
  233. conn_write(incoming, "\5\377", 2); /* No acceptable auth. */
  234. return s_kill;
  235. }
  236. /* TODO(bnoordhuis) Implement username/password auth. */
  237. static int do_handshake_auth(client_ctx *cx) {
  238. UNREACHABLE();
  239. return do_kill(cx);
  240. }
  241. static int do_req_start(client_ctx *cx) {
  242. conn *incoming;
  243. incoming = &cx->incoming;
  244. ASSERT(incoming->rdstate == c_stop);
  245. ASSERT(incoming->wrstate == c_done);
  246. incoming->wrstate = c_stop;
  247. if (incoming->result < 0) {
  248. pr_err("write error: %s", uv_strerror(incoming->result));
  249. return do_kill(cx);
  250. }
  251. conn_read(incoming);
  252. return s_req_parse;
  253. }
  254. static int do_req_parse(client_ctx *cx) {
  255. conn *incoming;
  256. conn *outgoing;
  257. s5_ctx *parser;
  258. uint8_t *data;
  259. size_t size;
  260. int err;
  261. parser = &cx->parser;
  262. incoming = &cx->incoming;
  263. outgoing = &cx->outgoing;
  264. ASSERT(incoming->rdstate == c_done);
  265. ASSERT(incoming->wrstate == c_stop);
  266. ASSERT(outgoing->rdstate == c_stop);
  267. ASSERT(outgoing->wrstate == c_stop);
  268. incoming->rdstate = c_stop;
  269. if (incoming->result < 0) {
  270. pr_err("read error: %s", uv_strerror(incoming->result));
  271. return do_kill(cx);
  272. }
  273. data = (uint8_t *) incoming->t.buf;
  274. size = (size_t) incoming->result;
  275. err = s5_parse(parser, &data, &size);
  276. if (err == s5_ok) {
  277. conn_read(incoming);
  278. return s_req_parse; /* Need more data. */
  279. }
  280. if (size != 0) {
  281. pr_err("junk in request %u", (unsigned) size);
  282. return do_kill(cx);
  283. }
  284. if (err != s5_exec_cmd) {
  285. pr_err("request error: %s", s5_strerror(err));
  286. return do_kill(cx);
  287. }
  288. if (parser->cmd == s5_cmd_tcp_bind) {
  289. /* Not supported but relatively straightforward to implement. */
  290. pr_warn("BIND requests are not supported.");
  291. return do_kill(cx);
  292. }
  293. if (parser->cmd == s5_cmd_udp_assoc) {
  294. /* Not supported. Might be hard to implement because libuv has no
  295. * functionality for detecting the MTU size which the RFC mandates.
  296. */
  297. pr_warn("UDP ASSOC requests are not supported.");
  298. return do_kill(cx);
  299. }
  300. ASSERT(parser->cmd == s5_cmd_tcp_connect);
  301. if (parser->atyp == s5_atyp_host) {
  302. conn_getaddrinfo(outgoing, (const char *) parser->daddr);
  303. return s_req_lookup;
  304. }
  305. if (parser->atyp == s5_atyp_ipv4) {
  306. memset(&outgoing->t.addr4, 0, sizeof(outgoing->t.addr4));
  307. outgoing->t.addr4.sin_family = AF_INET;
  308. outgoing->t.addr4.sin_port = htons(parser->dport);
  309. memcpy(&outgoing->t.addr4.sin_addr,
  310. parser->daddr,
  311. sizeof(outgoing->t.addr4.sin_addr));
  312. } else if (parser->atyp == s5_atyp_ipv6) {
  313. memset(&outgoing->t.addr6, 0, sizeof(outgoing->t.addr6));
  314. outgoing->t.addr6.sin6_family = AF_INET6;
  315. outgoing->t.addr6.sin6_port = htons(parser->dport);
  316. memcpy(&outgoing->t.addr6.sin6_addr,
  317. parser->daddr,
  318. sizeof(outgoing->t.addr6.sin6_addr));
  319. } else {
  320. UNREACHABLE();
  321. }
  322. return do_req_connect_start(cx);
  323. }
  324. static int do_req_lookup(client_ctx *cx) {
  325. s5_ctx *parser;
  326. conn *incoming;
  327. conn *outgoing;
  328. parser = &cx->parser;
  329. incoming = &cx->incoming;
  330. outgoing = &cx->outgoing;
  331. ASSERT(incoming->rdstate == c_stop);
  332. ASSERT(incoming->wrstate == c_stop);
  333. ASSERT(outgoing->rdstate == c_stop);
  334. ASSERT(outgoing->wrstate == c_stop);
  335. if (outgoing->result < 0) {
  336. /* TODO(bnoordhuis) Escape control characters in parser->daddr. */
  337. pr_err("lookup error for \"%s\": %s",
  338. parser->daddr,
  339. uv_strerror(outgoing->result));
  340. /* Send back a 'Host unreachable' reply. */
  341. conn_write(incoming, "\5\4\0\1\0\0\0\0\0\0", 10);
  342. return s_kill;
  343. }
  344. /* Don't make assumptions about the offset of sin_port/sin6_port. */
  345. switch (outgoing->t.addr.sa_family) {
  346. case AF_INET:
  347. outgoing->t.addr4.sin_port = htons(parser->dport);
  348. break;
  349. case AF_INET6:
  350. outgoing->t.addr6.sin6_port = htons(parser->dport);
  351. break;
  352. default:
  353. UNREACHABLE();
  354. }
  355. return do_req_connect_start(cx);
  356. }
  357. /* Assumes that cx->outgoing.t.sa contains a valid AF_INET/AF_INET6 address. */
  358. static int do_req_connect_start(client_ctx *cx) {
  359. conn *incoming;
  360. conn *outgoing;
  361. int err;
  362. incoming = &cx->incoming;
  363. outgoing = &cx->outgoing;
  364. ASSERT(incoming->rdstate == c_stop);
  365. ASSERT(incoming->wrstate == c_stop);
  366. ASSERT(outgoing->rdstate == c_stop);
  367. ASSERT(outgoing->wrstate == c_stop);
  368. if (!can_access(cx->sx, cx, &outgoing->t.addr)) {
  369. pr_warn("connection not allowed by ruleset");
  370. /* Send a 'Connection not allowed by ruleset' reply. */
  371. conn_write(incoming, "\5\2\0\1\0\0\0\0\0\0", 10);
  372. return s_kill;
  373. }
  374. err = conn_connect(outgoing);
  375. if (err != 0) {
  376. pr_err("connect error: %s\n", uv_strerror(err));
  377. return do_kill(cx);
  378. }
  379. return s_req_connect;
  380. }
  381. static int do_req_connect(client_ctx *cx) {
  382. const struct sockaddr_in6 *in6;
  383. const struct sockaddr_in *in;
  384. char addr_storage[sizeof(*in6)];
  385. conn *incoming;
  386. conn *outgoing;
  387. uint8_t *buf;
  388. int addrlen;
  389. incoming = &cx->incoming;
  390. outgoing = &cx->outgoing;
  391. ASSERT(incoming->rdstate == c_stop);
  392. ASSERT(incoming->wrstate == c_stop);
  393. ASSERT(outgoing->rdstate == c_stop);
  394. ASSERT(outgoing->wrstate == c_stop);
  395. /* Build and send the reply. Not very pretty but gets the job done. */
  396. buf = (uint8_t *) incoming->t.buf;
  397. if (outgoing->result == 0) {
  398. /* The RFC mandates that the SOCKS server must include the local port
  399. * and address in the reply. So that's what we do.
  400. */
  401. addrlen = sizeof(addr_storage);
  402. CHECK(0 == uv_tcp_getsockname(&outgoing->handle.tcp,
  403. (struct sockaddr *) addr_storage,
  404. &addrlen));
  405. buf[0] = 5; /* Version. */
  406. buf[1] = 0; /* Success. */
  407. buf[2] = 0; /* Reserved. */
  408. if (addrlen == sizeof(*in)) {
  409. buf[3] = 1; /* IPv4. */
  410. in = (const struct sockaddr_in *) &addr_storage;
  411. memcpy(buf + 4, &in->sin_addr, 4);
  412. memcpy(buf + 8, &in->sin_port, 2);
  413. conn_write(incoming, buf, 10);
  414. } else if (addrlen == sizeof(*in6)) {
  415. buf[3] = 4; /* IPv6. */
  416. in6 = (const struct sockaddr_in6 *) &addr_storage;
  417. memcpy(buf + 4, &in6->sin6_addr, 16);
  418. memcpy(buf + 20, &in6->sin6_port, 2);
  419. conn_write(incoming, buf, 22);
  420. } else {
  421. UNREACHABLE();
  422. }
  423. return s_proxy_start;
  424. } else {
  425. pr_err("upstream connection error: %s\n", uv_strerror(outgoing->result));
  426. /* Send a 'Connection refused' reply. */
  427. conn_write(incoming, "\5\5\0\1\0\0\0\0\0\0", 10);
  428. return s_kill;
  429. }
  430. UNREACHABLE();
  431. return s_kill;
  432. }
  433. static int do_proxy_start(client_ctx *cx) {
  434. conn *incoming;
  435. conn *outgoing;
  436. incoming = &cx->incoming;
  437. outgoing = &cx->outgoing;
  438. ASSERT(incoming->rdstate == c_stop);
  439. ASSERT(incoming->wrstate == c_done);
  440. ASSERT(outgoing->rdstate == c_stop);
  441. ASSERT(outgoing->wrstate == c_stop);
  442. incoming->wrstate = c_stop;
  443. if (incoming->result < 0) {
  444. pr_err("write error: %s", uv_strerror(incoming->result));
  445. return do_kill(cx);
  446. }
  447. conn_read(incoming);
  448. conn_read(outgoing);
  449. return s_proxy;
  450. }
  451. /* Proxy incoming data back and forth. */
  452. static int do_proxy(client_ctx *cx) {
  453. if (conn_cycle("client", &cx->incoming, &cx->outgoing)) {
  454. return do_kill(cx);
  455. }
  456. if (conn_cycle("upstream", &cx->outgoing, &cx->incoming)) {
  457. return do_kill(cx);
  458. }
  459. return s_proxy;
  460. }
  461. static int do_kill(client_ctx *cx) {
  462. int new_state;
  463. if (cx->state >= s_almost_dead_0) {
  464. return cx->state;
  465. }
  466. /* Try to cancel the request. The callback still runs but if the
  467. * cancellation succeeded, it gets called with status=UV_ECANCELED.
  468. */
  469. new_state = s_almost_dead_1;
  470. if (cx->state == s_req_lookup) {
  471. new_state = s_almost_dead_0;
  472. uv_cancel(&cx->outgoing.t.req);
  473. }
  474. conn_close(&cx->incoming);
  475. conn_close(&cx->outgoing);
  476. return new_state;
  477. }
  478. static int do_almost_dead(client_ctx *cx) {
  479. ASSERT(cx->state >= s_almost_dead_0);
  480. return cx->state + 1; /* Another finalizer completed. */
  481. }
  482. static int conn_cycle(const char *who, conn *a, conn *b) {
  483. if (a->result < 0) {
  484. if (a->result != UV_EOF) {
  485. pr_err("%s error: %s", who, uv_strerror(a->result));
  486. }
  487. return -1;
  488. }
  489. if (b->result < 0) {
  490. return -1;
  491. }
  492. if (a->wrstate == c_done) {
  493. a->wrstate = c_stop;
  494. }
  495. /* The logic is as follows: read when we don't write and write when we don't
  496. * read. That gives us back-pressure handling for free because if the peer
  497. * sends data faster than we consume it, TCP congestion control kicks in.
  498. */
  499. if (a->wrstate == c_stop) {
  500. if (b->rdstate == c_stop) {
  501. conn_read(b);
  502. } else if (b->rdstate == c_done) {
  503. conn_write(a, b->t.buf, b->result);
  504. b->rdstate = c_stop; /* Triggers the call to conn_read() above. */
  505. }
  506. }
  507. return 0;
  508. }
  509. static void conn_timer_reset(conn *c) {
  510. CHECK(0 == uv_timer_start(&c->timer_handle,
  511. conn_timer_expire,
  512. c->idle_timeout,
  513. 0));
  514. }
  515. static void conn_timer_expire(uv_timer_t *handle, int status) {
  516. conn *c;
  517. CHECK(0 == status);
  518. c = CONTAINER_OF(handle, conn, timer_handle);
  519. c->result = UV_ETIMEDOUT;
  520. do_next(c->client);
  521. }
  /* Starts an asynchronous lookup of |hostname| for TCP stream sockets of any
   * address family and arms the idle timer. conn_getaddrinfo_done() runs when
   * the lookup finishes.
   */
  static void conn_getaddrinfo(conn *c, const char *hostname) {
    struct addrinfo hints;

    /* Everything not set explicitly below stays zero. */
    memset(&hints, 0, sizeof(hints));
    hints.ai_protocol = IPPROTO_TCP;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_family = AF_UNSPEC;

    CHECK(0 == uv_getaddrinfo(c->client->sx->loop,
                              &c->t.addrinfo_req,
                              conn_getaddrinfo_done,
                              hostname,
                              NULL,
                              &hints));

    conn_timer_reset(c);
  }
  /* Lookup completion callback: stores |status| as the connection's result,
   * copies the first resolved address into the connection on success, frees
   * the result list and hands control back to the session state machine.
   */
  static void conn_getaddrinfo_done(uv_getaddrinfo_t *req,
                                    int status,
                                    struct addrinfo *ai) {
    conn *c;

    c = CONTAINER_OF(req, conn, t.addrinfo_req);
    c->result = status;

    if (status == 0) {
      /* FIXME(bnoordhuis) Should try all addresses. */
      switch (ai->ai_family) {
        case AF_INET:
          c->t.addr4 = *(const struct sockaddr_in *) ai->ai_addr;
          break;
        case AF_INET6:
          c->t.addr6 = *(const struct sockaddr_in6 *) ai->ai_addr;
          break;
        default:
          UNREACHABLE();
      }
    }

    uv_freeaddrinfo(ai);
    do_next(c->client);
  }
  555. /* Assumes that c->t.sa contains a valid AF_INET or AF_INET6 address. */
  556. static int conn_connect(conn *c) {
  557. ASSERT(c->t.addr.sa_family == AF_INET ||
  558. c->t.addr.sa_family == AF_INET6);
  559. conn_timer_reset(c);
  560. return uv_tcp_connect(&c->t.connect_req,
  561. &c->handle.tcp,
  562. &c->t.addr,
  563. conn_connect_done);
  564. }
  565. static void conn_connect_done(uv_connect_t *req, int status) {
  566. conn *c;
  567. if (status == UV_ECANCELED) {
  568. return; /* Handle has been closed. */
  569. }
  570. c = CONTAINER_OF(req, conn, t.connect_req);
  571. c->result = status;
  572. do_next(c->client);
  573. }
  574. static void conn_read(conn *c) {
  575. ASSERT(c->rdstate == c_stop);
  576. CHECK(0 == uv_read_start(&c->handle.stream, conn_alloc, conn_read_done));
  577. c->rdstate = c_busy;
  578. conn_timer_reset(c);
  579. }
  580. static void conn_read_done(uv_stream_t *handle,
  581. ssize_t nread,
  582. const uv_buf_t *buf) {
  583. conn *c;
  584. c = CONTAINER_OF(handle, conn, handle);
  585. ASSERT(c->t.buf == buf->base);
  586. ASSERT(c->rdstate == c_busy);
  587. c->rdstate = c_done;
  588. c->result = nread;
  589. uv_read_stop(&c->handle.stream);
  590. do_next(c->client);
  591. }
  592. static void conn_alloc(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
  593. conn *c;
  594. c = CONTAINER_OF(handle, conn, handle);
  595. ASSERT(c->rdstate == c_busy);
  596. buf->base = c->t.buf;
  597. buf->len = sizeof(c->t.buf);
  598. }
  599. static void conn_write(conn *c, const void *data, unsigned int len) {
  600. uv_buf_t buf;
  601. ASSERT(c->wrstate == c_stop || c->wrstate == c_done);
  602. c->wrstate = c_busy;
  603. /* It's okay to cast away constness here, uv_write() won't modify the
  604. * memory.
  605. */
  606. buf.base = (char *) data;
  607. buf.len = len;
  608. CHECK(0 == uv_write(&c->write_req,
  609. &c->handle.stream,
  610. &buf,
  611. 1,
  612. conn_write_done));
  613. conn_timer_reset(c);
  614. }
  615. static void conn_write_done(uv_write_t *req, int status) {
  616. conn *c;
  617. if (status == UV_ECANCELED) {
  618. return; /* Handle has been closed. */
  619. }
  620. c = CONTAINER_OF(req, conn, write_req);
  621. ASSERT(c->wrstate == c_busy);
  622. c->wrstate = c_done;
  623. c->result = status;
  624. do_next(c->client);
  625. }
  626. static void conn_close(conn *c) {
  627. ASSERT(c->rdstate != c_dead);
  628. ASSERT(c->wrstate != c_dead);
  629. c->rdstate = c_dead;
  630. c->wrstate = c_dead;
  631. c->timer_handle.data = c;
  632. c->handle.handle.data = c;
  633. uv_close(&c->handle.handle, conn_close_done);
  634. uv_close((uv_handle_t *) &c->timer_handle, conn_close_done);
  635. }
  636. static void conn_close_done(uv_handle_t *handle) {
  637. conn *c;
  638. c = handle->data;
  639. do_next(c->client);
  640. }