gnunet-cadet-profiler.c 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2011 Christian Grothoff (and other contributing authors)
  4. GNUnet is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published
  6. by the Free Software Foundation; either version 3, or (at your
  7. option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with GNUnet; see the file COPYING. If not, write to the
  14. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  15. Boston, MA 02111-1307, USA.
  16. */
  17. /**
  18. * @file cadet/gnunet-cadet-profiler.c
  19. *
  20. * @brief Profiler for cadet experiments.
  21. */
  22. #include <stdio.h>
  23. #include "platform.h"
  24. #include "cadet_test_lib.h"
  25. #include "gnunet_cadet_service.h"
  26. #include "gnunet_statistics_service.h"
/**
 * Message type written into the header of ping messages.
 */
#define PING 1
/**
 * Message type written into the header of pong messages.
 */
#define PONG 2
/**
 * Maximum ping period in milliseconds. Real period = rand (0, PING_PERIOD)
 */
#define PING_PERIOD 1000
/**
 * How long until we give up on connecting the peers?
 */
#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 120)
/**
 * Time to wait for stuff that should be rather fast.
 * Used as the initial delay of the disconnect task.
 */
#define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 300)
  41. /**
  42. * Total number of rounds.
  43. */
  44. #define number_rounds sizeof(rounds)/sizeof(rounds[0])
  45. /**
  46. * Ratio of peers active. First round always is 1.0.
  47. */
  48. static float rounds[] = {0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.0};
/**
 * Message type for pings. The same layout is reused for the pong reply,
 * which is a copy of the ping with only the header type changed.
 */
struct CadetPingMessage
{
/**
 * Header. Type PING/PONG.
 */
struct GNUNET_MessageHeader header;
/**
 * Message number (network byte order on the wire).
 */
uint32_t counter;
/**
 * Time the message was sent.
 */
struct GNUNET_TIME_AbsoluteNBO timestamp;
/**
 * Round number in which the ping was sent (network byte order on the wire).
 */
uint32_t round_number;
};
/**
 * Peer description.
 */
struct CadetPeer
{
/**
 * Testbed Operation (to get peer id, etc).
 */
struct GNUNET_TESTBED_Operation *op;
/**
 * Peer ID.
 */
struct GNUNET_PeerIdentity id;
/**
 * Cadet handle of this peer.
 */
struct GNUNET_CADET_Handle *cadet;
/**
 * Channel this peer created towards its ping destination.
 */
struct GNUNET_CADET_Channel *ch;
/**
 * Channel this peer received from the peer that pings it.
 */
struct GNUNET_CADET_Channel *incoming_ch;
/**
 * Channel handle for a warmup channel.
 */
struct GNUNET_CADET_Channel *warmup_ch;
/**
 * Number of payload packets sent
 */
int data_sent;
/**
 * Number of payload packets received
 */
int data_received;
/**
 * Is peer up?
 */
int up;
/**
 * Destination to ping.
 */
struct CadetPeer *dest;
/**
 * Peer that pings us (the peer that selected us as its destination).
 */
struct CadetPeer *incoming;
/**
 * Task to do the next ping.
 */
struct GNUNET_SCHEDULER_Task * ping_task;
/**
 * Per-round running mean of the measured latency (in microseconds).
 */
float mean[number_rounds];
/**
 * Per-round accumulated sum of squared deviations from the running mean
 * (updated by the online variance calculation in the pong handler).
 */
float var[number_rounds];
/**
 * Per-round number of pongs received.
 */
unsigned int pongs[number_rounds];
/**
 * Per-round number of pings sent.
 */
unsigned int pings[number_rounds];
};
/**
 * Duration of each round (parsed from the command line).
 */
static struct GNUNET_TIME_Relative round_time;
/**
 * GNUNET_PeerIdentity -> CadetPeer
 */
static struct GNUNET_CONTAINER_MultiPeerMap *ids;
/**
 * Testbed peer handles.
 */
static struct GNUNET_TESTBED_Peer **testbed_handles;
/**
 * Testbed Operation (to get stats).
 */
static struct GNUNET_TESTBED_Operation *stats_op;
/**
 * Array with the description of every peer (peers_total entries,
 * allocated in main).
 */
struct CadetPeer *peers;
/**
 * Number of peer ids received so far.
 */
static unsigned int p_ids;
/**
 * Total number of peers.
 */
static unsigned long long peers_total;
/**
 * Number of currently running peers.
 */
static unsigned long long peers_running;
/**
 * Number of peers doing pings (the first peers_pinging entries of peers).
 */
static unsigned long long peers_pinging;
/**
 * Test context (to shut down).
 */
static struct GNUNET_CADET_TEST_Context *test_ctx;
/**
 * Task called to shutdown test.
 */
static struct GNUNET_SCHEDULER_Task * shutdown_handle;
/**
 * Task called to disconnect peers, before shutdown.
 */
static struct GNUNET_SCHEDULER_Task * disconnect_task;
/**
 * Task to perform tests
 */
static struct GNUNET_SCHEDULER_Task * test_task;
/**
 * Round number.
 */
static unsigned int current_round;
/**
 * Do warmup? (Each peer creates a channel to one other peer before
 * the measurements start).
 */
static int do_warmup;
/**
 * Warmup progress: number of warmup channels received so far.
 */
static unsigned int peers_warmup;
/**
 * Flag to notify callbacks not to generate any new traffic anymore.
 */
static int test_finished;
/**
 * Start the test itself, as we are connected to the CADET services.
 *
 * Forward declaration: the task is scheduled from callbacks defined
 * before its implementation.
 *
 * @param cls Closure (unused).
 * @param tc Task Context.
 */
static void
start_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
  208. /**
  209. * Calculate a random delay.
  210. *
  211. * @param max Exclusive maximum, in ms.
  212. *
  213. * @return A time between 0 a max-1 ms.
  214. */
  215. static struct GNUNET_TIME_Relative
  216. delay_ms_rnd (unsigned int max)
  217. {
  218. unsigned int rnd;
  219. rnd = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max);
  220. return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, rnd);
  221. }
  222. /**
  223. * Get the index of a peer in the peers array.
  224. *
  225. * @param peer Peer whose index to get.
  226. *
  227. * @return Index of peer in peers.
  228. */
  229. static unsigned int
  230. get_index (struct CadetPeer *peer)
  231. {
  232. return peer - peers;
  233. }
  234. /**
  235. * Show the results of the test (banwidth acheived) and log them to GAUGER
  236. */
  237. static void
  238. show_end_data (void)
  239. {
  240. struct CadetPeer *peer;
  241. unsigned int i;
  242. unsigned int j;
  243. for (i = 0; i < number_rounds; i++)
  244. {
  245. for (j = 0; j < peers_pinging; j++)
  246. {
  247. peer = &peers[j];
  248. FPRINTF (stdout,
  249. "ROUND %3u PEER %3u: %10.2f / %10.2f, PINGS: %3u, PONGS: %3u\n",
  250. i, j, peer->mean[i], sqrt (peer->var[i] / (peer->pongs[i] - 1)),
  251. peer->pings[i], peer->pongs[i]);
  252. }
  253. }
  254. }
  255. /**
  256. * Shut down peergroup, clean up.
  257. *
  258. * @param cls Closure (unused).
  259. * @param tc Task Context.
  260. */
  261. static void
  262. shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  263. {
  264. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Ending test.\n");
  265. shutdown_handle = NULL;
  266. }
  267. /**
  268. * Disconnect from cadet services af all peers, call shutdown.
  269. *
  270. * @param cls Closure (unused).
  271. * @param tc Task Context.
  272. */
  273. static void
  274. disconnect_cadet_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  275. {
  276. long line = (long) cls;
  277. unsigned int i;
  278. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  279. "disconnecting cadet service, called from line %ld\n", line);
  280. disconnect_task = NULL;
  281. for (i = 0; i < peers_total; i++)
  282. {
  283. if (NULL != peers[i].op)
  284. GNUNET_TESTBED_operation_done (peers[i].op);
  285. if (peers[i].up != GNUNET_YES)
  286. continue;
  287. if (NULL != peers[i].ch)
  288. {
  289. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u: channel %p\n", i, peers[i].ch);
  290. GNUNET_CADET_channel_destroy (peers[i].ch);
  291. }
  292. if (NULL != peers[i].warmup_ch)
  293. {
  294. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u: warmup channel %p\n",
  295. i, peers[i].warmup_ch);
  296. GNUNET_CADET_channel_destroy (peers[i].warmup_ch);
  297. }
  298. if (NULL != peers[i].incoming_ch)
  299. {
  300. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u: incoming channel %p\n",
  301. i, peers[i].incoming_ch);
  302. GNUNET_CADET_channel_destroy (peers[i].incoming_ch);
  303. }
  304. }
  305. GNUNET_CADET_TEST_cleanup (test_ctx);
  306. if (NULL != shutdown_handle)
  307. {
  308. GNUNET_SCHEDULER_cancel (shutdown_handle);
  309. }
  310. shutdown_handle = GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
  311. }
  312. /**
  313. * Finish test normally: schedule disconnect and shutdown
  314. *
  315. * @param line Line in the code the abort is requested from (__LINE__).
  316. */
  317. static void
  318. abort_test (long line)
  319. {
  320. if (disconnect_task != NULL)
  321. {
  322. GNUNET_SCHEDULER_cancel (disconnect_task);
  323. disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
  324. (void *) line);
  325. }
  326. }
  327. /**
  328. * Stats callback. Finish the stats testbed operation and when all stats have
  329. * been iterated, shutdown the test.
  330. *
  331. * @param cls closure
  332. * @param op the operation that has been finished
  333. * @param emsg error message in case the operation has failed; will be NULL if
  334. * operation has executed successfully.
  335. */
  336. static void
  337. stats_cont (void *cls, struct GNUNET_TESTBED_Operation *op, const char *emsg)
  338. {
  339. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "... collecting statistics done.\n");
  340. GNUNET_TESTBED_operation_done (stats_op);
  341. if (NULL != disconnect_task)
  342. GNUNET_SCHEDULER_cancel (disconnect_task);
  343. disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
  344. (void *) __LINE__);
  345. }
  346. /**
  347. * Process statistic values.
  348. *
  349. * @param cls closure
  350. * @param peer the peer the statistic belong to
  351. * @param subsystem name of subsystem that created the statistic
  352. * @param name the name of the datum
  353. * @param value the current value
  354. * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
  355. * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
  356. */
  357. static int
  358. stats_iterator (void *cls, const struct GNUNET_TESTBED_Peer *peer,
  359. const char *subsystem, const char *name,
  360. uint64_t value, int is_persistent)
  361. {
  362. uint32_t i;
  363. i = GNUNET_TESTBED_get_index (peer);
  364. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " STATS %u - %s [%s]: %llu\n",
  365. i, subsystem, name, value);
  366. return GNUNET_OK;
  367. }
  368. /**
  369. * Task check that keepalives were sent and received.
  370. *
  371. * @param cls Closure (NULL).
  372. * @param tc Task Context.
  373. */
  374. static void
  375. collect_stats (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  376. {
  377. if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
  378. return;
  379. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Start collecting statistics...\n");
  380. stats_op = GNUNET_TESTBED_get_statistics (peers_total, testbed_handles,
  381. NULL, NULL,
  382. stats_iterator, stats_cont, NULL);
  383. }
  384. /**
  385. * @brief Finish profiler normally. Signal finish and start collecting stats.
  386. *
  387. * @param cls Closure (unused).
  388. * @param tc Task context.
  389. */
  390. static void
  391. finish_profiler (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  392. {
  393. if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
  394. return;
  395. test_finished = GNUNET_YES;
  396. show_end_data();
  397. GNUNET_SCHEDULER_add_now (&collect_stats, NULL);
  398. }
  399. /**
  400. * Set the total number of running peers.
  401. *
  402. * @param target Desired number of running peers.
  403. */
  404. static void
  405. adjust_running_peers (unsigned int target)
  406. {
  407. struct GNUNET_TESTBED_Operation *op;
  408. unsigned int delta;
  409. unsigned int run;
  410. unsigned int i;
  411. unsigned int r;
  412. GNUNET_assert (target <= peers_total);
  413. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adjust peers to %u\n", target);
  414. if (target > peers_running)
  415. {
  416. delta = target - peers_running;
  417. run = GNUNET_YES;
  418. }
  419. else
  420. {
  421. delta = peers_running - target;
  422. run = GNUNET_NO;
  423. }
  424. for (i = 0; i < delta; i++)
  425. {
  426. do {
  427. r = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
  428. peers_total - peers_pinging);
  429. r += peers_pinging;
  430. } while (peers[r].up == run || NULL != peers[r].incoming);
  431. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "St%s peer %u: %s\n",
  432. run ? "arting" : "opping", r, GNUNET_i2s (&peers[r].id));
  433. if (NULL != peers[r].ping_task)
  434. GNUNET_SCHEDULER_cancel (peers[r].ping_task);
  435. peers[r].ping_task = NULL;
  436. peers[r].up = run;
  437. if (NULL != peers[r].ch)
  438. GNUNET_CADET_channel_destroy (peers[r].ch);
  439. peers[r].ch = NULL;
  440. if (NULL != peers[r].dest)
  441. {
  442. if (NULL != peers[r].dest->incoming_ch)
  443. GNUNET_CADET_channel_destroy (peers[r].dest->incoming_ch);
  444. peers[r].dest->incoming_ch = NULL;
  445. }
  446. op = GNUNET_TESTBED_peer_manage_service (&peers[r], testbed_handles[r],
  447. "cadet", NULL, NULL, run);
  448. GNUNET_break (NULL != op);
  449. peers_running += run ? 1 : -1;
  450. GNUNET_assert (peers_running > 0);
  451. }
  452. }
  453. /**
  454. * @brief Move to next round.
  455. *
  456. * @param cls Closure (round #).
  457. * @param tc Task context.
  458. */
  459. static void
  460. next_rnd (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  461. {
  462. if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
  463. return;
  464. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ROUND %ld\n", current_round);
  465. if (0.0 == rounds[current_round])
  466. {
  467. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Finishing\n");
  468. GNUNET_SCHEDULER_add_now (&finish_profiler, NULL);
  469. return;
  470. }
  471. adjust_running_peers (rounds[current_round] * peers_total);
  472. current_round++;
  473. GNUNET_SCHEDULER_add_delayed (round_time, &next_rnd, NULL);
  474. }
/**
 * Transmit ping callback (forward declaration, needed by ping()).
 *
 * @param cls Closure (peer sending the ping).
 * @param size Size of the transmit buffer.
 * @param buf Pointer to the beginning of the buffer.
 *
 * @return Number of bytes written to buf.
 */
static size_t
tmt_rdy_ping (void *cls, size_t size, void *buf);
  486. /**
  487. * Transmit pong callback.
  488. *
  489. * @param cls Closure (copy of PING message, to be freed).
  490. * @param size Size of the buffer we have.
  491. * @param buf Buffer to copy data to.
  492. */
  493. static size_t
  494. tmt_rdy_pong (void *cls, size_t size, void *buf)
  495. {
  496. struct CadetPingMessage *ping = cls;
  497. struct CadetPingMessage *pong;
  498. if (0 == size || NULL == buf)
  499. {
  500. GNUNET_free (ping);
  501. return 0;
  502. }
  503. pong = (struct CadetPingMessage *) buf;
  504. memcpy (pong, ping, sizeof (*ping));
  505. pong->header.type = htons (PONG);
  506. GNUNET_free (ping);
  507. return sizeof (*ping);
  508. }
  509. /**
  510. * @brief Send a ping to destination
  511. *
  512. * @param cls Closure (peer).
  513. * @param tc Task context.
  514. */
  515. static void
  516. ping (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  517. {
  518. struct CadetPeer *peer = (struct CadetPeer *) cls;
  519. peer->ping_task = NULL;
  520. if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0
  521. || GNUNET_YES == test_finished)
  522. return;
  523. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u -> %u (%u)\n",
  524. get_index (peer), get_index (peer->dest), peer->data_sent);
  525. GNUNET_CADET_notify_transmit_ready (peer->ch, GNUNET_NO,
  526. GNUNET_TIME_UNIT_FOREVER_REL,
  527. sizeof (struct CadetPingMessage),
  528. &tmt_rdy_ping, peer);
  529. }
  530. /**
  531. * @brief Reply with a pong to origin.
  532. *
  533. * @param cls Closure (peer).
  534. * @param tc Task context.
  535. */
  536. static void
  537. pong (struct GNUNET_CADET_Channel *channel, const struct CadetPingMessage *ping)
  538. {
  539. struct CadetPingMessage *copy;
  540. copy = GNUNET_new (struct CadetPingMessage);
  541. memcpy (copy, ping, sizeof (*ping));
  542. GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
  543. GNUNET_TIME_UNIT_FOREVER_REL,
  544. sizeof (struct CadetPingMessage),
  545. &tmt_rdy_pong, copy);
  546. }
  547. /**
  548. * Transmit ping callback
  549. *
  550. * @param cls Closure (peer).
  551. * @param size Size of the buffer we have.
  552. * @param buf Buffer to copy data to.
  553. */
  554. static size_t
  555. tmt_rdy_ping (void *cls, size_t size, void *buf)
  556. {
  557. struct CadetPeer *peer = (struct CadetPeer *) cls;
  558. struct CadetPingMessage *msg = buf;
  559. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmt_rdy called, filling buffer\n");
  560. if (size < sizeof (struct CadetPingMessage) || NULL == buf)
  561. {
  562. GNUNET_break (GNUNET_YES == test_finished);
  563. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  564. "size %u, buf %p, data_sent %u, data_received %u\n",
  565. size, buf, peer->data_sent, peer->data_received);
  566. return 0;
  567. }
  568. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending: msg %d\n", peer->data_sent);
  569. msg->header.size = htons (size);
  570. msg->header.type = htons (PING);
  571. msg->counter = htonl (peer->data_sent++);
  572. msg->round_number = htonl (current_round);
  573. msg->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
  574. peer->pings[current_round]++;
  575. peer->ping_task = GNUNET_SCHEDULER_add_delayed (delay_ms_rnd (PING_PERIOD),
  576. &ping, peer);
  577. return sizeof (struct CadetPingMessage);
  578. }
  579. /**
  580. * Function is called whenever a PING message is received.
  581. *
  582. * @param cls closure (peer #, set from GNUNET_CADET_connect)
  583. * @param channel connection to the other end
  584. * @param channel_ctx place to store local state associated with the channel
  585. * @param message the actual message
  586. * @return GNUNET_OK to keep the connection open,
  587. * GNUNET_SYSERR to close it (signal serious error)
  588. */
  589. int
  590. ping_handler (void *cls, struct GNUNET_CADET_Channel *channel,
  591. void **channel_ctx,
  592. const struct GNUNET_MessageHeader *message)
  593. {
  594. long n = (long) cls;
  595. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%u got PING\n", n);
  596. GNUNET_CADET_receive_done (channel);
  597. if (GNUNET_NO == test_finished)
  598. pong (channel, (struct CadetPingMessage *) message);
  599. return GNUNET_OK;
  600. }
  601. /**
  602. * Function is called whenever a PONG message is received.
  603. *
  604. * @param cls closure (peer #, set from GNUNET_CADET_connect)
  605. * @param channel connection to the other end
  606. * @param channel_ctx place to store local state associated with the channel
  607. * @param message the actual message
  608. * @return GNUNET_OK to keep the connection open,
  609. * GNUNET_SYSERR to close it (signal serious error)
  610. */
  611. int
  612. pong_handler (void *cls, struct GNUNET_CADET_Channel *channel,
  613. void **channel_ctx,
  614. const struct GNUNET_MessageHeader *message)
  615. {
  616. long n = (long) cls;
  617. struct CadetPeer *peer;
  618. struct CadetPingMessage *msg;
  619. struct GNUNET_TIME_Absolute send_time;
  620. struct GNUNET_TIME_Relative latency;
  621. unsigned int r /* Ping round */;
  622. float delta;
  623. GNUNET_CADET_receive_done (channel);
  624. peer = &peers[n];
  625. msg = (struct CadetPingMessage *) message;
  626. send_time = GNUNET_TIME_absolute_ntoh (msg->timestamp);
  627. latency = GNUNET_TIME_absolute_get_duration (send_time);
  628. r = ntohl (msg->round_number);
  629. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u <- %u (%u) latency: %s\n",
  630. get_index (peer), get_index (peer->dest), ntohl (msg->counter),
  631. GNUNET_STRINGS_relative_time_to_string (latency, GNUNET_NO));
  632. /* Online variance calculation */
  633. peer->pongs[r]++;
  634. delta = latency.rel_value_us - peer->mean[r];
  635. peer->mean[r] = peer->mean[r] + delta/peer->pongs[r];
  636. peer->var[r] += delta * (latency.rel_value_us - peer->mean[r]);
  637. return GNUNET_OK;
  638. }
  639. /**
  640. * Handlers, for diverse services
  641. */
  642. static struct GNUNET_CADET_MessageHandler handlers[] = {
  643. {&ping_handler, PING, sizeof (struct CadetPingMessage)},
  644. {&pong_handler, PONG, sizeof (struct CadetPingMessage)},
  645. {NULL, 0, 0}
  646. };
  647. /**
  648. * Method called whenever another peer has added us to a channel
  649. * the other peer initiated.
  650. *
  651. * @param cls Closure.
  652. * @param channel New handle to the channel.
  653. * @param initiator Peer that started the channel.
  654. * @param port Port this channel is connected to.
  655. * @param options channel option flags
  656. * @return Initial channel context for the channel
  657. * (can be NULL -- that's not an error).
  658. */
  659. static void *
  660. incoming_channel (void *cls, struct GNUNET_CADET_Channel *channel,
  661. const struct GNUNET_PeerIdentity *initiator,
  662. uint32_t port, enum GNUNET_CADET_ChannelOption options)
  663. {
  664. long n = (long) cls;
  665. struct CadetPeer *peer;
  666. peer = GNUNET_CONTAINER_multipeermap_get (ids, initiator);
  667. GNUNET_assert (NULL != peer);
  668. if (NULL == peers[n].incoming)
  669. {
  670. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "WARMUP %3u: %u <= %u\n",
  671. peers_warmup, n, get_index (peer));
  672. peers_warmup++;
  673. if (peers_warmup < peers_total)
  674. return NULL;
  675. if (NULL != test_task)
  676. {
  677. GNUNET_SCHEDULER_cancel (test_task);
  678. test_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
  679. &start_test, NULL);
  680. }
  681. return NULL;
  682. }
  683. GNUNET_assert (peer == peers[n].incoming);
  684. GNUNET_assert (peer->dest == &peers[n]);
  685. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u <= %u %p\n",
  686. n, get_index (peer), channel);
  687. peers[n].incoming_ch = channel;
  688. return NULL;
  689. }
  690. /**
  691. * Function called whenever an inbound channel is destroyed. Should clean up
  692. * any associated state.
  693. *
  694. * @param cls closure (set from GNUNET_CADET_connect)
  695. * @param channel connection to the other end (henceforth invalid)
  696. * @param channel_ctx place where local state associated
  697. * with the channel is stored
  698. */
  699. static void
  700. channel_cleaner (void *cls, const struct GNUNET_CADET_Channel *channel,
  701. void *channel_ctx)
  702. {
  703. long n = (long) cls;
  704. struct CadetPeer *peer = &peers[n];
  705. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  706. "Channel %p disconnected at peer %ld\n", channel, n);
  707. if (peer->ch == channel)
  708. peer->ch = NULL;
  709. }
  710. /**
  711. * Select a random peer that has no incoming channel
  712. *
  713. * @param peer ID of the peer connecting. NULL if irrelevant (warmup).
  714. *
  715. * @return Random peer not yet connected to.
  716. */
  717. static struct CadetPeer *
  718. select_random_peer (struct CadetPeer *peer)
  719. {
  720. unsigned int r;
  721. do
  722. {
  723. r = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, peers_total);
  724. } while (NULL != peers[r].incoming);
  725. peers[r].incoming = peer;
  726. return &peers[r];
  727. }
  728. /**
  729. * START THE TEST ITSELF, AS WE ARE CONNECTED TO THE CADET SERVICES.
  730. *
  731. * Testcase continues when the root receives confirmation of connected peers,
  732. * on callback funtion ch.
  733. *
  734. * @param cls Closure (unsued).
  735. * @param tc Task Context.
  736. */
  737. static void
  738. start_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  739. {
  740. enum GNUNET_CADET_ChannelOption flags;
  741. unsigned long i;
  742. test_task = NULL;
  743. if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
  744. return;
  745. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Start profiler\n");
  746. flags = GNUNET_CADET_OPTION_DEFAULT;
  747. for (i = 0; i < peers_pinging; i++)
  748. {
  749. peers[i].dest = select_random_peer (&peers[i]);
  750. peers[i].ch = GNUNET_CADET_channel_create (peers[i].cadet, NULL,
  751. &peers[i].dest->id,
  752. 1, flags);
  753. if (NULL == peers[i].ch)
  754. {
  755. GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Channel %lu failed\n", i);
  756. GNUNET_CADET_TEST_cleanup (test_ctx);
  757. return;
  758. }
  759. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u => %u %p\n",
  760. i, get_index (peers[i].dest), peers[i].ch);
  761. peers[i].ping_task = GNUNET_SCHEDULER_add_delayed (delay_ms_rnd (2000),
  762. &ping, &peers[i]);
  763. }
  764. peers_running = peers_total;
  765. if (NULL != disconnect_task)
  766. GNUNET_SCHEDULER_cancel (disconnect_task);
  767. disconnect_task =
  768. GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(round_time,
  769. number_rounds + 1),
  770. &disconnect_cadet_peers,
  771. (void *) __LINE__);
  772. GNUNET_SCHEDULER_add_delayed (round_time, &next_rnd, NULL);
  773. }
  774. /**
  775. * Do warmup: create some channels to spread information about the topology.
  776. */
  777. static void
  778. warmup (void)
  779. {
  780. struct CadetPeer *peer;
  781. unsigned int i;
  782. for (i = 0; i < peers_total; i++)
  783. {
  784. peer = select_random_peer (NULL);
  785. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "WARMUP %u => %u\n",
  786. i, get_index (peer));
  787. peers[i].warmup_ch =
  788. GNUNET_CADET_channel_create (peers[i].cadet, NULL, &peer->id,
  789. 1, GNUNET_CADET_OPTION_DEFAULT);
  790. if (NULL == peers[i].warmup_ch)
  791. {
  792. GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Warmup %u failed\n", i);
  793. GNUNET_CADET_TEST_cleanup (test_ctx);
  794. return;
  795. }
  796. }
  797. }
  798. /**
  799. * Callback to be called when the requested peer information is available
  800. *
  801. * @param cls the closure from GNUNET_TESTBED_peer_get_information()
  802. * @param op the operation this callback corresponds to
  803. * @param pinfo the result; will be NULL if the operation has failed
  804. * @param emsg error message if the operation has failed;
  805. * NULL if the operation is successfull
  806. */
  807. static void
  808. peer_id_cb (void *cls,
  809. struct GNUNET_TESTBED_Operation *op,
  810. const struct GNUNET_TESTBED_PeerInformation *pinfo,
  811. const char *emsg)
  812. {
  813. long n = (long) cls;
  814. if (NULL == pinfo || NULL != emsg)
  815. {
  816. GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "pi_cb: %s\n", emsg);
  817. abort_test (__LINE__);
  818. return;
  819. }
  820. peers[n].id = *(pinfo->result.id);
  821. GNUNET_log (GNUNET_ERROR_TYPE_INFO, " %u id: %s\n",
  822. n, GNUNET_i2s (&peers[n].id));
  823. GNUNET_break (GNUNET_OK ==
  824. GNUNET_CONTAINER_multipeermap_put (ids, &peers[n].id, &peers[n],
  825. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
  826. GNUNET_TESTBED_operation_done (peers[n].op);
  827. peers[n].op = NULL;
  828. p_ids++;
  829. if (p_ids < peers_total)
  830. return;
  831. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got all IDs, starting profiler\n");
  832. if (do_warmup)
  833. {
  834. struct GNUNET_TIME_Relative delay;
  835. warmup();
  836. delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
  837. 100 * peers_total);
  838. test_task = GNUNET_SCHEDULER_add_delayed (delay, &start_test, NULL);
  839. return; /* start_test from incoming_channel */
  840. }
  841. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting in a second...\n");
  842. test_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
  843. &start_test, NULL);
  844. }
  845. /**
  846. * test main: start test when all peers are connected
  847. *
  848. * @param cls Closure.
  849. * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
  850. * @param num_peers Number of peers that are running.
  851. * @param testbed_peers Array of peers.
  852. * @param cadetes Handle to each of the CADETs of the peers.
  853. */
  854. static void
  855. tmain (void *cls,
  856. struct GNUNET_CADET_TEST_Context *ctx,
  857. unsigned int num_peers,
  858. struct GNUNET_TESTBED_Peer **testbed_peers,
  859. struct GNUNET_CADET_Handle **cadetes)
  860. {
  861. unsigned long i;
  862. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test main\n");
  863. test_ctx = ctx;
  864. GNUNET_assert (peers_total == num_peers);
  865. peers_running = num_peers;
  866. testbed_handles = testbed_peers;
  867. disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
  868. &disconnect_cadet_peers,
  869. (void *) __LINE__);
  870. shutdown_handle = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
  871. &shutdown_task, NULL);
  872. for (i = 0; i < peers_total; i++)
  873. {
  874. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "requesting id %ld\n", i);
  875. peers[i].up = GNUNET_YES;
  876. peers[i].cadet = cadetes[i];
  877. peers[i].op =
  878. GNUNET_TESTBED_peer_get_information (testbed_handles[i],
  879. GNUNET_TESTBED_PIT_IDENTITY,
  880. &peer_id_cb, (void *) i);
  881. }
  882. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "requested peer ids\n");
  883. /* Continues from pi_cb -> do_test */
  884. }
  885. /**
  886. * Main: start profiler.
  887. */
  888. int
  889. main (int argc, char *argv[])
  890. {
  891. static uint32_t ports[2];
  892. const char *config_file;
  893. config_file = ".profiler.conf";
  894. if (4 > argc)
  895. {
  896. fprintf (stderr, "usage: %s ROUND_TIME PEERS PINGS [DO_WARMUP]\n", argv[0]);
  897. fprintf (stderr, "example: %s 30s 16 1 Y\n", argv[0]);
  898. return 1;
  899. }
  900. if (GNUNET_OK != GNUNET_STRINGS_fancy_time_to_relative (argv[1], &round_time))
  901. {
  902. fprintf (stderr, "%s is not a valid time\n", argv[1]);
  903. return 1;
  904. }
  905. peers_total = atoll (argv[2]);
  906. if (2 > peers_total)
  907. {
  908. fprintf (stderr, "%s peers is not valid (> 2)\n", argv[1]);
  909. return 1;
  910. }
  911. peers = GNUNET_malloc (sizeof (struct CadetPeer) * peers_total);
  912. peers_pinging = atoll (argv[3]);
  913. if (peers_total < 2 * peers_pinging)
  914. {
  915. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  916. "not enough peers, total should be > 2 * peers_pinging\n");
  917. return 1;
  918. }
  919. do_warmup = (5 > argc || argv[4][0] != 'N');
  920. ids = GNUNET_CONTAINER_multipeermap_create (2 * peers_total, GNUNET_YES);
  921. GNUNET_assert (NULL != ids);
  922. p_ids = 0;
  923. test_finished = GNUNET_NO;
  924. ports[0] = 1;
  925. ports[1] = 0;
  926. GNUNET_CADET_TEST_run ("cadet-profiler", config_file, peers_total,
  927. &tmain, NULL, /* tmain cls */
  928. &incoming_channel, &channel_cleaner,
  929. handlers, ports);
  930. GNUNET_free (peers);
  931. return 0;
  932. }
  933. /* end of gnunet-cadet-profiler.c */