123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445 |
- /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to
- * deal in the Software without restriction, including without limitation the
- * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- * IN THE SOFTWARE.
- */
- #include "task.h"
- #include "uv.h"
/* Name of the pipe used to hand the listen handle to the worker threads. */
#define IPC_PIPE_NAME TEST_PIPENAME

/* Nominal number of connections per benchmark run, split over all clients. */
#define NUM_CONNECTS  (250 * 1000)
- union stream_handle {
- uv_pipe_t pipe;
- uv_tcp_t tcp;
- };
- /* Use as (uv_stream_t *) &handle_storage -- it's kind of clunky but it
- * avoids aliasing warnings.
- */
- typedef unsigned char handle_storage_t[sizeof(union stream_handle)];
- /* Used for passing around the listen handle, not part of the benchmark proper.
- * We have an overabundance of server types here. It works like this:
- *
- * 1. The main thread starts an IPC pipe server.
- * 2. The worker threads connect to the IPC server and obtain a listen handle.
- * 3. The worker threads start accepting requests on the listen handle.
- * 4. The main thread starts connecting repeatedly.
- *
- * Step #4 should perhaps be farmed out over several threads.
- */
- struct ipc_server_ctx {
- handle_storage_t server_handle;
- unsigned int num_connects;
- uv_pipe_t ipc_pipe;
- };
- struct ipc_peer_ctx {
- handle_storage_t peer_handle;
- uv_write_t write_req;
- };
- struct ipc_client_ctx {
- uv_connect_t connect_req;
- uv_stream_t* server_handle;
- uv_pipe_t ipc_pipe;
- char scratch[16];
- };
- /* Used in the actual benchmark. */
- struct server_ctx {
- handle_storage_t server_handle;
- unsigned int num_connects;
- uv_async_t async_handle;
- uv_thread_t thread_id;
- uv_sem_t semaphore;
- };
- struct client_ctx {
- handle_storage_t client_handle;
- unsigned int num_connects;
- uv_connect_t connect_req;
- uv_idle_t idle_handle;
- };
- static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status);
- static void ipc_write_cb(uv_write_t* req, int status);
- static void ipc_close_cb(uv_handle_t* handle);
- static void ipc_connect_cb(uv_connect_t* req, int status);
- static void ipc_read2_cb(uv_pipe_t* ipc_pipe,
- ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type type);
- static void ipc_alloc_cb(uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf);
- static void sv_async_cb(uv_async_t* handle, int status);
- static void sv_connection_cb(uv_stream_t* server_handle, int status);
- static void sv_read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf);
- static void sv_alloc_cb(uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf);
- static void cl_connect_cb(uv_connect_t* req, int status);
- static void cl_idle_cb(uv_idle_t* handle, int status);
- static void cl_close_cb(uv_handle_t* handle);
- static struct sockaddr_in listen_addr;
- static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status) {
- struct ipc_server_ctx* sc;
- struct ipc_peer_ctx* pc;
- uv_loop_t* loop;
- uv_buf_t buf;
- loop = ipc_pipe->loop;
- buf = uv_buf_init("PING", 4);
- sc = container_of(ipc_pipe, struct ipc_server_ctx, ipc_pipe);
- pc = calloc(1, sizeof(*pc));
- ASSERT(pc != NULL);
- if (ipc_pipe->type == UV_TCP)
- ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &pc->peer_handle));
- else if (ipc_pipe->type == UV_NAMED_PIPE)
- ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) &pc->peer_handle, 1));
- else
- ASSERT(0);
- ASSERT(0 == uv_accept(ipc_pipe, (uv_stream_t*) &pc->peer_handle));
- ASSERT(0 == uv_write2(&pc->write_req,
- (uv_stream_t*) &pc->peer_handle,
- &buf,
- 1,
- (uv_stream_t*) &sc->server_handle,
- ipc_write_cb));
- if (--sc->num_connects == 0)
- uv_close((uv_handle_t*) ipc_pipe, NULL);
- }
- static void ipc_write_cb(uv_write_t* req, int status) {
- struct ipc_peer_ctx* ctx;
- ctx = container_of(req, struct ipc_peer_ctx, write_req);
- uv_close((uv_handle_t*) &ctx->peer_handle, ipc_close_cb);
- }
- static void ipc_close_cb(uv_handle_t* handle) {
- struct ipc_peer_ctx* ctx;
- ctx = container_of(handle, struct ipc_peer_ctx, peer_handle);
- free(ctx);
- }
- static void ipc_connect_cb(uv_connect_t* req, int status) {
- struct ipc_client_ctx* ctx;
- ctx = container_of(req, struct ipc_client_ctx, connect_req);
- ASSERT(0 == status);
- ASSERT(0 == uv_read2_start((uv_stream_t*) &ctx->ipc_pipe,
- ipc_alloc_cb,
- ipc_read2_cb));
- }
- static void ipc_alloc_cb(uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf) {
- struct ipc_client_ctx* ctx;
- ctx = container_of(handle, struct ipc_client_ctx, ipc_pipe);
- buf->base = ctx->scratch;
- buf->len = sizeof(ctx->scratch);
- }
- static void ipc_read2_cb(uv_pipe_t* ipc_pipe,
- ssize_t nread,
- const uv_buf_t* buf,
- uv_handle_type type) {
- struct ipc_client_ctx* ctx;
- uv_loop_t* loop;
- ctx = container_of(ipc_pipe, struct ipc_client_ctx, ipc_pipe);
- loop = ipc_pipe->loop;
- if (type == UV_TCP)
- ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) ctx->server_handle));
- else if (type == UV_NAMED_PIPE)
- ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) ctx->server_handle, 0));
- else
- ASSERT(0);
- ASSERT(0 == uv_accept((uv_stream_t*) &ctx->ipc_pipe, ctx->server_handle));
- uv_close((uv_handle_t*) &ctx->ipc_pipe, NULL);
- }
- /* Set up an IPC pipe server that hands out listen sockets to the worker
- * threads. It's kind of cumbersome for such a simple operation, maybe we
- * should revive uv_import() and uv_export().
- */
- static void send_listen_handles(uv_handle_type type,
- unsigned int num_servers,
- struct server_ctx* servers) {
- struct ipc_server_ctx ctx;
- uv_loop_t* loop;
- unsigned int i;
- loop = uv_default_loop();
- ctx.num_connects = num_servers;
- if (type == UV_TCP) {
- ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &ctx.server_handle));
- ASSERT(0 == uv_tcp_bind((uv_tcp_t*) &ctx.server_handle,
- (const struct sockaddr*) &listen_addr,
- 0));
- }
- else
- ASSERT(0);
- ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 1));
- ASSERT(0 == uv_pipe_bind(&ctx.ipc_pipe, IPC_PIPE_NAME));
- ASSERT(0 == uv_listen((uv_stream_t*) &ctx.ipc_pipe, 128, ipc_connection_cb));
- for (i = 0; i < num_servers; i++)
- uv_sem_post(&servers[i].semaphore);
- ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
- uv_close((uv_handle_t*) &ctx.server_handle, NULL);
- ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
- for (i = 0; i < num_servers; i++)
- uv_sem_wait(&servers[i].semaphore);
- }
- static void get_listen_handle(uv_loop_t* loop, uv_stream_t* server_handle) {
- struct ipc_client_ctx ctx;
- ctx.server_handle = server_handle;
- ctx.server_handle->data = "server handle";
- ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 1));
- uv_pipe_connect(&ctx.connect_req,
- &ctx.ipc_pipe,
- IPC_PIPE_NAME,
- ipc_connect_cb);
- ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
- }
- static void server_cb(void *arg) {
- struct server_ctx *ctx;
- uv_loop_t* loop;
- ctx = arg;
- loop = uv_loop_new();
- ASSERT(loop != NULL);
- ASSERT(0 == uv_async_init(loop, &ctx->async_handle, sv_async_cb));
- uv_unref((uv_handle_t*) &ctx->async_handle);
- /* Wait until the main thread is ready. */
- uv_sem_wait(&ctx->semaphore);
- get_listen_handle(loop, (uv_stream_t*) &ctx->server_handle);
- uv_sem_post(&ctx->semaphore);
- /* Now start the actual benchmark. */
- ASSERT(0 == uv_listen((uv_stream_t*) &ctx->server_handle,
- 128,
- sv_connection_cb));
- ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
- uv_loop_delete(loop);
- }
- static void sv_async_cb(uv_async_t* handle, int status) {
- struct server_ctx* ctx;
- ctx = container_of(handle, struct server_ctx, async_handle);
- uv_close((uv_handle_t*) &ctx->server_handle, NULL);
- uv_close((uv_handle_t*) &ctx->async_handle, NULL);
- }
- static void sv_connection_cb(uv_stream_t* server_handle, int status) {
- handle_storage_t* storage;
- struct server_ctx* ctx;
- ctx = container_of(server_handle, struct server_ctx, server_handle);
- ASSERT(status == 0);
- storage = malloc(sizeof(*storage));
- ASSERT(storage != NULL);
- if (server_handle->type == UV_TCP)
- ASSERT(0 == uv_tcp_init(server_handle->loop, (uv_tcp_t*) storage));
- else if (server_handle->type == UV_NAMED_PIPE)
- ASSERT(0 == uv_pipe_init(server_handle->loop, (uv_pipe_t*) storage, 0));
- else
- ASSERT(0);
- ASSERT(0 == uv_accept(server_handle, (uv_stream_t*) storage));
- ASSERT(0 == uv_read_start((uv_stream_t*) storage, sv_alloc_cb, sv_read_cb));
- ctx->num_connects++;
- }
- static void sv_alloc_cb(uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf) {
- static char slab[32];
- buf->base = slab;
- buf->len = sizeof(slab);
- }
- static void sv_read_cb(uv_stream_t* handle,
- ssize_t nread,
- const uv_buf_t* buf) {
- ASSERT(nread == UV_EOF);
- uv_close((uv_handle_t*) handle, (uv_close_cb) free);
- }
- static void cl_connect_cb(uv_connect_t* req, int status) {
- struct client_ctx* ctx = container_of(req, struct client_ctx, connect_req);
- uv_idle_start(&ctx->idle_handle, cl_idle_cb);
- ASSERT(0 == status);
- }
- static void cl_idle_cb(uv_idle_t* handle, int status) {
- struct client_ctx* ctx = container_of(handle, struct client_ctx, idle_handle);
- uv_close((uv_handle_t*) &ctx->client_handle, cl_close_cb);
- uv_idle_stop(&ctx->idle_handle);
- }
- static void cl_close_cb(uv_handle_t* handle) {
- struct client_ctx* ctx;
- ctx = container_of(handle, struct client_ctx, client_handle);
- if (--ctx->num_connects == 0) {
- uv_close((uv_handle_t*) &ctx->idle_handle, NULL);
- return;
- }
- ASSERT(0 == uv_tcp_init(handle->loop, (uv_tcp_t*) &ctx->client_handle));
- ASSERT(0 == uv_tcp_connect(&ctx->connect_req,
- (uv_tcp_t*) &ctx->client_handle,
- (const struct sockaddr*) &listen_addr,
- cl_connect_cb));
- }
- static int test_tcp(unsigned int num_servers, unsigned int num_clients) {
- struct server_ctx* servers;
- struct client_ctx* clients;
- uv_loop_t* loop;
- uv_tcp_t* handle;
- unsigned int i;
- double time;
- ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &listen_addr));
- loop = uv_default_loop();
- servers = calloc(num_servers, sizeof(servers[0]));
- clients = calloc(num_clients, sizeof(clients[0]));
- ASSERT(servers != NULL);
- ASSERT(clients != NULL);
- /* We're making the assumption here that from the perspective of the
- * OS scheduler, threads are functionally equivalent to and interchangeable
- * with full-blown processes.
- */
- for (i = 0; i < num_servers; i++) {
- struct server_ctx* ctx = servers + i;
- ASSERT(0 == uv_sem_init(&ctx->semaphore, 0));
- ASSERT(0 == uv_thread_create(&ctx->thread_id, server_cb, ctx));
- }
- send_listen_handles(UV_TCP, num_servers, servers);
- for (i = 0; i < num_clients; i++) {
- struct client_ctx* ctx = clients + i;
- ctx->num_connects = NUM_CONNECTS / num_clients;
- handle = (uv_tcp_t*) &ctx->client_handle;
- handle->data = "client handle";
- ASSERT(0 == uv_tcp_init(loop, handle));
- ASSERT(0 == uv_tcp_connect(&ctx->connect_req,
- handle,
- (const struct sockaddr*) &listen_addr,
- cl_connect_cb));
- ASSERT(0 == uv_idle_init(loop, &ctx->idle_handle));
- }
- {
- uint64_t t = uv_hrtime();
- ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
- t = uv_hrtime() - t;
- time = t / 1e9;
- }
- for (i = 0; i < num_servers; i++) {
- struct server_ctx* ctx = servers + i;
- uv_async_send(&ctx->async_handle);
- ASSERT(0 == uv_thread_join(&ctx->thread_id));
- uv_sem_destroy(&ctx->semaphore);
- }
- printf("accept%u: %.0f accepts/sec (%u total)\n",
- num_servers,
- NUM_CONNECTS / time,
- NUM_CONNECTS);
- for (i = 0; i < num_servers; i++) {
- struct server_ctx* ctx = servers + i;
- printf(" thread #%u: %.0f accepts/sec (%u total, %.1f%%)\n",
- i,
- ctx->num_connects / time,
- ctx->num_connects,
- ctx->num_connects * 100.0 / NUM_CONNECTS);
- }
- free(clients);
- free(servers);
- MAKE_VALGRIND_HAPPY();
- return 0;
- }
- BENCHMARK_IMPL(tcp_multi_accept2) {
- return test_tcp(2, 40);
- }
- BENCHMARK_IMPL(tcp_multi_accept4) {
- return test_tcp(4, 40);
- }
- BENCHMARK_IMPL(tcp_multi_accept8) {
- return test_tcp(8, 40);
- }
|