/* benchmark-multi-accept.c */

/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "task.h"
#include "uv.h"

#define IPC_PIPE_NAME TEST_PIPENAME
#define NUM_CONNECTS (250 * 1000)

union stream_handle {
  uv_pipe_t pipe;
  uv_tcp_t tcp;
};

/* Use as (uv_stream_t*) &handle_storage -- it's kind of clunky but it
 * avoids aliasing warnings.
 */
typedef unsigned char handle_storage_t[sizeof(union stream_handle)];

/* Used for passing around the listen handle, not part of the benchmark proper.
 * We have an overabundance of server types here. It works like this:
 *
 *  1. The main thread starts an IPC pipe server.
 *  2. The worker threads connect to the IPC server and obtain a listen handle.
 *  3. The worker threads start accepting requests on the listen handle.
 *  4. The main thread starts connecting repeatedly.
 *
 * Step #4 should perhaps be farmed out over several threads.
 */
struct ipc_server_ctx {
  handle_storage_t server_handle;
  unsigned int num_connects;
  uv_pipe_t ipc_pipe;
};

struct ipc_peer_ctx {
  handle_storage_t peer_handle;
  uv_write_t write_req;
};

struct ipc_client_ctx {
  uv_connect_t connect_req;
  uv_stream_t* server_handle;
  uv_pipe_t ipc_pipe;
  char scratch[16];
};

/* Used in the actual benchmark. */
struct server_ctx {
  handle_storage_t server_handle;
  unsigned int num_connects;
  uv_async_t async_handle;
  uv_thread_t thread_id;
  uv_sem_t semaphore;
};

struct client_ctx {
  handle_storage_t client_handle;
  unsigned int num_connects;
  uv_connect_t connect_req;
  uv_idle_t idle_handle;
};

static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status);
static void ipc_write_cb(uv_write_t* req, int status);
static void ipc_close_cb(uv_handle_t* handle);
static void ipc_connect_cb(uv_connect_t* req, int status);
static void ipc_read2_cb(uv_pipe_t* ipc_pipe,
                         ssize_t nread,
                         const uv_buf_t* buf,
                         uv_handle_type type);
static void ipc_alloc_cb(uv_handle_t* handle,
                         size_t suggested_size,
                         uv_buf_t* buf);

static void sv_async_cb(uv_async_t* handle, int status);
static void sv_connection_cb(uv_stream_t* server_handle, int status);
static void sv_read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf);
static void sv_alloc_cb(uv_handle_t* handle,
                        size_t suggested_size,
                        uv_buf_t* buf);

static void cl_connect_cb(uv_connect_t* req, int status);
static void cl_idle_cb(uv_idle_t* handle, int status);
static void cl_close_cb(uv_handle_t* handle);

static struct sockaddr_in listen_addr;
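
/* A worker thread has connected to the IPC server. Accept the connection,
 * then send it the shared listen handle with a one-shot uv_write2() (the
 * "PING" payload is just filler; the handle rides along as ancillary data).
 * Once every worker has been served, close the IPC server so uv_run() in
 * send_listen_handles() can return.
 */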
static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status) {
  struct ipc_server_ctx* sc;
  struct ipc_peer_ctx* pc;
  uv_loop_t* loop;
  uv_buf_t buf;

  loop = ipc_pipe->loop;
  buf = uv_buf_init("PING", 4);
  sc = container_of(ipc_pipe, struct ipc_server_ctx, ipc_pipe);
  pc = calloc(1, sizeof(*pc));
  ASSERT(pc != NULL);

  if (ipc_pipe->type == UV_TCP)
    ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &pc->peer_handle));
  else if (ipc_pipe->type == UV_NAMED_PIPE)
    ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) &pc->peer_handle, 1));
  else
    ASSERT(0);

  ASSERT(0 == uv_accept(ipc_pipe, (uv_stream_t*) &pc->peer_handle));
  ASSERT(0 == uv_write2(&pc->write_req,
                        (uv_stream_t*) &pc->peer_handle,
                        &buf,
                        1,
                        (uv_stream_t*) &sc->server_handle,
                        ipc_write_cb));

  if (--sc->num_connects == 0)
    uv_close((uv_handle_t*) ipc_pipe, NULL);
}
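
/* The listen handle has been sent to a worker; close the per-peer pipe and
 * free its context in the close callback.
 */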
static void ipc_write_cb(uv_write_t* req, int status) {
  struct ipc_peer_ctx* ctx;
  ctx = container_of(req, struct ipc_peer_ctx, write_req);
  uv_close((uv_handle_t*) &ctx->peer_handle, ipc_close_cb);
}

static void ipc_close_cb(uv_handle_t* handle) {
  struct ipc_peer_ctx* ctx;
  ctx = container_of(handle, struct ipc_peer_ctx, peer_handle);
  free(ctx);
}
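
/* Worker side: we're connected to the IPC server. Start reading so the
 * listen handle that accompanies the payload can be received.
 */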
static void ipc_connect_cb(uv_connect_t* req, int status) {
  struct ipc_client_ctx* ctx;
  ctx = container_of(req, struct ipc_client_ctx, connect_req);
  ASSERT(0 == status);
  ASSERT(0 == uv_read2_start((uv_stream_t*) &ctx->ipc_pipe,
                             ipc_alloc_cb,
                             ipc_read2_cb));
}

static void ipc_alloc_cb(uv_handle_t* handle,
                         size_t suggested_size,
                         uv_buf_t* buf) {
  struct ipc_client_ctx* ctx;
  ctx = container_of(handle, struct ipc_client_ctx, ipc_pipe);
  buf->base = ctx->scratch;
  buf->len = sizeof(ctx->scratch);
}
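
/* The IPC payload arrived. |type| describes the pending handle; initialize a
 * handle of the matching type, then uv_accept() extracts the pending handle
 * from the pipe. The IPC pipe itself is no longer needed after that.
 */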
static void ipc_read2_cb(uv_pipe_t* ipc_pipe,
                         ssize_t nread,
                         const uv_buf_t* buf,
                         uv_handle_type type) {
  struct ipc_client_ctx* ctx;
  uv_loop_t* loop;

  ctx = container_of(ipc_pipe, struct ipc_client_ctx, ipc_pipe);
  loop = ipc_pipe->loop;

  if (type == UV_TCP)
    ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) ctx->server_handle));
  else if (type == UV_NAMED_PIPE)
    ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) ctx->server_handle, 0));
  else
    ASSERT(0);

  ASSERT(0 == uv_accept((uv_stream_t*) &ctx->ipc_pipe, ctx->server_handle));
  uv_close((uv_handle_t*) &ctx->ipc_pipe, NULL);
}

/* Set up an IPC pipe server that hands out listen sockets to the worker
 * threads. It's kind of cumbersome for such a simple operation; maybe we
 * should revive uv_import() and uv_export().
 */
static void send_listen_handles(uv_handle_type type,
                                unsigned int num_servers,
                                struct server_ctx* servers) {
  struct ipc_server_ctx ctx;
  uv_loop_t* loop;
  unsigned int i;

  loop = uv_default_loop();
  ctx.num_connects = num_servers;

  if (type == UV_TCP) {
    ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &ctx.server_handle));
    ASSERT(0 == uv_tcp_bind((uv_tcp_t*) &ctx.server_handle,
                            (const struct sockaddr*) &listen_addr,
                            0));
  }
  else
    ASSERT(0);

  ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 1));
  ASSERT(0 == uv_pipe_bind(&ctx.ipc_pipe, IPC_PIPE_NAME));
  ASSERT(0 == uv_listen((uv_stream_t*) &ctx.ipc_pipe, 128, ipc_connection_cb));

  for (i = 0; i < num_servers; i++)
    uv_sem_post(&servers[i].semaphore);

  ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
  uv_close((uv_handle_t*) &ctx.server_handle, NULL);
  ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));

  for (i = 0; i < num_servers; i++)
    uv_sem_wait(&servers[i].semaphore);
}
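
/* Worker side of the handshake: connect to the IPC server and run the loop
 * until ipc_read2_cb() has accepted the listen handle into |server_handle|
 * and closed the IPC pipe, at which point uv_run() returns.
 */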
static void get_listen_handle(uv_loop_t* loop, uv_stream_t* server_handle) {
  struct ipc_client_ctx ctx;

  ctx.server_handle = server_handle;
  ctx.server_handle->data = "server handle";

  ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 1));
  uv_pipe_connect(&ctx.connect_req,
                  &ctx.ipc_pipe,
                  IPC_PIPE_NAME,
                  ipc_connect_cb);
  ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
}
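
/* Worker thread entry point: obtain the shared listen handle, then accept
 * connections on it until the main thread signals shutdown via async_handle.
 */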
static void server_cb(void* arg) {
  struct server_ctx* ctx;
  uv_loop_t* loop;

  ctx = arg;
  loop = uv_loop_new();
  ASSERT(loop != NULL);

  ASSERT(0 == uv_async_init(loop, &ctx->async_handle, sv_async_cb));
  uv_unref((uv_handle_t*) &ctx->async_handle);

  /* Wait until the main thread is ready. */
  uv_sem_wait(&ctx->semaphore);
  get_listen_handle(loop, (uv_stream_t*) &ctx->server_handle);
  uv_sem_post(&ctx->semaphore);

  /* Now start the actual benchmark. */
  ASSERT(0 == uv_listen((uv_stream_t*) &ctx->server_handle,
                        128,
                        sv_connection_cb));
  ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));

  uv_loop_delete(loop);
}
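
/* Signaled from the main thread when the benchmark is done. Closing the last
 * open handles lets the worker's uv_run() return.
 */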
static void sv_async_cb(uv_async_t* handle, int status) {
  struct server_ctx* ctx;
  ctx = container_of(handle, struct server_ctx, async_handle);
  uv_close((uv_handle_t*) &ctx->server_handle, NULL);
  uv_close((uv_handle_t*) &ctx->async_handle, NULL);
}
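
/* A benchmark connection came in on this worker's copy of the listen handle.
 * Accept it, start reading (we only ever expect EOF), and count the accept.
 */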
static void sv_connection_cb(uv_stream_t* server_handle, int status) {
  handle_storage_t* storage;
  struct server_ctx* ctx;

  ctx = container_of(server_handle, struct server_ctx, server_handle);
  ASSERT(status == 0);

  storage = malloc(sizeof(*storage));
  ASSERT(storage != NULL);

  if (server_handle->type == UV_TCP)
    ASSERT(0 == uv_tcp_init(server_handle->loop, (uv_tcp_t*) storage));
  else if (server_handle->type == UV_NAMED_PIPE)
    ASSERT(0 == uv_pipe_init(server_handle->loop, (uv_pipe_t*) storage, 0));
  else
    ASSERT(0);

  ASSERT(0 == uv_accept(server_handle, (uv_stream_t*) storage));
  ASSERT(0 == uv_read_start((uv_stream_t*) storage, sv_alloc_cb, sv_read_cb));
  ctx->num_connects++;
}

static void sv_alloc_cb(uv_handle_t* handle,
                        size_t suggested_size,
                        uv_buf_t* buf) {
  static char slab[32];
  buf->base = slab;
  buf->len = sizeof(slab);
}
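
/* The client hung up; EOF is the only thing we ever expect to read. The
 * close callback frees the malloc'd handle storage.
 */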
static void sv_read_cb(uv_stream_t* handle,
                       ssize_t nread,
                       const uv_buf_t* buf) {
  ASSERT(nread == UV_EOF);
  uv_close((uv_handle_t*) handle, (uv_close_cb) free);
}
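
/* Client side: each established connection is closed again from an idle
 * callback, which defers the close to a later loop iteration, and
 * cl_close_cb() immediately starts the next connection until this client's
 * quota of connects is exhausted.
 */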
static void cl_connect_cb(uv_connect_t* req, int status) {
  struct client_ctx* ctx = container_of(req, struct client_ctx, connect_req);
  uv_idle_start(&ctx->idle_handle, cl_idle_cb);
  ASSERT(0 == status);
}

static void cl_idle_cb(uv_idle_t* handle, int status) {
  struct client_ctx* ctx = container_of(handle, struct client_ctx, idle_handle);
  uv_close((uv_handle_t*) &ctx->client_handle, cl_close_cb);
  uv_idle_stop(&ctx->idle_handle);
}

static void cl_close_cb(uv_handle_t* handle) {
  struct client_ctx* ctx;

  ctx = container_of(handle, struct client_ctx, client_handle);

  if (--ctx->num_connects == 0) {
    uv_close((uv_handle_t*) &ctx->idle_handle, NULL);
    return;
  }

  ASSERT(0 == uv_tcp_init(handle->loop, (uv_tcp_t*) &ctx->client_handle));
  ASSERT(0 == uv_tcp_connect(&ctx->connect_req,
                             (uv_tcp_t*) &ctx->client_handle,
                             (const struct sockaddr*) &listen_addr,
                             cl_connect_cb));
}
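
/* Benchmark driver: spin up num_servers accept threads, hand each one the
 * listen handle, then drive NUM_CONNECTS connections through num_clients
 * parallel connect loops and report aggregate and per-thread accept rates.
 */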
static int test_tcp(unsigned int num_servers, unsigned int num_clients) {
  struct server_ctx* servers;
  struct client_ctx* clients;
  uv_loop_t* loop;
  uv_tcp_t* handle;
  unsigned int i;
  double time;

  ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &listen_addr));
  loop = uv_default_loop();

  servers = calloc(num_servers, sizeof(servers[0]));
  clients = calloc(num_clients, sizeof(clients[0]));
  ASSERT(servers != NULL);
  ASSERT(clients != NULL);

  /* We're making the assumption here that from the perspective of the
   * OS scheduler, threads are functionally equivalent to and interchangeable
   * with full-blown processes.
   */
  for (i = 0; i < num_servers; i++) {
    struct server_ctx* ctx = servers + i;
    ASSERT(0 == uv_sem_init(&ctx->semaphore, 0));
    ASSERT(0 == uv_thread_create(&ctx->thread_id, server_cb, ctx));
  }

  send_listen_handles(UV_TCP, num_servers, servers);

  for (i = 0; i < num_clients; i++) {
    struct client_ctx* ctx = clients + i;
    ctx->num_connects = NUM_CONNECTS / num_clients;
    handle = (uv_tcp_t*) &ctx->client_handle;
    handle->data = "client handle";
    ASSERT(0 == uv_tcp_init(loop, handle));
    ASSERT(0 == uv_tcp_connect(&ctx->connect_req,
                               handle,
                               (const struct sockaddr*) &listen_addr,
                               cl_connect_cb));
    ASSERT(0 == uv_idle_init(loop, &ctx->idle_handle));
  }

  {
    uint64_t t = uv_hrtime();
    ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
    t = uv_hrtime() - t;
    time = t / 1e9;
  }

  for (i = 0; i < num_servers; i++) {
    struct server_ctx* ctx = servers + i;
    uv_async_send(&ctx->async_handle);
    ASSERT(0 == uv_thread_join(&ctx->thread_id));
    uv_sem_destroy(&ctx->semaphore);
  }

  printf("accept%u: %.0f accepts/sec (%u total)\n",
         num_servers,
         NUM_CONNECTS / time,
         NUM_CONNECTS);

  for (i = 0; i < num_servers; i++) {
    struct server_ctx* ctx = servers + i;
    printf("  thread #%u: %.0f accepts/sec (%u total, %.1f%%)\n",
           i,
           ctx->num_connects / time,
           ctx->num_connects,
           ctx->num_connects * 100.0 / NUM_CONNECTS);
  }

  free(clients);
  free(servers);

  MAKE_VALGRIND_HAPPY();
  return 0;
}

BENCHMARK_IMPL(tcp_multi_accept2) {
  return test_tcp(2, 40);
}

BENCHMARK_IMPL(tcp_multi_accept4) {
  return test_tcp(4, 40);
}

BENCHMARK_IMPL(tcp_multi_accept8) {
  return test_tcp(8, 40);
}