123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581 |
- /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to
- * deal in the Software without restriction, including without limitation the
- * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- * IN THE SOFTWARE.
- */
- #include <errno.h>
- #ifndef _WIN32
- # include <fcntl.h>
- # include <sys/socket.h>
- # include <unistd.h>
- #endif
- #include "uv.h"
- #include "task.h"
/* Number of client connections opened against the test server. */
#define NUM_CLIENTS 5

/* Bytes a sending peer transfers before it shuts down its write side. */
#define TRANSFER_BYTES (1 << 16)

/*
 * Smaller of two values.
 *
 * The expansion deliberately has no trailing semicolon so the macro can be
 * used inside larger expressions, as a function argument or in a condition.
 * Each argument may be evaluated twice; do not pass expressions with side
 * effects.
 */
#undef MIN
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
/*
 * Direction of data flow exercised by a test run. connection_close_cb uses
 * it to decide which peers must have sent and/or received TRANSFER_BYTES.
 */
typedef enum {
  UNIDIRECTIONAL = 0,  /* Only client connections send data. */
  DUPLEX = 1           /* Both ends of every connection send data. */
} test_mode_t;
- typedef struct connection_context_s {
- uv_poll_t poll_handle;
- uv_timer_t timer_handle;
- uv_os_sock_t sock;
- size_t read, sent;
- int is_server_connection;
- int open_handles;
- int got_fin, sent_fin;
- unsigned int events, delayed_events;
- } connection_context_t;
- typedef struct server_context_s {
- uv_poll_t poll_handle;
- uv_os_sock_t sock;
- int connections;
- } server_context_t;
- static void delay_timer_cb(uv_timer_t* timer, int status);
- static test_mode_t test_mode = DUPLEX;
- static int closed_connections = 0;
- static int valid_writable_wakeups = 0;
- static int spurious_writable_wakeups = 0;
/*
 * Reports whether the most recent socket call failed only because it would
 * have blocked (or, for connect(), is still in progress).
 *
 * Returns non-zero when the last error is WSAEWOULDBLOCK on Windows, or when
 * errno is EAGAIN, EINPROGRESS or (where defined) EWOULDBLOCK elsewhere.
 */
static int got_eagain(void) {
#ifdef _WIN32
  return WSAGetLastError() == WSAEWOULDBLOCK;
#else
  /* A single terminating semicolon follows the optional clause. */
  return errno == EAGAIN
      || errno == EINPROGRESS
#ifdef EWOULDBLOCK
      || errno == EWOULDBLOCK
#endif
      ;
#endif
}
- static void set_nonblocking(uv_os_sock_t sock) {
- int r;
- #ifdef _WIN32
- unsigned long on = 1;
- r = ioctlsocket(sock, FIONBIO, &on);
- ASSERT(r == 0);
- #else
- int flags = fcntl(sock, F_GETFL, 0);
- ASSERT(flags >= 0);
- r = fcntl(sock, F_SETFL, flags | O_NONBLOCK);
- ASSERT(r >= 0);
- #endif
- }
- static uv_os_sock_t create_nonblocking_bound_socket(
- struct sockaddr_in bind_addr) {
- uv_os_sock_t sock;
- int r;
- sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
- #ifdef _WIN32
- ASSERT(sock != INVALID_SOCKET);
- #else
- ASSERT(sock >= 0);
- #endif
- set_nonblocking(sock);
- #ifndef _WIN32
- {
- /* Allow reuse of the port. */
- int yes = 1;
- r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
- ASSERT(r == 0);
- }
- #endif
- r = bind(sock, (const struct sockaddr*) &bind_addr, sizeof bind_addr);
- ASSERT(r == 0);
- return sock;
- }
- static void close_socket(uv_os_sock_t sock) {
- int r;
- #ifdef _WIN32
- r = closesocket(sock);
- #else
- r = close(sock);
- #endif
- ASSERT(r == 0);
- }
- static connection_context_t* create_connection_context(
- uv_os_sock_t sock, int is_server_connection) {
- int r;
- connection_context_t* context;
- context = (connection_context_t*) malloc(sizeof *context);
- ASSERT(context != NULL);
- context->sock = sock;
- context->is_server_connection = is_server_connection;
- context->read = 0;
- context->sent = 0;
- context->open_handles = 0;
- context->events = 0;
- context->delayed_events = 0;
- context->got_fin = 0;
- context->sent_fin = 0;
- r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
- context->open_handles++;
- context->poll_handle.data = context;
- ASSERT(r == 0);
- r = uv_timer_init(uv_default_loop(), &context->timer_handle);
- context->open_handles++;
- context->timer_handle.data = context;
- ASSERT(r == 0);
- return context;
- }
- static void connection_close_cb(uv_handle_t* handle) {
- connection_context_t* context = (connection_context_t*) handle->data;
- if (--context->open_handles == 0) {
- if (test_mode == DUPLEX || context->is_server_connection) {
- ASSERT(context->read == TRANSFER_BYTES);
- } else {
- ASSERT(context->read == 0);
- }
- if (test_mode == DUPLEX || !context->is_server_connection) {
- ASSERT(context->sent == TRANSFER_BYTES);
- } else {
- ASSERT(context->sent == 0);
- }
- closed_connections++;
- free(context);
- }
- }
- static void destroy_connection_context(connection_context_t* context) {
- uv_close((uv_handle_t*) &context->poll_handle, connection_close_cb);
- uv_close((uv_handle_t*) &context->timer_handle, connection_close_cb);
- }
- static void connection_poll_cb(uv_poll_t* handle, int status, int events) {
- connection_context_t* context = (connection_context_t*) handle->data;
- unsigned int new_events;
- int r;
- ASSERT(status == 0);
- ASSERT(events & context->events);
- ASSERT(!(events & ~context->events));
- new_events = context->events;
- if (events & UV_READABLE) {
- int action = rand() % 7;
- switch (action) {
- case 0:
- case 1: {
- /* Read a couple of bytes. */
- static char buffer[74];
- r = recv(context->sock, buffer, sizeof buffer, 0);
- ASSERT(r >= 0);
- if (r > 0) {
- context->read += r;
- } else {
- /* Got FIN. */
- context->got_fin = 1;
- new_events &= ~UV_READABLE;
- }
- break;
- }
- case 2:
- case 3: {
- /* Read until EAGAIN. */
- static char buffer[931];
- r = recv(context->sock, buffer, sizeof buffer, 0);
- ASSERT(r >= 0);
- while (r > 0) {
- context->read += r;
- r = recv(context->sock, buffer, sizeof buffer, 0);
- }
- if (r == 0) {
- /* Got FIN. */
- context->got_fin = 1;
- new_events &= ~UV_READABLE;
- } else {
- ASSERT(got_eagain());
- }
- break;
- }
- case 4:
- /* Ignore. */
- break;
- case 5:
- /* Stop reading for a while. Restart in timer callback. */
- new_events &= ~UV_READABLE;
- if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
- context->delayed_events = UV_READABLE;
- uv_timer_start(&context->timer_handle, delay_timer_cb, 10, 0);
- } else {
- context->delayed_events |= UV_READABLE;
- }
- break;
- case 6:
- /* Fudge with the event mask. */
- uv_poll_start(&context->poll_handle, UV_WRITABLE, connection_poll_cb);
- uv_poll_start(&context->poll_handle, UV_READABLE, connection_poll_cb);
- context->events = UV_READABLE;
- break;
- default:
- ASSERT(0);
- }
- }
- if (events & UV_WRITABLE) {
- if (context->sent < TRANSFER_BYTES &&
- !(test_mode == UNIDIRECTIONAL && context->is_server_connection)) {
- /* We have to send more bytes. */
- int action = rand() % 7;
- switch (action) {
- case 0:
- case 1: {
- /* Send a couple of bytes. */
- static char buffer[103];
- int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
- ASSERT(send_bytes > 0);
- r = send(context->sock, buffer, send_bytes, 0);
- if (r < 0) {
- ASSERT(got_eagain());
- spurious_writable_wakeups++;
- break;
- }
- ASSERT(r > 0);
- context->sent += r;
- valid_writable_wakeups++;
- break;
- }
- case 2:
- case 3: {
- /* Send until EAGAIN. */
- static char buffer[1234];
- int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
- ASSERT(send_bytes > 0);
- r = send(context->sock, buffer, send_bytes, 0);
- if (r < 0) {
- ASSERT(got_eagain());
- spurious_writable_wakeups++;
- break;
- }
- ASSERT(r > 0);
- valid_writable_wakeups++;
- context->sent += r;
- while (context->sent < TRANSFER_BYTES) {
- send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
- ASSERT(send_bytes > 0);
- r = send(context->sock, buffer, send_bytes, 0);
- if (r <= 0) break;
- context->sent += r;
- }
- ASSERT(r > 0 || got_eagain());
- break;
- }
- case 4:
- /* Ignore. */
- break;
- case 5:
- /* Stop sending for a while. Restart in timer callback. */
- new_events &= ~UV_WRITABLE;
- if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
- context->delayed_events = UV_WRITABLE;
- uv_timer_start(&context->timer_handle, delay_timer_cb, 100, 0);
- } else {
- context->delayed_events |= UV_WRITABLE;
- }
- break;
- case 6:
- /* Fudge with the event mask. */
- uv_poll_start(&context->poll_handle,
- UV_READABLE,
- connection_poll_cb);
- uv_poll_start(&context->poll_handle,
- UV_WRITABLE,
- connection_poll_cb);
- context->events = UV_WRITABLE;
- break;
- default:
- ASSERT(0);
- }
- } else {
- /* Nothing more to write. Send FIN. */
- int r;
- #ifdef _WIN32
- r = shutdown(context->sock, SD_SEND);
- #else
- r = shutdown(context->sock, SHUT_WR);
- #endif
- ASSERT(r == 0);
- context->sent_fin = 1;
- new_events &= ~UV_WRITABLE;
- }
- }
- if (context->got_fin && context->sent_fin) {
- /* Sent and received FIN. Close and destroy context. */
- close_socket(context->sock);
- destroy_connection_context(context);
- context->events = 0;
- } else if (new_events != context->events) {
- /* Poll mask changed. Call uv_poll_start again. */
- context->events = new_events;
- uv_poll_start(handle, new_events, connection_poll_cb);
- }
- /* Assert that uv_is_active works correctly for poll handles. */
- if (context->events != 0) {
- ASSERT(1 == uv_is_active((uv_handle_t*) handle));
- } else {
- ASSERT(0 == uv_is_active((uv_handle_t*) handle));
- }
- }
- static void delay_timer_cb(uv_timer_t* timer, int status) {
- connection_context_t* context = (connection_context_t*) timer->data;
- int r;
- /* Timer should auto stop. */
- ASSERT(0 == uv_is_active((uv_handle_t*) timer));
- /* Add the requested events to the poll mask. */
- ASSERT(context->delayed_events != 0);
- context->events |= context->delayed_events;
- context->delayed_events = 0;
- r = uv_poll_start(&context->poll_handle,
- context->events,
- connection_poll_cb);
- ASSERT(r == 0);
- }
- static server_context_t* create_server_context(
- uv_os_sock_t sock) {
- int r;
- server_context_t* context;
- context = (server_context_t*) malloc(sizeof *context);
- ASSERT(context != NULL);
- context->sock = sock;
- context->connections = 0;
- r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
- context->poll_handle.data = context;
- ASSERT(r == 0);
- return context;
- }
- static void server_close_cb(uv_handle_t* handle) {
- server_context_t* context = (server_context_t*) handle->data;
- free(context);
- }
- static void destroy_server_context(server_context_t* context) {
- uv_close((uv_handle_t*) &context->poll_handle, server_close_cb);
- }
- static void server_poll_cb(uv_poll_t* handle, int status, int events) {
- server_context_t* server_context = (server_context_t*)
- handle->data;
- connection_context_t* connection_context;
- struct sockaddr_in addr;
- socklen_t addr_len;
- uv_os_sock_t sock;
- int r;
- addr_len = sizeof addr;
- sock = accept(server_context->sock, (struct sockaddr*) &addr, &addr_len);
- #ifdef _WIN32
- ASSERT(sock != INVALID_SOCKET);
- #else
- ASSERT(sock >= 0);
- #endif
- set_nonblocking(sock);
- connection_context = create_connection_context(sock, 1);
- connection_context->events = UV_READABLE | UV_WRITABLE;
- r = uv_poll_start(&connection_context->poll_handle,
- UV_READABLE | UV_WRITABLE,
- connection_poll_cb);
- ASSERT(r == 0);
- if (++server_context->connections == NUM_CLIENTS) {
- close_socket(server_context->sock);
- destroy_server_context(server_context);
- }
- }
- static void start_server(void) {
- server_context_t* context;
- struct sockaddr_in addr;
- uv_os_sock_t sock;
- int r;
- ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
- sock = create_nonblocking_bound_socket(addr);
- context = create_server_context(sock);
- r = listen(sock, 100);
- ASSERT(r == 0);
- r = uv_poll_start(&context->poll_handle, UV_READABLE, server_poll_cb);
- ASSERT(r == 0);
- }
- static void start_client(void) {
- uv_os_sock_t sock;
- connection_context_t* context;
- struct sockaddr_in server_addr;
- struct sockaddr_in addr;
- int r;
- ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &server_addr));
- ASSERT(0 == uv_ip4_addr("0.0.0.0", 0, &addr));
- sock = create_nonblocking_bound_socket(addr);
- context = create_connection_context(sock, 0);
- context->events = UV_READABLE | UV_WRITABLE;
- r = uv_poll_start(&context->poll_handle,
- UV_READABLE | UV_WRITABLE,
- connection_poll_cb);
- ASSERT(r == 0);
- r = connect(sock, (struct sockaddr*) &server_addr, sizeof server_addr);
- ASSERT(r == 0 || got_eagain());
- }
- static void start_poll_test(void) {
- int i, r;
- #ifdef _WIN32
- {
- struct WSAData wsa_data;
- int r = WSAStartup(MAKEWORD(2, 2), &wsa_data);
- ASSERT(r == 0);
- }
- #endif
- start_server();
- for (i = 0; i < NUM_CLIENTS; i++)
- start_client();
- r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
- ASSERT(r == 0);
- /* Assert that at most five percent of the writable wakeups was spurious. */
- ASSERT(spurious_writable_wakeups == 0 ||
- (valid_writable_wakeups + spurious_writable_wakeups) /
- spurious_writable_wakeups > 20);
- ASSERT(closed_connections == NUM_CLIENTS * 2);
- MAKE_VALGRIND_HAPPY();
- }
- TEST_IMPL(poll_duplex) {
- test_mode = DUPLEX;
- start_poll_test();
- return 0;
- }
- TEST_IMPL(poll_unidirectional) {
- test_mode = UNIDIRECTIONAL;
- start_poll_test();
- return 0;
- }
|