1
0

benchmark-pump.c 9.8 KB


  1. /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
  2. *
  3. * Permission is hereby granted, free of charge, to any person obtaining a copy
  4. * of this software and associated documentation files (the "Software"), to
  5. * deal in the Software without restriction, including without limitation the
  6. * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  7. * sell copies of the Software, and to permit persons to whom the Software is
  8. * furnished to do so, subject to the following conditions:
  9. *
  10. * The above copyright notice and this permission notice shall be included in
  11. * all copies or substantial portions of the Software.
  12. *
  13. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  14. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  18. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  19. * IN THE SOFTWARE.
  20. */
  21. #include "task.h"
  22. #include "uv.h"
  23. #include <math.h>
  24. #include <stdio.h>
  25. static int TARGET_CONNECTIONS;
  26. #define WRITE_BUFFER_SIZE 8192
  27. #define MAX_SIMULTANEOUS_CONNECTS 100
  28. #define PRINT_STATS 0
  29. #define STATS_INTERVAL 1000 /* msec */
  30. #define STATS_COUNT 5
  31. static void do_write(uv_stream_t*);
  32. static void maybe_connect_some();
  33. static uv_req_t* req_alloc();
  34. static void req_free(uv_req_t* uv_req);
  35. static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf);
  36. static void buf_free(const uv_buf_t* buf);
  37. static uv_loop_t* loop;
  38. static uv_tcp_t tcpServer;
  39. static uv_pipe_t pipeServer;
  40. static uv_stream_t* server;
  41. static struct sockaddr_in listen_addr;
  42. static struct sockaddr_in connect_addr;
  43. static int64_t start_time;
  44. static int max_connect_socket = 0;
  45. static int max_read_sockets = 0;
  46. static int read_sockets = 0;
  47. static int write_sockets = 0;
  48. static int64_t nrecv = 0;
  49. static int64_t nrecv_total = 0;
  50. static int64_t nsent = 0;
  51. static int64_t nsent_total = 0;
  52. static int stats_left = 0;
  53. static char write_buffer[WRITE_BUFFER_SIZE];
  54. /* Make this as large as you need. */
  55. #define MAX_WRITE_HANDLES 1000
  56. static stream_type type;
  57. static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES];
  58. static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES];
  59. static uv_timer_t timer_handle;
  60. static double gbit(int64_t bytes, int64_t passed_ms) {
  61. double gbits = ((double)bytes / (1024 * 1024 * 1024)) * 8;
  62. return gbits / ((double)passed_ms / 1000);
  63. }
  64. static void show_stats(uv_timer_t* handle, int status) {
  65. int64_t diff;
  66. int i;
  67. #if PRINT_STATS
  68. LOGF("connections: %d, write: %.1f gbit/s\n",
  69. write_sockets,
  70. gbit(nsent, STATS_INTERVAL));
  71. #endif
  72. /* Exit if the show is over */
  73. if (!--stats_left) {
  74. uv_update_time(loop);
  75. diff = uv_now(loop) - start_time;
  76. LOGF("%s_pump%d_client: %.1f gbit/s\n",
  77. type == TCP ? "tcp" : "pipe",
  78. write_sockets,
  79. gbit(nsent_total, diff));
  80. for (i = 0; i < write_sockets; i++) {
  81. if (type == TCP)
  82. uv_close((uv_handle_t*) &tcp_write_handles[i], NULL);
  83. else
  84. uv_close((uv_handle_t*) &pipe_write_handles[i], NULL);
  85. }
  86. exit(0);
  87. }
  88. /* Reset read and write counters */
  89. nrecv = 0;
  90. nsent = 0;
  91. }
  92. static void read_show_stats(void) {
  93. int64_t diff;
  94. uv_update_time(loop);
  95. diff = uv_now(loop) - start_time;
  96. LOGF("%s_pump%d_server: %.1f gbit/s\n",
  97. type == TCP ? "tcp" : "pipe",
  98. max_read_sockets,
  99. gbit(nrecv_total, diff));
  100. }
  101. static void read_sockets_close_cb(uv_handle_t* handle) {
  102. free(handle);
  103. read_sockets--;
  104. /* If it's past the first second and everyone has closed their connection
  105. * Then print stats.
  106. */
  107. if (uv_now(loop) - start_time > 1000 && read_sockets == 0) {
  108. read_show_stats();
  109. uv_close((uv_handle_t*)server, NULL);
  110. }
  111. }
  112. static void start_stats_collection(void) {
  113. int r;
  114. /* Show-stats timer */
  115. stats_left = STATS_COUNT;
  116. r = uv_timer_init(loop, &timer_handle);
  117. ASSERT(r == 0);
  118. r = uv_timer_start(&timer_handle, show_stats, STATS_INTERVAL, STATS_INTERVAL);
  119. ASSERT(r == 0);
  120. uv_update_time(loop);
  121. start_time = uv_now(loop);
  122. }
  123. static void read_cb(uv_stream_t* stream, ssize_t bytes, const uv_buf_t* buf) {
  124. if (nrecv_total == 0) {
  125. ASSERT(start_time == 0);
  126. uv_update_time(loop);
  127. start_time = uv_now(loop);
  128. }
  129. if (bytes < 0) {
  130. uv_close((uv_handle_t*)stream, read_sockets_close_cb);
  131. return;
  132. }
  133. buf_free(buf);
  134. nrecv += bytes;
  135. nrecv_total += bytes;
  136. }
  137. static void write_cb(uv_write_t* req, int status) {
  138. ASSERT(status == 0);
  139. req_free((uv_req_t*) req);
  140. nsent += sizeof write_buffer;
  141. nsent_total += sizeof write_buffer;
  142. do_write((uv_stream_t*) req->handle);
  143. }
  144. static void do_write(uv_stream_t* stream) {
  145. uv_write_t* req;
  146. uv_buf_t buf;
  147. int r;
  148. buf.base = (char*) &write_buffer;
  149. buf.len = sizeof write_buffer;
  150. req = (uv_write_t*) req_alloc();
  151. r = uv_write(req, stream, &buf, 1, write_cb);
  152. ASSERT(r == 0);
  153. }
  154. static void connect_cb(uv_connect_t* req, int status) {
  155. int i;
  156. if (status) LOG(uv_strerror(status));
  157. ASSERT(status == 0);
  158. write_sockets++;
  159. req_free((uv_req_t*) req);
  160. maybe_connect_some();
  161. if (write_sockets == TARGET_CONNECTIONS) {
  162. start_stats_collection();
  163. /* Yay! start writing */
  164. for (i = 0; i < write_sockets; i++) {
  165. if (type == TCP)
  166. do_write((uv_stream_t*) &tcp_write_handles[i]);
  167. else
  168. do_write((uv_stream_t*) &pipe_write_handles[i]);
  169. }
  170. }
  171. }
  172. static void maybe_connect_some(void) {
  173. uv_connect_t* req;
  174. uv_tcp_t* tcp;
  175. uv_pipe_t* pipe;
  176. int r;
  177. while (max_connect_socket < TARGET_CONNECTIONS &&
  178. max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) {
  179. if (type == TCP) {
  180. tcp = &tcp_write_handles[max_connect_socket++];
  181. r = uv_tcp_init(loop, tcp);
  182. ASSERT(r == 0);
  183. req = (uv_connect_t*) req_alloc();
  184. r = uv_tcp_connect(req,
  185. tcp,
  186. (const struct sockaddr*) &connect_addr,
  187. connect_cb);
  188. ASSERT(r == 0);
  189. } else {
  190. pipe = &pipe_write_handles[max_connect_socket++];
  191. r = uv_pipe_init(loop, pipe, 0);
  192. ASSERT(r == 0);
  193. req = (uv_connect_t*) req_alloc();
  194. uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
  195. }
  196. }
  197. }
  198. static void connection_cb(uv_stream_t* s, int status) {
  199. uv_stream_t* stream;
  200. int r;
  201. ASSERT(server == s);
  202. ASSERT(status == 0);
  203. if (type == TCP) {
  204. stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
  205. r = uv_tcp_init(loop, (uv_tcp_t*)stream);
  206. ASSERT(r == 0);
  207. } else {
  208. stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t));
  209. r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0);
  210. ASSERT(r == 0);
  211. }
  212. r = uv_accept(s, stream);
  213. ASSERT(r == 0);
  214. r = uv_read_start(stream, buf_alloc, read_cb);
  215. ASSERT(r == 0);
  216. read_sockets++;
  217. max_read_sockets++;
  218. }
  219. /*
  220. * Request allocator
  221. */
  222. typedef struct req_list_s {
  223. union uv_any_req uv_req;
  224. struct req_list_s* next;
  225. } req_list_t;
  226. static req_list_t* req_freelist = NULL;
  227. static uv_req_t* req_alloc(void) {
  228. req_list_t* req;
  229. req = req_freelist;
  230. if (req != NULL) {
  231. req_freelist = req->next;
  232. return (uv_req_t*) req;
  233. }
  234. req = (req_list_t*) malloc(sizeof *req);
  235. return (uv_req_t*) req;
  236. }
  237. static void req_free(uv_req_t* uv_req) {
  238. req_list_t* req = (req_list_t*) uv_req;
  239. req->next = req_freelist;
  240. req_freelist = req;
  241. }
  242. /*
  243. * Buffer allocator
  244. */
  245. typedef struct buf_list_s {
  246. uv_buf_t uv_buf_t;
  247. struct buf_list_s* next;
  248. } buf_list_t;
  249. static buf_list_t* buf_freelist = NULL;
  250. static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
  251. buf_list_t* ab;
  252. ab = buf_freelist;
  253. if (ab != NULL)
  254. buf_freelist = ab->next;
  255. else {
  256. ab = malloc(size + sizeof(*ab));
  257. ab->uv_buf_t.len = size;
  258. ab->uv_buf_t.base = (char*) (ab + 1);
  259. }
  260. *buf = ab->uv_buf_t;
  261. }
  262. static void buf_free(const uv_buf_t* buf) {
  263. buf_list_t* ab = (buf_list_t*) buf->base - 1;
  264. ab->next = buf_freelist;
  265. buf_freelist = ab;
  266. }
  267. HELPER_IMPL(tcp_pump_server) {
  268. int r;
  269. type = TCP;
  270. loop = uv_default_loop();
  271. ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &listen_addr));
  272. /* Server */
  273. server = (uv_stream_t*)&tcpServer;
  274. r = uv_tcp_init(loop, &tcpServer);
  275. ASSERT(r == 0);
  276. r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &listen_addr, 0);
  277. ASSERT(r == 0);
  278. r = uv_listen((uv_stream_t*)&tcpServer, MAX_WRITE_HANDLES, connection_cb);
  279. ASSERT(r == 0);
  280. uv_run(loop, UV_RUN_DEFAULT);
  281. return 0;
  282. }
  283. HELPER_IMPL(pipe_pump_server) {
  284. int r;
  285. type = PIPE;
  286. loop = uv_default_loop();
  287. /* Server */
  288. server = (uv_stream_t*)&pipeServer;
  289. r = uv_pipe_init(loop, &pipeServer, 0);
  290. ASSERT(r == 0);
  291. r = uv_pipe_bind(&pipeServer, TEST_PIPENAME);
  292. ASSERT(r == 0);
  293. r = uv_listen((uv_stream_t*)&pipeServer, MAX_WRITE_HANDLES, connection_cb);
  294. ASSERT(r == 0);
  295. uv_run(loop, UV_RUN_DEFAULT);
  296. MAKE_VALGRIND_HAPPY();
  297. return 0;
  298. }
  299. static void tcp_pump(int n) {
  300. ASSERT(n <= MAX_WRITE_HANDLES);
  301. TARGET_CONNECTIONS = n;
  302. type = TCP;
  303. loop = uv_default_loop();
  304. ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &connect_addr));
  305. /* Start making connections */
  306. maybe_connect_some();
  307. uv_run(loop, UV_RUN_DEFAULT);
  308. MAKE_VALGRIND_HAPPY();
  309. }
  310. static void pipe_pump(int n) {
  311. ASSERT(n <= MAX_WRITE_HANDLES);
  312. TARGET_CONNECTIONS = n;
  313. type = PIPE;
  314. loop = uv_default_loop();
  315. /* Start making connections */
  316. maybe_connect_some();
  317. uv_run(loop, UV_RUN_DEFAULT);
  318. MAKE_VALGRIND_HAPPY();
  319. }
  320. BENCHMARK_IMPL(tcp_pump100_client) {
  321. tcp_pump(100);
  322. return 0;
  323. }
  324. BENCHMARK_IMPL(tcp_pump1_client) {
  325. tcp_pump(1);
  326. return 0;
  327. }
  328. BENCHMARK_IMPL(pipe_pump100_client) {
  329. pipe_pump(100);
  330. return 0;
  331. }
  332. BENCHMARK_IMPL(pipe_pump1_client) {
  333. pipe_pump(1);
  334. return 0;
  335. }