test-poll.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
  2. *
  3. * Permission is hereby granted, free of charge, to any person obtaining a copy
  4. * of this software and associated documentation files (the "Software"), to
  5. * deal in the Software without restriction, including without limitation the
  6. * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  7. * sell copies of the Software, and to permit persons to whom the Software is
  8. * furnished to do so, subject to the following conditions:
  9. *
  10. * The above copyright notice and this permission notice shall be included in
  11. * all copies or substantial portions of the Software.
  12. *
  13. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  18. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  19. * IN THE SOFTWARE.
  20. */
  21. #include <errno.h>
  22. #ifndef _WIN32
  23. # include <fcntl.h>
  24. # include <sys/socket.h>
  25. # include <unistd.h>
  26. #endif
  27. #include "uv.h"
  28. #include "task.h"
  /* Number of client connections started against the test server. */
  #define NUM_CLIENTS 5

  /* Number of payload bytes a sending side transfers before it sends FIN. */
  #define TRANSFER_BYTES (1 << 16)

  /*
   * Smaller of two values. The expansion is a plain parenthesized expression
   * (no trailing semicolon) so it can be used inside larger expressions.
   * Each argument may be evaluated more than once; pass side-effect free
   * arguments only.
   */
  #undef MIN
  #define MIN(a, b) (((a) < (b)) ? (a) : (b))
  /* Selects which sides of a connection send payload data during a run. */
  typedef enum {
    UNIDIRECTIONAL = 0,  /* Server-side connections send nothing. */
    DUPLEX = 1           /* Both sides send TRANSFER_BYTES bytes. */
  } test_mode_t;
  37. typedef struct connection_context_s {
  38. uv_poll_t poll_handle;
  39. uv_timer_t timer_handle;
  40. uv_os_sock_t sock;
  41. size_t read, sent;
  42. int is_server_connection;
  43. int open_handles;
  44. int got_fin, sent_fin;
  45. unsigned int events, delayed_events;
  46. } connection_context_t;
  47. typedef struct server_context_s {
  48. uv_poll_t poll_handle;
  49. uv_os_sock_t sock;
  50. int connections;
  51. } server_context_t;
  52. static void delay_timer_cb(uv_timer_t* timer, int status);
  53. static test_mode_t test_mode = DUPLEX;
  54. static int closed_connections = 0;
  55. static int valid_writable_wakeups = 0;
  56. static int spurious_writable_wakeups = 0;
  /*
   * Tell whether the most recent socket call failed only because it could not
   * complete without blocking.
   *
   * Returns nonzero when the last error is WSAEWOULDBLOCK on Windows, or when
   * errno is EAGAIN, EINPROGRESS or (where that macro exists) EWOULDBLOCK on
   * other platforms; returns 0 otherwise.
   */
  static int got_eagain(void) {
  #ifdef _WIN32
    return WSAGetLastError() == WSAEWOULDBLOCK;
  #else
    /* The single terminating semicolon sits after the optional clause. */
    return errno == EAGAIN
        || errno == EINPROGRESS
  #ifdef EWOULDBLOCK
        || errno == EWOULDBLOCK
  #endif
        ;
  #endif
  }
  69. static void set_nonblocking(uv_os_sock_t sock) {
  70. int r;
  71. #ifdef _WIN32
  72. unsigned long on = 1;
  73. r = ioctlsocket(sock, FIONBIO, &on);
  74. ASSERT(r == 0);
  75. #else
  76. int flags = fcntl(sock, F_GETFL, 0);
  77. ASSERT(flags >= 0);
  78. r = fcntl(sock, F_SETFL, flags | O_NONBLOCK);
  79. ASSERT(r >= 0);
  80. #endif
  81. }
  82. static uv_os_sock_t create_nonblocking_bound_socket(
  83. struct sockaddr_in bind_addr) {
  84. uv_os_sock_t sock;
  85. int r;
  86. sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  87. #ifdef _WIN32
  88. ASSERT(sock != INVALID_SOCKET);
  89. #else
  90. ASSERT(sock >= 0);
  91. #endif
  92. set_nonblocking(sock);
  93. #ifndef _WIN32
  94. {
  95. /* Allow reuse of the port. */
  96. int yes = 1;
  97. r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
  98. ASSERT(r == 0);
  99. }
  100. #endif
  101. r = bind(sock, (const struct sockaddr*) &bind_addr, sizeof bind_addr);
  102. ASSERT(r == 0);
  103. return sock;
  104. }
  105. static void close_socket(uv_os_sock_t sock) {
  106. int r;
  107. #ifdef _WIN32
  108. r = closesocket(sock);
  109. #else
  110. r = close(sock);
  111. #endif
  112. ASSERT(r == 0);
  113. }
  114. static connection_context_t* create_connection_context(
  115. uv_os_sock_t sock, int is_server_connection) {
  116. int r;
  117. connection_context_t* context;
  118. context = (connection_context_t*) malloc(sizeof *context);
  119. ASSERT(context != NULL);
  120. context->sock = sock;
  121. context->is_server_connection = is_server_connection;
  122. context->read = 0;
  123. context->sent = 0;
  124. context->open_handles = 0;
  125. context->events = 0;
  126. context->delayed_events = 0;
  127. context->got_fin = 0;
  128. context->sent_fin = 0;
  129. r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
  130. context->open_handles++;
  131. context->poll_handle.data = context;
  132. ASSERT(r == 0);
  133. r = uv_timer_init(uv_default_loop(), &context->timer_handle);
  134. context->open_handles++;
  135. context->timer_handle.data = context;
  136. ASSERT(r == 0);
  137. return context;
  138. }
  139. static void connection_close_cb(uv_handle_t* handle) {
  140. connection_context_t* context = (connection_context_t*) handle->data;
  141. if (--context->open_handles == 0) {
  142. if (test_mode == DUPLEX || context->is_server_connection) {
  143. ASSERT(context->read == TRANSFER_BYTES);
  144. } else {
  145. ASSERT(context->read == 0);
  146. }
  147. if (test_mode == DUPLEX || !context->is_server_connection) {
  148. ASSERT(context->sent == TRANSFER_BYTES);
  149. } else {
  150. ASSERT(context->sent == 0);
  151. }
  152. closed_connections++;
  153. free(context);
  154. }
  155. }
  156. static void destroy_connection_context(connection_context_t* context) {
  157. uv_close((uv_handle_t*) &context->poll_handle, connection_close_cb);
  158. uv_close((uv_handle_t*) &context->timer_handle, connection_close_cb);
  159. }
  160. static void connection_poll_cb(uv_poll_t* handle, int status, int events) {
  161. connection_context_t* context = (connection_context_t*) handle->data;
  162. unsigned int new_events;
  163. int r;
  164. ASSERT(status == 0);
  165. ASSERT(events & context->events);
  166. ASSERT(!(events & ~context->events));
  167. new_events = context->events;
  168. if (events & UV_READABLE) {
  169. int action = rand() % 7;
  170. switch (action) {
  171. case 0:
  172. case 1: {
  173. /* Read a couple of bytes. */
  174. static char buffer[74];
  175. r = recv(context->sock, buffer, sizeof buffer, 0);
  176. ASSERT(r >= 0);
  177. if (r > 0) {
  178. context->read += r;
  179. } else {
  180. /* Got FIN. */
  181. context->got_fin = 1;
  182. new_events &= ~UV_READABLE;
  183. }
  184. break;
  185. }
  186. case 2:
  187. case 3: {
  188. /* Read until EAGAIN. */
  189. static char buffer[931];
  190. r = recv(context->sock, buffer, sizeof buffer, 0);
  191. ASSERT(r >= 0);
  192. while (r > 0) {
  193. context->read += r;
  194. r = recv(context->sock, buffer, sizeof buffer, 0);
  195. }
  196. if (r == 0) {
  197. /* Got FIN. */
  198. context->got_fin = 1;
  199. new_events &= ~UV_READABLE;
  200. } else {
  201. ASSERT(got_eagain());
  202. }
  203. break;
  204. }
  205. case 4:
  206. /* Ignore. */
  207. break;
  208. case 5:
  209. /* Stop reading for a while. Restart in timer callback. */
  210. new_events &= ~UV_READABLE;
  211. if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
  212. context->delayed_events = UV_READABLE;
  213. uv_timer_start(&context->timer_handle, delay_timer_cb, 10, 0);
  214. } else {
  215. context->delayed_events |= UV_READABLE;
  216. }
  217. break;
  218. case 6:
  219. /* Fudge with the event mask. */
  220. uv_poll_start(&context->poll_handle, UV_WRITABLE, connection_poll_cb);
  221. uv_poll_start(&context->poll_handle, UV_READABLE, connection_poll_cb);
  222. context->events = UV_READABLE;
  223. break;
  224. default:
  225. ASSERT(0);
  226. }
  227. }
  228. if (events & UV_WRITABLE) {
  229. if (context->sent < TRANSFER_BYTES &&
  230. !(test_mode == UNIDIRECTIONAL && context->is_server_connection)) {
  231. /* We have to send more bytes. */
  232. int action = rand() % 7;
  233. switch (action) {
  234. case 0:
  235. case 1: {
  236. /* Send a couple of bytes. */
  237. static char buffer[103];
  238. int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
  239. ASSERT(send_bytes > 0);
  240. r = send(context->sock, buffer, send_bytes, 0);
  241. if (r < 0) {
  242. ASSERT(got_eagain());
  243. spurious_writable_wakeups++;
  244. break;
  245. }
  246. ASSERT(r > 0);
  247. context->sent += r;
  248. valid_writable_wakeups++;
  249. break;
  250. }
  251. case 2:
  252. case 3: {
  253. /* Send until EAGAIN. */
  254. static char buffer[1234];
  255. int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
  256. ASSERT(send_bytes > 0);
  257. r = send(context->sock, buffer, send_bytes, 0);
  258. if (r < 0) {
  259. ASSERT(got_eagain());
  260. spurious_writable_wakeups++;
  261. break;
  262. }
  263. ASSERT(r > 0);
  264. valid_writable_wakeups++;
  265. context->sent += r;
  266. while (context->sent < TRANSFER_BYTES) {
  267. send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
  268. ASSERT(send_bytes > 0);
  269. r = send(context->sock, buffer, send_bytes, 0);
  270. if (r <= 0) break;
  271. context->sent += r;
  272. }
  273. ASSERT(r > 0 || got_eagain());
  274. break;
  275. }
  276. case 4:
  277. /* Ignore. */
  278. break;
  279. case 5:
  280. /* Stop sending for a while. Restart in timer callback. */
  281. new_events &= ~UV_WRITABLE;
  282. if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
  283. context->delayed_events = UV_WRITABLE;
  284. uv_timer_start(&context->timer_handle, delay_timer_cb, 100, 0);
  285. } else {
  286. context->delayed_events |= UV_WRITABLE;
  287. }
  288. break;
  289. case 6:
  290. /* Fudge with the event mask. */
  291. uv_poll_start(&context->poll_handle,
  292. UV_READABLE,
  293. connection_poll_cb);
  294. uv_poll_start(&context->poll_handle,
  295. UV_WRITABLE,
  296. connection_poll_cb);
  297. context->events = UV_WRITABLE;
  298. break;
  299. default:
  300. ASSERT(0);
  301. }
  302. } else {
  303. /* Nothing more to write. Send FIN. */
  304. int r;
  305. #ifdef _WIN32
  306. r = shutdown(context->sock, SD_SEND);
  307. #else
  308. r = shutdown(context->sock, SHUT_WR);
  309. #endif
  310. ASSERT(r == 0);
  311. context->sent_fin = 1;
  312. new_events &= ~UV_WRITABLE;
  313. }
  314. }
  315. if (context->got_fin && context->sent_fin) {
  316. /* Sent and received FIN. Close and destroy context. */
  317. close_socket(context->sock);
  318. destroy_connection_context(context);
  319. context->events = 0;
  320. } else if (new_events != context->events) {
  321. /* Poll mask changed. Call uv_poll_start again. */
  322. context->events = new_events;
  323. uv_poll_start(handle, new_events, connection_poll_cb);
  324. }
  325. /* Assert that uv_is_active works correctly for poll handles. */
  326. if (context->events != 0) {
  327. ASSERT(1 == uv_is_active((uv_handle_t*) handle));
  328. } else {
  329. ASSERT(0 == uv_is_active((uv_handle_t*) handle));
  330. }
  331. }
  332. static void delay_timer_cb(uv_timer_t* timer, int status) {
  333. connection_context_t* context = (connection_context_t*) timer->data;
  334. int r;
  335. /* Timer should auto stop. */
  336. ASSERT(0 == uv_is_active((uv_handle_t*) timer));
  337. /* Add the requested events to the poll mask. */
  338. ASSERT(context->delayed_events != 0);
  339. context->events |= context->delayed_events;
  340. context->delayed_events = 0;
  341. r = uv_poll_start(&context->poll_handle,
  342. context->events,
  343. connection_poll_cb);
  344. ASSERT(r == 0);
  345. }
  346. static server_context_t* create_server_context(
  347. uv_os_sock_t sock) {
  348. int r;
  349. server_context_t* context;
  350. context = (server_context_t*) malloc(sizeof *context);
  351. ASSERT(context != NULL);
  352. context->sock = sock;
  353. context->connections = 0;
  354. r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
  355. context->poll_handle.data = context;
  356. ASSERT(r == 0);
  357. return context;
  358. }
  359. static void server_close_cb(uv_handle_t* handle) {
  360. server_context_t* context = (server_context_t*) handle->data;
  361. free(context);
  362. }
  363. static void destroy_server_context(server_context_t* context) {
  364. uv_close((uv_handle_t*) &context->poll_handle, server_close_cb);
  365. }
  366. static void server_poll_cb(uv_poll_t* handle, int status, int events) {
  367. server_context_t* server_context = (server_context_t*)
  368. handle->data;
  369. connection_context_t* connection_context;
  370. struct sockaddr_in addr;
  371. socklen_t addr_len;
  372. uv_os_sock_t sock;
  373. int r;
  374. addr_len = sizeof addr;
  375. sock = accept(server_context->sock, (struct sockaddr*) &addr, &addr_len);
  376. #ifdef _WIN32
  377. ASSERT(sock != INVALID_SOCKET);
  378. #else
  379. ASSERT(sock >= 0);
  380. #endif
  381. set_nonblocking(sock);
  382. connection_context = create_connection_context(sock, 1);
  383. connection_context->events = UV_READABLE | UV_WRITABLE;
  384. r = uv_poll_start(&connection_context->poll_handle,
  385. UV_READABLE | UV_WRITABLE,
  386. connection_poll_cb);
  387. ASSERT(r == 0);
  388. if (++server_context->connections == NUM_CLIENTS) {
  389. close_socket(server_context->sock);
  390. destroy_server_context(server_context);
  391. }
  392. }
  393. static void start_server(void) {
  394. server_context_t* context;
  395. struct sockaddr_in addr;
  396. uv_os_sock_t sock;
  397. int r;
  398. ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
  399. sock = create_nonblocking_bound_socket(addr);
  400. context = create_server_context(sock);
  401. r = listen(sock, 100);
  402. ASSERT(r == 0);
  403. r = uv_poll_start(&context->poll_handle, UV_READABLE, server_poll_cb);
  404. ASSERT(r == 0);
  405. }
  406. static void start_client(void) {
  407. uv_os_sock_t sock;
  408. connection_context_t* context;
  409. struct sockaddr_in server_addr;
  410. struct sockaddr_in addr;
  411. int r;
  412. ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &server_addr));
  413. ASSERT(0 == uv_ip4_addr("0.0.0.0", 0, &addr));
  414. sock = create_nonblocking_bound_socket(addr);
  415. context = create_connection_context(sock, 0);
  416. context->events = UV_READABLE | UV_WRITABLE;
  417. r = uv_poll_start(&context->poll_handle,
  418. UV_READABLE | UV_WRITABLE,
  419. connection_poll_cb);
  420. ASSERT(r == 0);
  421. r = connect(sock, (struct sockaddr*) &server_addr, sizeof server_addr);
  422. ASSERT(r == 0 || got_eagain());
  423. }
  424. static void start_poll_test(void) {
  425. int i, r;
  426. #ifdef _WIN32
  427. {
  428. struct WSAData wsa_data;
  429. int r = WSAStartup(MAKEWORD(2, 2), &wsa_data);
  430. ASSERT(r == 0);
  431. }
  432. #endif
  433. start_server();
  434. for (i = 0; i < NUM_CLIENTS; i++)
  435. start_client();
  436. r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  437. ASSERT(r == 0);
  438. /* Assert that at most five percent of the writable wakeups was spurious. */
  439. ASSERT(spurious_writable_wakeups == 0 ||
  440. (valid_writable_wakeups + spurious_writable_wakeups) /
  441. spurious_writable_wakeups > 20);
  442. ASSERT(closed_connections == NUM_CLIENTS * 2);
  443. MAKE_VALGRIND_HAPPY();
  444. }
  445. TEST_IMPL(poll_duplex) {
  446. test_mode = DUPLEX;
  447. start_poll_test();
  448. return 0;
  449. }
  450. TEST_IMPL(poll_unidirectional) {
  451. test_mode = UNIDIRECTIONAL;
  452. start_poll_test();
  453. return 0;
  454. }