test-ipc.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
  2. *
  3. * Permission is hereby granted, free of charge, to any person obtaining a copy
  4. * of this software and associated documentation files (the "Software"), to
  5. * deal in the Software without restriction, including without limitation the
  6. * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  7. * sell copies of the Software, and to permit persons to whom the Software is
  8. * furnished to do so, subject to the following conditions:
  9. *
  10. * The above copyright notice and this permission notice shall be included in
  11. * all copies or substantial portions of the Software.
  12. *
  13. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  18. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  19. * IN THE SOFTWARE.
  20. */
  21. #include "uv.h"
  22. #include "task.h"
  23. #include <stdio.h>
  24. #include <string.h>
  25. static uv_pipe_t channel;
  26. static uv_tcp_t tcp_server;
  27. static uv_tcp_t tcp_connection;
  28. static int exit_cb_called;
  29. static int read2_cb_called;
  30. static int tcp_write_cb_called;
  31. static int tcp_read_cb_called;
  32. static int on_pipe_read_called;
  33. static int local_conn_accepted;
  34. static int remote_conn_accepted;
  35. static int tcp_server_listening;
  36. static uv_write_t write_req;
  37. static uv_pipe_t channel;
  38. static uv_tcp_t tcp_server;
  39. static uv_write_t conn_notify_req;
  40. static int close_cb_called;
  41. static int connection_accepted;
  42. static int tcp_conn_read_cb_called;
  43. static int tcp_conn_write_cb_called;
  44. typedef struct {
  45. uv_connect_t conn_req;
  46. uv_write_t tcp_write_req;
  47. uv_tcp_t conn;
  48. } tcp_conn;
  49. #define CONN_COUNT 100
  50. static void close_server_conn_cb(uv_handle_t* handle) {
  51. free(handle);
  52. }
  53. static void on_connection(uv_stream_t* server, int status) {
  54. uv_tcp_t* conn;
  55. int r;
  56. if (!local_conn_accepted) {
  57. /* Accept the connection and close it. Also and close the server. */
  58. ASSERT(status == 0);
  59. ASSERT((uv_stream_t*)&tcp_server == server);
  60. conn = malloc(sizeof(*conn));
  61. ASSERT(conn);
  62. r = uv_tcp_init(server->loop, conn);
  63. ASSERT(r == 0);
  64. r = uv_accept(server, (uv_stream_t*)conn);
  65. ASSERT(r == 0);
  66. uv_close((uv_handle_t*)conn, close_server_conn_cb);
  67. uv_close((uv_handle_t*)server, NULL);
  68. local_conn_accepted = 1;
  69. }
  70. }
  71. static void exit_cb(uv_process_t* process,
  72. int64_t exit_status,
  73. int term_signal) {
  74. printf("exit_cb\n");
  75. exit_cb_called++;
  76. ASSERT(exit_status == 0);
  77. uv_close((uv_handle_t*)process, NULL);
  78. }
  79. static void on_alloc(uv_handle_t* handle,
  80. size_t suggested_size,
  81. uv_buf_t* buf) {
  82. buf->base = malloc(suggested_size);
  83. buf->len = suggested_size;
  84. }
  85. static void close_client_conn_cb(uv_handle_t* handle) {
  86. tcp_conn* p = (tcp_conn*)handle->data;
  87. free(p);
  88. }
  89. static void connect_cb(uv_connect_t* req, int status) {
  90. uv_close((uv_handle_t*)req->handle, close_client_conn_cb);
  91. }
  92. static void make_many_connections(void) {
  93. tcp_conn* conn;
  94. struct sockaddr_in addr;
  95. int r, i;
  96. for (i = 0; i < CONN_COUNT; i++) {
  97. conn = malloc(sizeof(*conn));
  98. ASSERT(conn);
  99. r = uv_tcp_init(uv_default_loop(), &conn->conn);
  100. ASSERT(r == 0);
  101. ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
  102. r = uv_tcp_connect(&conn->conn_req,
  103. (uv_tcp_t*) &conn->conn,
  104. (const struct sockaddr*) &addr,
  105. connect_cb);
  106. ASSERT(r == 0);
  107. conn->conn.data = conn;
  108. }
  109. }
  110. static void on_read(uv_pipe_t* pipe,
  111. ssize_t nread,
  112. const uv_buf_t* buf,
  113. uv_handle_type pending) {
  114. int r;
  115. uv_buf_t outbuf;
  116. if (nread == 0) {
  117. /* Everything OK, but nothing read. */
  118. free(buf->base);
  119. return;
  120. }
  121. if (nread < 0) {
  122. if (nread == UV_EOF) {
  123. free(buf->base);
  124. return;
  125. }
  126. printf("error recving on channel: %s\n", uv_strerror(nread));
  127. abort();
  128. }
  129. fprintf(stderr, "got %d bytes\n", (int)nread);
  130. if (!tcp_server_listening) {
  131. ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE);
  132. read2_cb_called++;
  133. /* Accept the pending TCP server, and start listening on it. */
  134. ASSERT(pending == UV_TCP);
  135. r = uv_tcp_init(uv_default_loop(), &tcp_server);
  136. ASSERT(r == 0);
  137. r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
  138. ASSERT(r == 0);
  139. r = uv_listen((uv_stream_t*)&tcp_server, 12, on_connection);
  140. ASSERT(r == 0);
  141. tcp_server_listening = 1;
  142. /* Make sure that the expected data is correctly multiplexed. */
  143. ASSERT(memcmp("hello\n", buf->base, nread) == 0);
  144. outbuf = uv_buf_init("world\n", 6);
  145. r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL);
  146. ASSERT(r == 0);
  147. /* Create a bunch of connections to get both servers to accept. */
  148. make_many_connections();
  149. } else if (memcmp("accepted_connection\n", buf->base, nread) == 0) {
  150. /* Remote server has accepted a connection. Close the channel. */
  151. ASSERT(pending == UV_UNKNOWN_HANDLE);
  152. remote_conn_accepted = 1;
  153. uv_close((uv_handle_t*)&channel, NULL);
  154. }
  155. free(buf->base);
  156. }
  157. void spawn_helper(uv_pipe_t* channel,
  158. uv_process_t* process,
  159. const char* helper) {
  160. uv_process_options_t options;
  161. size_t exepath_size;
  162. char exepath[1024];
  163. char* args[3];
  164. int r;
  165. uv_stdio_container_t stdio[1];
  166. r = uv_pipe_init(uv_default_loop(), channel, 1);
  167. ASSERT(r == 0);
  168. ASSERT(channel->ipc);
  169. exepath_size = sizeof(exepath);
  170. r = uv_exepath(exepath, &exepath_size);
  171. ASSERT(r == 0);
  172. exepath[exepath_size] = '\0';
  173. args[0] = exepath;
  174. args[1] = (char*)helper;
  175. args[2] = NULL;
  176. memset(&options, 0, sizeof(options));
  177. options.file = exepath;
  178. options.args = args;
  179. options.exit_cb = exit_cb;
  180. options.stdio = stdio;
  181. options.stdio[0].flags = UV_CREATE_PIPE |
  182. UV_READABLE_PIPE | UV_WRITABLE_PIPE;
  183. options.stdio[0].data.stream = (uv_stream_t*)channel;
  184. options.stdio_count = 1;
  185. r = uv_spawn(uv_default_loop(), process, &options);
  186. ASSERT(r == 0);
  187. }
  188. static void on_tcp_write(uv_write_t* req, int status) {
  189. ASSERT(status == 0);
  190. ASSERT(req->handle == (uv_stream_t*)&tcp_connection);
  191. tcp_write_cb_called++;
  192. }
  193. static void on_read_alloc(uv_handle_t* handle,
  194. size_t suggested_size,
  195. uv_buf_t* buf) {
  196. buf->base = malloc(suggested_size);
  197. buf->len = suggested_size;
  198. }
  199. static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
  200. ASSERT(nread > 0);
  201. ASSERT(memcmp("hello again\n", buf->base, nread) == 0);
  202. ASSERT(tcp == (uv_stream_t*)&tcp_connection);
  203. free(buf->base);
  204. tcp_read_cb_called++;
  205. uv_close((uv_handle_t*)tcp, NULL);
  206. uv_close((uv_handle_t*)&channel, NULL);
  207. }
  208. static void on_read_connection(uv_pipe_t* pipe,
  209. ssize_t nread,
  210. const uv_buf_t* buf,
  211. uv_handle_type pending) {
  212. int r;
  213. uv_buf_t outbuf;
  214. if (nread == 0) {
  215. /* Everything OK, but nothing read. */
  216. free(buf->base);
  217. return;
  218. }
  219. if (nread < 0) {
  220. if (nread == UV_EOF) {
  221. free(buf->base);
  222. return;
  223. }
  224. printf("error recving on channel: %s\n", uv_strerror(nread));
  225. abort();
  226. }
  227. fprintf(stderr, "got %d bytes\n", (int)nread);
  228. ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE);
  229. read2_cb_called++;
  230. /* Accept the pending TCP connection */
  231. ASSERT(pending == UV_TCP);
  232. r = uv_tcp_init(uv_default_loop(), &tcp_connection);
  233. ASSERT(r == 0);
  234. r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_connection);
  235. ASSERT(r == 0);
  236. /* Make sure that the expected data is correctly multiplexed. */
  237. ASSERT(memcmp("hello\n", buf->base, nread) == 0);
  238. /* Write/read to/from the connection */
  239. outbuf = uv_buf_init("world\n", 6);
  240. r = uv_write(&write_req, (uv_stream_t*)&tcp_connection, &outbuf, 1,
  241. on_tcp_write);
  242. ASSERT(r == 0);
  243. r = uv_read_start((uv_stream_t*)&tcp_connection, on_read_alloc, on_tcp_read);
  244. ASSERT(r == 0);
  245. free(buf->base);
  246. }
  247. static int run_ipc_test(const char* helper, uv_read2_cb read_cb) {
  248. uv_process_t process;
  249. int r;
  250. spawn_helper(&channel, &process, helper);
  251. uv_read2_start((uv_stream_t*)&channel, on_alloc, read_cb);
  252. r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  253. ASSERT(r == 0);
  254. MAKE_VALGRIND_HAPPY();
  255. return 0;
  256. }
  257. TEST_IMPL(ipc_listen_before_write) {
  258. int r = run_ipc_test("ipc_helper_listen_before_write", on_read);
  259. ASSERT(local_conn_accepted == 1);
  260. ASSERT(remote_conn_accepted == 1);
  261. ASSERT(read2_cb_called == 1);
  262. ASSERT(exit_cb_called == 1);
  263. return r;
  264. }
  265. TEST_IMPL(ipc_listen_after_write) {
  266. int r = run_ipc_test("ipc_helper_listen_after_write", on_read);
  267. ASSERT(local_conn_accepted == 1);
  268. ASSERT(remote_conn_accepted == 1);
  269. ASSERT(read2_cb_called == 1);
  270. ASSERT(exit_cb_called == 1);
  271. return r;
  272. }
  273. TEST_IMPL(ipc_tcp_connection) {
  274. int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection);
  275. ASSERT(read2_cb_called == 1);
  276. ASSERT(tcp_write_cb_called == 1);
  277. ASSERT(tcp_read_cb_called == 1);
  278. ASSERT(exit_cb_called == 1);
  279. return r;
  280. }
  281. #ifdef _WIN32
  282. TEST_IMPL(listen_with_simultaneous_accepts) {
  283. uv_tcp_t server;
  284. int r;
  285. struct sockaddr_in addr;
  286. ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
  287. r = uv_tcp_init(uv_default_loop(), &server);
  288. ASSERT(r == 0);
  289. r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
  290. ASSERT(r == 0);
  291. r = uv_tcp_simultaneous_accepts(&server, 1);
  292. ASSERT(r == 0);
  293. r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
  294. ASSERT(r == 0);
  295. ASSERT(server.reqs_pending == 32);
  296. MAKE_VALGRIND_HAPPY();
  297. return 0;
  298. }
  299. TEST_IMPL(listen_no_simultaneous_accepts) {
  300. uv_tcp_t server;
  301. int r;
  302. struct sockaddr_in addr;
  303. ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
  304. r = uv_tcp_init(uv_default_loop(), &server);
  305. ASSERT(r == 0);
  306. r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
  307. ASSERT(r == 0);
  308. r = uv_tcp_simultaneous_accepts(&server, 0);
  309. ASSERT(r == 0);
  310. r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
  311. ASSERT(r == 0);
  312. ASSERT(server.reqs_pending == 1);
  313. MAKE_VALGRIND_HAPPY();
  314. return 0;
  315. }
  316. #endif
  317. /* Everything here runs in a child process. */
  318. static tcp_conn conn;
  319. static void close_cb(uv_handle_t* handle) {
  320. close_cb_called++;
  321. }
  322. static void conn_notify_write_cb(uv_write_t* req, int status) {
  323. uv_close((uv_handle_t*)&tcp_server, close_cb);
  324. uv_close((uv_handle_t*)&channel, close_cb);
  325. }
  326. static void tcp_connection_write_cb(uv_write_t* req, int status) {
  327. ASSERT((uv_handle_t*)&conn.conn == (uv_handle_t*)req->handle);
  328. uv_close((uv_handle_t*)req->handle, close_cb);
  329. uv_close((uv_handle_t*)&channel, close_cb);
  330. uv_close((uv_handle_t*)&tcp_server, close_cb);
  331. tcp_conn_write_cb_called++;
  332. }
  333. static void on_tcp_child_process_read(uv_stream_t* tcp,
  334. ssize_t nread,
  335. const uv_buf_t* buf) {
  336. uv_buf_t outbuf;
  337. int r;
  338. if (nread < 0) {
  339. if (nread == UV_EOF) {
  340. free(buf->base);
  341. return;
  342. }
  343. printf("error recving on tcp connection: %s\n", uv_strerror(nread));
  344. abort();
  345. }
  346. ASSERT(nread > 0);
  347. ASSERT(memcmp("world\n", buf->base, nread) == 0);
  348. on_pipe_read_called++;
  349. free(buf->base);
  350. /* Write to the socket */
  351. outbuf = uv_buf_init("hello again\n", 12);
  352. r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb);
  353. ASSERT(r == 0);
  354. tcp_conn_read_cb_called++;
  355. }
  356. static void connect_child_process_cb(uv_connect_t* req, int status) {
  357. int r;
  358. ASSERT(status == 0);
  359. r = uv_read_start(req->handle, on_read_alloc, on_tcp_child_process_read);
  360. ASSERT(r == 0);
  361. }
  362. static void ipc_on_connection(uv_stream_t* server, int status) {
  363. int r;
  364. uv_buf_t buf;
  365. if (!connection_accepted) {
  366. /*
  367. * Accept the connection and close it. Also let the other
  368. * side know.
  369. */
  370. ASSERT(status == 0);
  371. ASSERT((uv_stream_t*)&tcp_server == server);
  372. r = uv_tcp_init(server->loop, &conn.conn);
  373. ASSERT(r == 0);
  374. r = uv_accept(server, (uv_stream_t*)&conn.conn);
  375. ASSERT(r == 0);
  376. uv_close((uv_handle_t*)&conn.conn, close_cb);
  377. buf = uv_buf_init("accepted_connection\n", 20);
  378. r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
  379. NULL, conn_notify_write_cb);
  380. ASSERT(r == 0);
  381. connection_accepted = 1;
  382. }
  383. }
  384. static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) {
  385. int r;
  386. uv_buf_t buf;
  387. uv_tcp_t* conn;
  388. ASSERT(status == 0);
  389. ASSERT((uv_stream_t*)&tcp_server == server);
  390. conn = malloc(sizeof(*conn));
  391. ASSERT(conn);
  392. r = uv_tcp_init(server->loop, conn);
  393. ASSERT(r == 0);
  394. r = uv_accept(server, (uv_stream_t*)conn);
  395. ASSERT(r == 0);
  396. /* Send the accepted connection to the other process */
  397. buf = uv_buf_init("hello\n", 6);
  398. r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
  399. (uv_stream_t*)conn, NULL);
  400. ASSERT(r == 0);
  401. r = uv_read_start((uv_stream_t*) conn,
  402. on_read_alloc,
  403. on_tcp_child_process_read);
  404. ASSERT(r == 0);
  405. uv_close((uv_handle_t*)conn, close_cb);
  406. }
  407. int ipc_helper(int listen_after_write) {
  408. /*
  409. * This is launched from test-ipc.c. stdin is a duplex channel that we
  410. * over which a handle will be transmitted.
  411. */
  412. struct sockaddr_in addr;
  413. uv_write_t write_req;
  414. int r;
  415. uv_buf_t buf;
  416. ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
  417. r = uv_pipe_init(uv_default_loop(), &channel, 1);
  418. ASSERT(r == 0);
  419. uv_pipe_open(&channel, 0);
  420. ASSERT(1 == uv_is_readable((uv_stream_t*) &channel));
  421. ASSERT(1 == uv_is_writable((uv_stream_t*) &channel));
  422. ASSERT(0 == uv_is_closing((uv_handle_t*) &channel));
  423. r = uv_tcp_init(uv_default_loop(), &tcp_server);
  424. ASSERT(r == 0);
  425. r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
  426. ASSERT(r == 0);
  427. if (!listen_after_write) {
  428. r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
  429. ASSERT(r == 0);
  430. }
  431. buf = uv_buf_init("hello\n", 6);
  432. r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
  433. (uv_stream_t*)&tcp_server, NULL);
  434. ASSERT(r == 0);
  435. if (listen_after_write) {
  436. r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
  437. ASSERT(r == 0);
  438. }
  439. r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  440. ASSERT(r == 0);
  441. ASSERT(connection_accepted == 1);
  442. ASSERT(close_cb_called == 3);
  443. MAKE_VALGRIND_HAPPY();
  444. return 0;
  445. }
  446. int ipc_helper_tcp_connection(void) {
  447. /*
  448. * This is launched from test-ipc.c. stdin is a duplex channel that we
  449. * over which a handle will be transmitted.
  450. */
  451. int r;
  452. struct sockaddr_in addr;
  453. r = uv_pipe_init(uv_default_loop(), &channel, 1);
  454. ASSERT(r == 0);
  455. uv_pipe_open(&channel, 0);
  456. ASSERT(1 == uv_is_readable((uv_stream_t*) &channel));
  457. ASSERT(1 == uv_is_writable((uv_stream_t*) &channel));
  458. ASSERT(0 == uv_is_closing((uv_handle_t*) &channel));
  459. r = uv_tcp_init(uv_default_loop(), &tcp_server);
  460. ASSERT(r == 0);
  461. ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
  462. r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
  463. ASSERT(r == 0);
  464. r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection_tcp_conn);
  465. ASSERT(r == 0);
  466. /* Make a connection to the server */
  467. r = uv_tcp_init(uv_default_loop(), &conn.conn);
  468. ASSERT(r == 0);
  469. ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
  470. r = uv_tcp_connect(&conn.conn_req,
  471. (uv_tcp_t*) &conn.conn,
  472. (const struct sockaddr*) &addr,
  473. connect_child_process_cb);
  474. ASSERT(r == 0);
  475. r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  476. ASSERT(r == 0);
  477. ASSERT(tcp_conn_read_cb_called == 1);
  478. ASSERT(tcp_conn_write_cb_called == 1);
  479. ASSERT(close_cb_called == 4);
  480. MAKE_VALGRIND_HAPPY();
  481. return 0;
  482. }