benchmark-multi-accept.c

/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "task.h"
#include "uv.h"

#define IPC_PIPE_NAME TEST_PIPENAME
#define NUM_CONNECTS (250 * 1000)

union stream_handle {
  uv_pipe_t pipe;
  uv_tcp_t tcp;
};

/* Use as (uv_stream_t *) &handle_storage -- it's kind of clunky but it
 * avoids aliasing warnings.
 */
typedef unsigned char handle_storage_t[sizeof(union stream_handle)];

/* Used for passing around the listen handle, not part of the benchmark proper.
 * We have an overabundance of server types here. It works like this:
 *
 *  1. The main thread starts an IPC pipe server.
 *  2. The worker threads connect to the IPC server and obtain a listen handle.
 *  3. The worker threads start accepting requests on the listen handle.
 *  4. The main thread starts connecting repeatedly.
 *
 * Step #4 should perhaps be farmed out over several threads.
 */
struct ipc_server_ctx {
  handle_storage_t server_handle;
  unsigned int num_connects;
  uv_pipe_t ipc_pipe;
};

struct ipc_peer_ctx {
  handle_storage_t peer_handle;
  uv_write_t write_req;
};

struct ipc_client_ctx {
  uv_connect_t connect_req;
  uv_stream_t* server_handle;
  uv_pipe_t ipc_pipe;
  char scratch[16];
};

/* Used in the actual benchmark. */
struct server_ctx {
  handle_storage_t server_handle;
  unsigned int num_connects;
  uv_async_t async_handle;
  uv_thread_t thread_id;
  uv_sem_t semaphore;
};

struct client_ctx {
  handle_storage_t client_handle;
  unsigned int num_connects;
  uv_connect_t connect_req;
  uv_idle_t idle_handle;
};

static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status);
static void ipc_write_cb(uv_write_t* req, int status);
static void ipc_close_cb(uv_handle_t* handle);
static void ipc_connect_cb(uv_connect_t* req, int status);
static void ipc_read2_cb(uv_pipe_t* ipc_pipe,
                         ssize_t nread,
                         uv_buf_t buf,
                         uv_handle_type type);
static uv_buf_t ipc_alloc_cb(uv_handle_t* handle, size_t suggested_size);
static void sv_async_cb(uv_async_t* handle, int status);
static void sv_connection_cb(uv_stream_t* server_handle, int status);
static void sv_read_cb(uv_stream_t* handle, ssize_t nread, uv_buf_t buf);
static uv_buf_t sv_alloc_cb(uv_handle_t* handle, size_t suggested_size);
static void cl_connect_cb(uv_connect_t* req, int status);
static void cl_idle_cb(uv_idle_t* handle, int status);
static void cl_close_cb(uv_handle_t* handle);

static struct sockaddr_in listen_addr;
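
/* Main thread: a worker has connected to the IPC pipe. Accept it and send
 * back the listen handle, piggy-backed on a "PING" write via uv_write2().
 * Once every worker has been served, stop listening on the IPC pipe.
 */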
static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status) {
  struct ipc_server_ctx* sc;
  struct ipc_peer_ctx* pc;
  uv_loop_t* loop;
  uv_buf_t buf;

  loop = ipc_pipe->loop;
  buf = uv_buf_init("PING", 4);
  sc = container_of(ipc_pipe, struct ipc_server_ctx, ipc_pipe);
  pc = calloc(1, sizeof(*pc));
  ASSERT(pc != NULL);

  if (ipc_pipe->type == UV_TCP)
    ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &pc->peer_handle));
  else if (ipc_pipe->type == UV_NAMED_PIPE)
    ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) &pc->peer_handle, 1));
  else
    ASSERT(0);

  ASSERT(0 == uv_accept(ipc_pipe, (uv_stream_t*) &pc->peer_handle));
  ASSERT(0 == uv_write2(&pc->write_req,
                        (uv_stream_t*) &pc->peer_handle,
                        &buf,
                        1,
                        (uv_stream_t*) &sc->server_handle,
                        ipc_write_cb));

  if (--sc->num_connects == 0)
    uv_close((uv_handle_t*) ipc_pipe, NULL);
}
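
/* The handle-bearing write has been flushed; the worker now owns a duplicate
 * of the listen handle, so this end of the IPC connection can be closed.
 */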
static void ipc_write_cb(uv_write_t* req, int status) {
  struct ipc_peer_ctx* ctx;
  ctx = container_of(req, struct ipc_peer_ctx, write_req);
  uv_close((uv_handle_t*) &ctx->peer_handle, ipc_close_cb);
}

static void ipc_close_cb(uv_handle_t* handle) {
  struct ipc_peer_ctx* ctx;
  ctx = container_of(handle, struct ipc_peer_ctx, peer_handle);
  free(ctx);
}

static void ipc_connect_cb(uv_connect_t* req, int status) {
  struct ipc_client_ctx* ctx;
  ctx = container_of(req, struct ipc_client_ctx, connect_req);
  ASSERT(0 == status);
  ASSERT(0 == uv_read2_start((uv_stream_t*) &ctx->ipc_pipe,
                             ipc_alloc_cb,
                             ipc_read2_cb));
}

static uv_buf_t ipc_alloc_cb(uv_handle_t* handle, size_t suggested_size) {
  struct ipc_client_ctx* ctx;
  ctx = container_of(handle, struct ipc_client_ctx, ipc_pipe);
  return uv_buf_init(ctx->scratch, sizeof(ctx->scratch));
}
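
/* Worker thread: the pending listen handle has arrived over the IPC pipe.
 * The read2 callback reports the handle's type so we can initialize a handle
 * of the right kind before accepting it; after that the IPC pipe itself is
 * no longer needed.
 */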
static void ipc_read2_cb(uv_pipe_t* ipc_pipe,
                         ssize_t nread,
                         uv_buf_t buf,
                         uv_handle_type type) {
  struct ipc_client_ctx* ctx;
  uv_loop_t* loop;

  ctx = container_of(ipc_pipe, struct ipc_client_ctx, ipc_pipe);
  loop = ipc_pipe->loop;

  if (type == UV_TCP)
    ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) ctx->server_handle));
  else if (type == UV_NAMED_PIPE)
    ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) ctx->server_handle, 0));
  else
    ASSERT(0);

  ASSERT(0 == uv_accept((uv_stream_t*) &ctx->ipc_pipe, ctx->server_handle));
  uv_close((uv_handle_t*) &ctx->ipc_pipe, NULL);
}

/* Set up an IPC pipe server that hands out listen sockets to the worker
 * threads. It's kind of cumbersome for such a simple operation, maybe we
 * should revive uv_import() and uv_export().
 */
static void send_listen_handles(uv_handle_type type,
                                unsigned int num_servers,
                                struct server_ctx* servers) {
  struct ipc_server_ctx ctx;
  uv_loop_t* loop;
  unsigned int i;

  loop = uv_default_loop();
  ctx.num_connects = num_servers;

  if (type == UV_TCP) {
    ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &ctx.server_handle));
    ASSERT(0 == uv_tcp_bind((uv_tcp_t*) &ctx.server_handle, listen_addr));
  }
  else
    ASSERT(0);

  ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 1));
  ASSERT(0 == uv_pipe_bind(&ctx.ipc_pipe, IPC_PIPE_NAME));
  ASSERT(0 == uv_listen((uv_stream_t*) &ctx.ipc_pipe, 128, ipc_connection_cb));

  for (i = 0; i < num_servers; i++)
    uv_sem_post(&servers[i].semaphore);

  ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
  uv_close((uv_handle_t*) &ctx.server_handle, NULL);
  ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));

  for (i = 0; i < num_servers; i++)
    uv_sem_wait(&servers[i].semaphore);
}
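
/* Worker thread: connect to the main thread's IPC server and spin the loop
 * until the listen handle has been received into server_handle.
 */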
static void get_listen_handle(uv_loop_t* loop, uv_stream_t* server_handle) {
  struct ipc_client_ctx ctx;

  ctx.server_handle = server_handle;
  ctx.server_handle->data = "server handle";

  ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 1));
  uv_pipe_connect(&ctx.connect_req,
                  &ctx.ipc_pipe,
                  IPC_PIPE_NAME,
                  ipc_connect_cb);
  ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
}
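
/* Worker thread entry point: create a private loop, fetch the shared listen
 * handle, then accept connections on it until the main thread signals the
 * async handle to shut the loop down.
 */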
static void server_cb(void *arg) {
  struct server_ctx *ctx;
  uv_loop_t* loop;

  ctx = arg;
  loop = uv_loop_new();
  ASSERT(loop != NULL);

  ASSERT(0 == uv_async_init(loop, &ctx->async_handle, sv_async_cb));
  uv_unref((uv_handle_t*) &ctx->async_handle);

  /* Wait until the main thread is ready. */
  uv_sem_wait(&ctx->semaphore);
  get_listen_handle(loop, (uv_stream_t*) &ctx->server_handle);
  uv_sem_post(&ctx->semaphore);

  /* Now start the actual benchmark. */
  ASSERT(0 == uv_listen((uv_stream_t*) &ctx->server_handle,
                        128,
                        sv_connection_cb));
  ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));

  uv_loop_delete(loop);
}
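
/* Signaled by the main thread when the benchmark is over: close the worker's
 * remaining handles so its loop unwinds and the thread can be joined.
 */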
static void sv_async_cb(uv_async_t* handle, int status) {
  struct server_ctx* ctx;
  ctx = container_of(handle, struct server_ctx, async_handle);
  uv_close((uv_handle_t*) &ctx->server_handle, NULL);
  uv_close((uv_handle_t*) &ctx->async_handle, NULL);
}
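
/* Accept an incoming connection on this worker's copy of the listen handle
 * and tally it; the per-thread counts feed the statistics printed at the end.
 */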
static void sv_connection_cb(uv_stream_t* server_handle, int status) {
  handle_storage_t* storage;
  struct server_ctx* ctx;

  ctx = container_of(server_handle, struct server_ctx, server_handle);
  ASSERT(status == 0);

  storage = malloc(sizeof(*storage));
  ASSERT(storage != NULL);

  if (server_handle->type == UV_TCP)
    ASSERT(0 == uv_tcp_init(server_handle->loop, (uv_tcp_t*) storage));
  else if (server_handle->type == UV_NAMED_PIPE)
    ASSERT(0 == uv_pipe_init(server_handle->loop, (uv_pipe_t*) storage, 0));
  else
    ASSERT(0);

  ASSERT(0 == uv_accept(server_handle, (uv_stream_t*) storage));
  ASSERT(0 == uv_read_start((uv_stream_t*) storage, sv_alloc_cb, sv_read_cb));
  ctx->num_connects++;
}
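
/* Connections carry no payload: clients disconnect right after connecting,
 * so a single static scratch buffer suffices and the read callback only ever
 * observes EOF.
 */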
static uv_buf_t sv_alloc_cb(uv_handle_t* handle, size_t suggested_size) {
  static char buf[32];
  return uv_buf_init(buf, sizeof(buf));
}

static void sv_read_cb(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
  ASSERT(nread == -1);
  ASSERT(uv_last_error(handle->loop).code == UV_EOF);
  uv_close((uv_handle_t*) handle, (uv_close_cb) free);
}
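
/* Client-side connection churn: each connect schedules an idle callback that
 * closes the connection on the next loop iteration, and the close callback
 * reconnects until the client's quota of connections is exhausted.
 */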
static void cl_connect_cb(uv_connect_t* req, int status) {
  struct client_ctx* ctx = container_of(req, struct client_ctx, connect_req);
  uv_idle_start(&ctx->idle_handle, cl_idle_cb);
  ASSERT(0 == status);
}

static void cl_idle_cb(uv_idle_t* handle, int status) {
  struct client_ctx* ctx = container_of(handle, struct client_ctx, idle_handle);
  uv_close((uv_handle_t*) &ctx->client_handle, cl_close_cb);
  uv_idle_stop(&ctx->idle_handle);
}

static void cl_close_cb(uv_handle_t* handle) {
  struct client_ctx* ctx;

  ctx = container_of(handle, struct client_ctx, client_handle);

  if (--ctx->num_connects == 0) {
    uv_close((uv_handle_t*) &ctx->idle_handle, NULL);
    return;
  }

  ASSERT(0 == uv_tcp_init(handle->loop, (uv_tcp_t*) &ctx->client_handle));
  ASSERT(0 == uv_tcp_connect(&ctx->connect_req,
                             (uv_tcp_t*) &ctx->client_handle,
                             listen_addr,
                             cl_connect_cb));
}
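
/* Benchmark driver: spawn num_servers accepting threads, hand each one the
 * shared listen handle, then drive NUM_CONNECTS connections at it from
 * num_clients client contexts and report the aggregate accept rate.
 */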
static int test_tcp(unsigned int num_servers, unsigned int num_clients) {
  struct server_ctx* servers;
  struct client_ctx* clients;
  uv_loop_t* loop;
  uv_tcp_t* handle;
  unsigned int i;
  double time;

  listen_addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
  loop = uv_default_loop();

  servers = calloc(num_servers, sizeof(servers[0]));
  clients = calloc(num_clients, sizeof(clients[0]));
  ASSERT(servers != NULL);
  ASSERT(clients != NULL);

  /* We're making the assumption here that from the perspective of the
   * OS scheduler, threads are functionally equivalent to and interchangeable
   * with full-blown processes.
   */
  for (i = 0; i < num_servers; i++) {
    struct server_ctx* ctx = servers + i;
    ASSERT(0 == uv_sem_init(&ctx->semaphore, 0));
    ASSERT(0 == uv_thread_create(&ctx->thread_id, server_cb, ctx));
  }

  send_listen_handles(UV_TCP, num_servers, servers);

  for (i = 0; i < num_clients; i++) {
    struct client_ctx* ctx = clients + i;
    ctx->num_connects = NUM_CONNECTS / num_clients;
    handle = (uv_tcp_t*) &ctx->client_handle;
    handle->data = "client handle";
    ASSERT(0 == uv_tcp_init(loop, handle));
    ASSERT(0 == uv_tcp_connect(&ctx->connect_req,
                               handle,
                               listen_addr,
                               cl_connect_cb));
    ASSERT(0 == uv_idle_init(loop, &ctx->idle_handle));
  }

  {
    uint64_t t = uv_hrtime();
    ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
    t = uv_hrtime() - t;
    time = t / 1e9;
  }

  for (i = 0; i < num_servers; i++) {
    struct server_ctx* ctx = servers + i;
    uv_async_send(&ctx->async_handle);
    ASSERT(0 == uv_thread_join(&ctx->thread_id));
    uv_sem_destroy(&ctx->semaphore);
  }

  printf("accept%u: %.0f accepts/sec (%u total)\n",
         num_servers,
         NUM_CONNECTS / time,
         NUM_CONNECTS);

  for (i = 0; i < num_servers; i++) {
    struct server_ctx* ctx = servers + i;
    printf("  thread #%u: %.0f accepts/sec (%u total, %.1f%%)\n",
           i,
           ctx->num_connects / time,
           ctx->num_connects,
           ctx->num_connects * 100.0 / NUM_CONNECTS);
  }

  free(clients);
  free(servers);

  MAKE_VALGRIND_HAPPY();
  return 0;
}

BENCHMARK_IMPL(tcp_multi_accept2) {
  return test_tcp(2, 40);
}

BENCHMARK_IMPL(tcp_multi_accept4) {
  return test_tcp(4, 40);
}

BENCHMARK_IMPL(tcp_multi_accept8) {
  return test_tcp(8, 40);
}