test_cadet_flow.c 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2011, 2017 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file cadet/test_cadet_flow.c
  18. * @author Bart Polot
  19. * @author Christian Grothoff
  20. * @brief Test for flow control of CADET service
  21. */
  22. #include <stdio.h>
  23. #include "platform.h"
  24. #include "cadet_test_lib.h"
  25. #include "gnunet_cadet_service.h"
  26. #include "gnunet_statistics_service.h"
  27. #include <gauger.h>
  /**
   * Closure object handed to CADET for each channel.  Wrapping the channel
   * pointer lets the same data and disconnect handlers be used for both the
   * channel we create ourselves and the channel we accept from the other peer.
   */
  struct CadetTestChannelWrapper
  {
    /**
     * The channel this wrapper was created for.
     */
    struct GNUNET_CADET_Channel *ch;
  };
  /**
   * Number of messages to send unless overridden.
   */
  #define TOTAL_PACKETS_DEFAULT 500

  /**
   * Upper bound on the time we are willing to wait for the peers to connect.
   */
  #define TIMEOUT \
    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 120)

  /**
   * Default waiting time for operations that are expected to be quick.
   */
  #define SHORT_TIME \
    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 20)

  /**
   * Delay between two consecutive transmissions scheduled by send_next_msg().
   */
  #define SEND_INTERVAL \
    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 10)
  54. /**
  55. * How many packets to send.
  56. */
  57. static unsigned int total_packets = TOTAL_PACKETS_DEFAULT;
  58. /**
  59. * Time to wait for fast operations.
  60. */
  61. static struct GNUNET_TIME_Relative short_time;
  62. /**
  63. * Size of each test packet's payload
  64. */
  65. static size_t size_payload = sizeof (uint32_t);
  66. /**
  67. * Operation to get peer ids.
  68. */
  69. static struct GNUNET_TESTBED_Operation *t_op[2];
  70. /**
  71. * Peer ids.
  72. */
  73. static struct GNUNET_PeerIdentity *p_id[2];
  74. /**
  75. * Port ID
  76. */
  77. static struct GNUNET_HashCode port;
  78. /**
  79. * Peer ids counter.
  80. */
  81. static unsigned int p_ids;
  82. /**
  83. * Is the setup initialized?
  84. */
  85. static int initialized;
  86. /**
  87. * Number of payload packes sent.
  88. */
  89. static int data_sent;
  90. /**
  91. * Number of payload packets received.
  92. */
  93. static int data_received;
  94. /**
  95. * Number of payload packed acknowledgements sent.
  96. */
  97. static int ack_sent;
  98. /**
  99. * Number of payload packed explicitly (app level) acknowledged.
  100. */
  101. static int ack_received;
  102. /**
  103. * Total number of peers asked to run.
  104. */
  105. static unsigned int peers_requested = 2;
  106. /**
  107. * Number of currently running peers (should be same as @c peers_requested).
  108. */
  109. static unsigned int peers_running;
  110. /**
  111. * Test context (to shut down).
  112. */
  113. struct GNUNET_CADET_TEST_Context *test_ctx;
  114. /**
  115. * Task called to disconnect peers.
  116. */
  117. static struct GNUNET_SCHEDULER_Task *disconnect_task;
  118. /**
  119. * Task To perform tests
  120. */
  121. static struct GNUNET_SCHEDULER_Task *test_task;
  122. /**
  123. * Task runnining #send_next_msg().
  124. */
  125. static struct GNUNET_SCHEDULER_Task *send_next_msg_task;
  126. /**
  127. * Cadet handle for the root peer
  128. */
  129. static struct GNUNET_CADET_Handle *h1;
  130. /**
  131. * Cadet handle for the first leaf peer
  132. */
  133. static struct GNUNET_CADET_Handle *h2;
  134. /**
  135. * Channel handle for the root peer
  136. */
  137. static struct GNUNET_CADET_Channel *outgoing_ch;
  138. /**
  139. * Channel handle for the dest peer
  140. */
  141. static struct GNUNET_CADET_Channel *incoming_ch;
  142. /**
  143. * Time we started the data transmission (after channel has been established
  144. * and initilized).
  145. */
  146. static struct GNUNET_TIME_Absolute start_time;
  147. /**
  148. * Peers handle.
  149. */
  150. static struct GNUNET_TESTBED_Peer **testbed_peers;
  151. /**
  152. * Statistics operation handle.
  153. */
  154. static struct GNUNET_TESTBED_Operation *stats_op;
  155. /**
  156. * Keepalives sent.
  157. */
  158. static unsigned int ka_sent;
  159. /**
  160. * Keepalives received.
  161. */
  162. static unsigned int ka_received;
  163. /**
  164. * How many messages were dropped by CADET because of full buffers?
  165. */
  166. static unsigned int msg_dropped;
  167. /**
  168. * Show the results of the test (banwidth acheived) and log them to GAUGER
  169. */
  170. static void
  171. show_end_data (void)
  172. {
  173. static struct GNUNET_TIME_Absolute end_time;
  174. static struct GNUNET_TIME_Relative total_time;
  175. end_time = GNUNET_TIME_absolute_get ();
  176. total_time = GNUNET_TIME_absolute_get_difference (start_time, end_time);
  177. FPRINTF (stderr,
  178. "\nResults of test \"%s\"\n",
  179. test_name);
  180. FPRINTF (stderr,
  181. "Test time %s\n",
  182. GNUNET_STRINGS_relative_time_to_string (total_time, GNUNET_YES));
  183. FPRINTF (stderr,
  184. "Test bandwidth: %f kb/s\n",
  185. 4 * total_packets * 1.0 / (total_time.rel_value_us / 1000)); // 4bytes * ms
  186. FPRINTF (stderr,
  187. "Test throughput: %f packets/s\n\n",
  188. total_packets * 1000.0 / (total_time.rel_value_us / 1000)); // packets * ms
  189. GAUGER ("CADET",
  190. test_name,
  191. total_packets * 1000.0 / (total_time.rel_value_us / 1000),
  192. "packets/s");
  193. }
  194. /**
  195. * Shut down peergroup, clean up.
  196. *
  197. * @param cls Closure (unused).
  198. * @param tc Task Context.
  199. */
  200. static void
  201. shutdown_task (void *cls)
  202. {
  203. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  204. "Ending test.\n");
  205. if (NULL != send_next_msg_task)
  206. {
  207. GNUNET_SCHEDULER_cancel (send_next_msg_task);
  208. send_next_msg_task = NULL;
  209. }
  210. if (NULL != test_task)
  211. {
  212. GNUNET_SCHEDULER_cancel (test_task);
  213. test_task = NULL;
  214. }
  215. for (unsigned int i = 0; i < 2; i++)
  216. GNUNET_TESTBED_operation_done (t_op[i]);
  217. if (NULL != outgoing_ch)
  218. {
  219. GNUNET_CADET_channel_destroy (outgoing_ch);
  220. outgoing_ch = NULL;
  221. }
  222. if (NULL != incoming_ch)
  223. {
  224. GNUNET_CADET_channel_destroy (incoming_ch);
  225. incoming_ch = NULL;
  226. }
  227. GNUNET_CADET_TEST_cleanup (test_ctx);
  228. }
  229. /**
  230. * Stats callback. Finish the stats testbed operation and when all stats have
  231. * been iterated, shutdown the test.
  232. *
  233. * @param cls Closure (line number from which termination was requested).
  234. * @param op the operation that has been finished
  235. * @param emsg error message in case the operation has failed; will be NULL if
  236. * operation has executed successfully.
  237. */
  238. static void
  239. stats_cont (void *cls,
  240. struct GNUNET_TESTBED_Operation *op,
  241. const char *emsg)
  242. {
  243. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  244. "KA sent: %u, KA received: %u\n",
  245. ka_sent,
  246. ka_received);
  247. if ((KEEPALIVE == test) && ((ka_sent < 2) || (ka_sent > ka_received + 1)))
  248. {
  249. GNUNET_break (0);
  250. ok--;
  251. }
  252. GNUNET_TESTBED_operation_done (stats_op);
  253. if (NULL != disconnect_task)
  254. GNUNET_SCHEDULER_cancel (disconnect_task);
  255. disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
  256. cls);
  257. }
  258. /**
  259. * Process statistic values.
  260. *
  261. * @param cls closure (line number, unused)
  262. * @param peer the peer the statistic belong to
  263. * @param subsystem name of subsystem that created the statistic
  264. * @param name the name of the datum
  265. * @param value the current value
  266. * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
  267. * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
  268. */
  269. static int
  270. stats_iterator (void *cls,
  271. const struct GNUNET_TESTBED_Peer *peer,
  272. const char *subsystem,
  273. const char *name,
  274. uint64_t value,
  275. int is_persistent)
  276. {
  277. static const char *s_sent = "# keepalives sent";
  278. static const char *s_recv = "# keepalives received";
  279. static const char *rdrops = "# messages dropped due to full buffer";
  280. static const char *cdrops = "# messages dropped due to slow client";
  281. uint32_t i;
  282. i = GNUNET_TESTBED_get_index (peer);
  283. GNUNET_log (GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", i,
  284. subsystem, name, (unsigned long long) value);
  285. if (0 == strncmp (s_sent, name, strlen (s_sent)) && 0 == i)
  286. ka_sent = value;
  287. if (0 == strncmp (s_recv, name, strlen (s_recv)) && peers_requested - 1 == i)
  288. ka_received = value;
  289. if (0 == strncmp (rdrops, name, strlen (rdrops)))
  290. msg_dropped += value;
  291. if (0 == strncmp (cdrops, name, strlen (cdrops)))
  292. msg_dropped += value;
  293. return GNUNET_OK;
  294. }
  295. /**
  296. * Task to gather all statistics.
  297. *
  298. * @param cls Closure (line from which the task was scheduled).
  299. */
  300. static void
  301. gather_stats_and_exit (void *cls)
  302. {
  303. long l = (long) cls;
  304. disconnect_task = NULL;
  305. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  306. "gathering statistics from line %ld\n",
  307. l);
  308. if (NULL != outgoing_ch)
  309. {
  310. GNUNET_CADET_channel_destroy (outgoing_ch);
  311. outgoing_ch = NULL;
  312. }
  313. stats_op = GNUNET_TESTBED_get_statistics (peers_running,
  314. testbed_peers,
  315. "cadet",
  316. NULL,
  317. &stats_iterator,
  318. stats_cont,
  319. cls);
  320. }
  321. /**
  322. * Abort test: schedule disconnect and shutdown immediately
  323. *
  324. * @param line Line in the code the abort is requested from (__LINE__).
  325. */
  326. static void
  327. abort_test (long line)
  328. {
  329. if (NULL != disconnect_task)
  330. {
  331. GNUNET_SCHEDULER_cancel (disconnect_task);
  332. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  333. "Aborting test from %ld\n",
  334. line);
  335. disconnect_task =
  336. GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
  337. (void *) line);
  338. }
  339. }
  340. /**
  341. * Send a message on the channel with the appropriate size and payload.
  342. *
  343. * Update the appropriate *_sent counter.
  344. *
  345. * @param channel Channel to send the message on.
  346. */
  347. static void
  348. send_test_message (struct GNUNET_CADET_Channel *channel)
  349. {
  350. struct GNUNET_MQ_Envelope *env;
  351. struct GNUNET_MessageHeader *msg;
  352. uint32_t *data;
  353. int payload;
  354. int size;
  355. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  356. "Sending test message on channel %p\n",
  357. channel);
  358. size = size_payload;
  359. if (GNUNET_NO == initialized)
  360. {
  361. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending INITIALIZER\n");
  362. size += 1000;
  363. payload = data_sent;
  364. if (SPEED_ACK == test) // FIXME unify SPEED_ACK with an initializer
  365. data_sent++;
  366. }
  367. else if (SPEED == test || SPEED_ACK == test)
  368. {
  369. if (get_target_channel() == channel)
  370. {
  371. payload = ack_sent;
  372. size += ack_sent;
  373. ack_sent++;
  374. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  375. "Sending ACK %u [%d bytes]\n",
  376. payload, size);
  377. }
  378. else
  379. {
  380. payload = data_sent;
  381. size += data_sent;
  382. data_sent++;
  383. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  384. "Sending DATA %u [%d bytes]\n",
  385. data_sent, size);
  386. }
  387. }
  388. else if (FORWARD == test)
  389. {
  390. payload = ack_sent;
  391. }
  392. else if (P2P_SIGNAL == test)
  393. {
  394. payload = data_sent;
  395. }
  396. else
  397. {
  398. GNUNET_assert (0);
  399. }
  400. env = GNUNET_MQ_msg_extra (msg, size, GNUNET_MESSAGE_TYPE_DUMMY);
  401. data = (uint32_t *) &msg[1];
  402. *data = htonl (payload);
  403. GNUNET_MQ_send (GNUNET_CADET_get_mq (channel), env);
  404. }
  405. /**
  406. * Task to request a new data transmission in a SPEED test, without waiting
  407. * for previous messages to be sent/arrrive.
  408. *
  409. * @param cls Closure (unused).
  410. */
  411. static void
  412. send_next_msg (void *cls)
  413. {
  414. struct GNUNET_CADET_Channel *channel;
  415. send_next_msg_task = NULL;
  416. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  417. "Sending next message: %d\n",
  418. data_sent);
  419. channel = GNUNET_YES == test_backwards ? incoming_ch : outgoing_ch;
  420. GNUNET_assert (NULL != channel);
  421. GNUNET_assert (SPEED == test);
  422. send_test_message (channel);
  423. if (data_sent < total_packets)
  424. {
  425. /* SPEED test: Send all messages as soon as possible */
  426. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  427. "Scheduling message %d\n",
  428. data_sent + 1);
  429. send_next_msg_task =
  430. GNUNET_SCHEDULER_add_delayed (SEND_INTERVAL,
  431. &send_next_msg,
  432. NULL);
  433. }
  434. }
  435. /**
  436. * Check if payload is sane (size contains payload).
  437. *
  438. * @param cls should match #ch
  439. * @param message The actual message.
  440. * @return #GNUNET_OK to keep the channel open,
  441. * #GNUNET_SYSERR to close it (signal serious error).
  442. */
  443. static int
  444. check_data (void *cls,
  445. const struct GNUNET_MessageHeader *message)
  446. {
  447. return GNUNET_OK; /* all is well-formed */
  448. }
  449. /**
  450. * Function is called whenever a message is received.
  451. *
  452. * @param cls closure (set from GNUNET_CADET_connect(), peer number)
  453. * @param message the actual message
  454. */
  455. static void
  456. handle_data (void *cls,
  457. const struct GNUNET_MessageHeader *message)
  458. {
  459. struct CadetTestChannelWrapper *ch = cls;
  460. struct GNUNET_CADET_Channel *channel = ch->ch;
  461. uint32_t *data;
  462. uint32_t payload;
  463. int *counter;
  464. GNUNET_CADET_receive_done (channel);
  465. counter = get_target_channel () == channel ? &data_received : &ack_received;
  466. if (channel == outgoing_ch)
  467. {
  468. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  469. "Root client got a message.\n");
  470. }
  471. else if (channel == incoming_ch)
  472. {
  473. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  474. "Leaf client got a message.\n");
  475. }
  476. else
  477. {
  478. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  479. "Unknown channel %p.\n",
  480. channel);
  481. GNUNET_assert (0);
  482. }
  483. data = (uint32_t *) &message[1];
  484. payload = ntohl (*data);
  485. if (payload == *counter)
  486. {
  487. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  488. "Payload as expected: %u\n",
  489. payload);
  490. }
  491. else
  492. {
  493. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  494. "Received payload %u, expected: %u\n",
  495. payload, *counter);
  496. }
  497. (*counter)++;
  498. if (get_target_channel () == channel) /* Got "data" */
  499. {
  500. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  501. " received data %u\n",
  502. data_received);
  503. if (data_received < total_packets)
  504. return;
  505. }
  506. else /* Got "ack" */
  507. {
  508. if (SPEED_ACK == test || SPEED == test)
  509. {
  510. GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received);
  511. /* Send more data */
  512. send_test_message (channel);
  513. if (ack_received < total_packets && SPEED != test)
  514. return;
  515. if (ok == 2 && SPEED == test)
  516. return;
  517. show_end_data ();
  518. }
  519. if (test == P2P_SIGNAL)
  520. {
  521. GNUNET_CADET_channel_destroy (incoming_ch);
  522. incoming_ch = NULL;
  523. }
  524. else
  525. {
  526. GNUNET_CADET_channel_destroy (outgoing_ch);
  527. outgoing_ch = NULL;
  528. }
  529. }
  530. }
  531. /**
  532. * Method called whenever a peer connects to a port in MQ-based CADET.
  533. *
  534. * @param cls Closure from #GNUNET_CADET_open_port (peer # as long).
  535. * @param channel New handle to the channel.
  536. * @param source Peer that started this channel.
  537. * @return Closure for the incoming @a channel. It's given to:
  538. * - The #GNUNET_CADET_DisconnectEventHandler (given to
  539. * #GNUNET_CADET_open_port) when the channel dies.
  540. * - Each the #GNUNET_MQ_MessageCallback handlers for each message
  541. * received on the @a channel.
  542. */
  543. static void *
  544. connect_handler (void *cls,
  545. struct GNUNET_CADET_Channel *channel,
  546. const struct GNUNET_PeerIdentity *source)
  547. {
  548. struct CadetTestChannelWrapper *ch;
  549. long peer = (long) cls;
  550. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  551. "Incoming channel from %s to %ld: %p\n",
  552. GNUNET_i2s (source),
  553. peer,
  554. channel);
  555. if (peer == peers_requested - 1)
  556. {
  557. if (NULL != incoming_ch)
  558. {
  559. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  560. "Duplicate incoming channel for client %lu\n",
  561. (long) cls);
  562. GNUNET_assert (0);
  563. }
  564. incoming_ch = channel;
  565. }
  566. else
  567. {
  568. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  569. "Incoming channel for unexpected peer #%lu\n",
  570. (long) cls);
  571. GNUNET_assert (0);
  572. }
  573. ch = GNUNET_new (struct CadetTestChannelWrapper);
  574. ch->ch = channel;
  575. return ch;
  576. }
  577. /**
  578. * Function called whenever an MQ-channel is destroyed, even if the destruction
  579. * was requested by #GNUNET_CADET_channel_destroy.
  580. * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
  581. *
  582. * It should clean up any associated state, including cancelling any pending
  583. * transmission on this channel.
  584. *
  585. * @param cls Channel closure (channel wrapper).
  586. * @param channel Connection to the other end (henceforth invalid).
  587. */
  588. static void
  589. disconnect_handler (void *cls,
  590. const struct GNUNET_CADET_Channel *channel)
  591. {
  592. struct CadetTestChannelWrapper *ch_w = cls;
  593. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  594. "Channel disconnected at %d\n",
  595. ok);
  596. GNUNET_assert (ch_w->ch == channel);
  597. if (channel == incoming_ch)
  598. incoming_ch = NULL;
  599. else if (outgoing_ch == channel)
  600. outgoing_ch = NULL;
  601. else
  602. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  603. "Disconnect on unknown channel %p\n",
  604. channel);
  605. if (NULL != disconnect_task)
  606. GNUNET_SCHEDULER_cancel (disconnect_task);
  607. disconnect_task = GNUNET_SCHEDULER_add_now (&gather_stats_and_exit,
  608. (void *) __LINE__);
  609. GNUNET_free (ch_w);
  610. }
  611. /**
  612. * Start the testcase, we know the peers and have handles to CADET.
  613. *
  614. * Testcase continues when the root receives confirmation of connected peers,
  615. * on callback function ch.
  616. *
  617. * @param cls Closure (unused).
  618. */
  619. static void
  620. start_test (void *cls)
  621. {
  622. struct GNUNET_MQ_MessageHandler handlers[] = {
  623. GNUNET_MQ_hd_var_size (data,
  624. GNUNET_MESSAGE_TYPE_DUMMY,
  625. struct GNUNET_MessageHeader,
  626. NULL),
  627. GNUNET_MQ_handler_end ()
  628. };
  629. struct CadetTestChannelWrapper *ch;
  630. enum GNUNET_CADET_ChannelOption flags;
  631. test_task = NULL;
  632. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  633. "In start_test\n");
  634. start_time = GNUNET_TIME_absolute_get ();
  635. ch = GNUNET_new (struct CadetTestChannelWrapper);
  636. outgoing_ch = GNUNET_CADET_channel_create (h1,
  637. ch,
  638. p_id[1],
  639. &port,
  640. flags,
  641. NULL,
  642. &disconnect_handler,
  643. handlers);
  644. ch->ch = outgoing_ch;
  645. GNUNET_assert (NULL == disconnect_task);
  646. disconnect_task
  647. = GNUNET_SCHEDULER_add_delayed (short_time,
  648. &gather_stats_and_exit,
  649. (void *) __LINE__);
  650. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  651. "Sending data initializer on channel %p...\n",
  652. outgoing_ch);
  653. send_test_message (outgoing_ch);
  654. }
  655. /**
  656. * Callback to be called when the requested peer information is available
  657. *
  658. * @param cls the closure from GNUNET_TESTBED_peer_get_information()
  659. * @param op the operation this callback corresponds to
  660. * @param pinfo the result; will be NULL if the operation has failed
  661. * @param emsg error message if the operation has failed;
  662. * NULL if the operation is successfull
  663. */
  664. static void
  665. pi_cb (void *cls,
  666. struct GNUNET_TESTBED_Operation *op,
  667. const struct GNUNET_TESTBED_PeerInformation *pinfo,
  668. const char *emsg)
  669. {
  670. long i = (long) cls;
  671. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  672. "ID callback for %ld\n",
  673. i);
  674. if ( (NULL == pinfo) ||
  675. (NULL != emsg) )
  676. {
  677. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  678. "pi_cb: %s\n",
  679. emsg);
  680. abort_test (__LINE__);
  681. return;
  682. }
  683. p_id[i] = pinfo->result.id;
  684. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  685. "id: %s\n",
  686. GNUNET_i2s (p_id[i]));
  687. p_ids++;
  688. if (p_ids < 2)
  689. return;
  690. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  691. "Got all IDs, starting test\n");
  692. test_task = GNUNET_SCHEDULER_add_now (&start_test,
  693. NULL);
  694. }
  695. /**
  696. * test main: start test when all peers are connected
  697. *
  698. * @param cls Closure.
  699. * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
  700. * @param num_peers Number of peers that are running.
  701. * @param peers Array of peers.
  702. * @param cadets Handle to each of the CADETs of the peers.
  703. */
  704. static void
  705. tmain (void *cls,
  706. struct GNUNET_CADET_TEST_Context *ctx,
  707. unsigned int num_peers,
  708. struct GNUNET_TESTBED_Peer **peers,
  709. struct GNUNET_CADET_Handle **cadets)
  710. {
  711. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  712. "test main\n");
  713. test_ctx = ctx;
  714. peers_running = num_peers;
  715. GNUNET_assert (peers_running == peers_requested);
  716. testbed_peers = peers;
  717. h1 = cadets[0];
  718. h2 = cadets[num_peers - 1];
  719. GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
  720. NULL);
  721. p_ids = 0;
  722. t_op[0] = GNUNET_TESTBED_peer_get_information (peers[0],
  723. GNUNET_TESTBED_PIT_IDENTITY,
  724. &pi_cb,
  725. (void *) 0L);
  726. t_op[1] = GNUNET_TESTBED_peer_get_information (peers[num_peers - 1],
  727. GNUNET_TESTBED_PIT_IDENTITY,
  728. &pi_cb,
  729. (void *) 1L);
  730. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  731. "requested peer ids\n");
  732. }
  733. /**
  734. * Main: start test
  735. */
  736. int
  737. main (int argc,
  738. char *argv[])
  739. {
  740. static const struct GNUNET_HashCode *ports[2];
  741. struct GNUNET_MQ_MessageHandler handlers[] = {
  742. GNUNET_MQ_hd_var_size (data,
  743. GNUNET_MESSAGE_TYPE_DUMMY,
  744. struct GNUNET_MessageHeader,
  745. NULL),
  746. GNUNET_MQ_handler_end ()
  747. };
  748. const char *config_file = "test_cadet.conf";
  749. char port_id[] = "test port";
  750. struct GNUNET_GETOPT_CommandLineOption options[] = {
  751. GNUNET_GETOPT_option_relative_time ('t',
  752. "time",
  753. "short_time",
  754. gettext_noop ("set short timeout"),
  755. &short_time),
  756. GNUNET_GETOPT_option_uint ('m',
  757. "messages",
  758. "NUM_MESSAGES",
  759. gettext_noop ("set number of messages to send"),
  760. &total_packets),
  761. GNUNET_GETOPT_option_uint ('p',
  762. "peers",
  763. "NUM_PEERS",
  764. gettext_noop ("number of peers to launch"),
  765. &peers_requested),
  766. GNUNET_GETOPT_OPTION_END
  767. };
  768. GNUNET_log_setup ("test-cadet-flow",
  769. "DEBUG",
  770. NULL);
  771. total_packets = TOTAL_PACKETS;
  772. short_time = SHORT_TIME;
  773. if (-1 == GNUNET_GETOPT_run (argv[0],
  774. options,
  775. argc,
  776. argv))
  777. {
  778. FPRINTF (stderr,
  779. "test failed: problem with CLI parameters\n");
  780. return 1;
  781. }
  782. GNUNET_CRYPTO_hash (port_id,
  783. sizeof (port_id),
  784. &port);
  785. ports[0] = &port;
  786. ports[1] = NULL;
  787. GNUNET_CADET_TEST_ruN ("test_cadet_flow",
  788. config_file,
  789. peers_requested,
  790. &tmain,
  791. NULL, /* tmain cls */
  792. &connect_handler,
  793. NULL,
  794. &disconnect_handler,
  795. handlers,
  796. ports);
  797. return 0;
  798. }
  799. /* end of test_cadet_flow.c */