gnunet-service-testbed_barriers.c 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2008--2016 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file testbed/gnunet-service-testbed_barriers.c
  18. * @brief barrier handling at the testbed controller
  19. * @author Sree Harsha Totakura <sreeharsha@totakura.in>
  20. */
  21. #include "gnunet-service-testbed.h"
  22. #include "gnunet-service-testbed_barriers.h"
  23. #include "testbed_api.h"
  24. /**
  25. * timeout for outgoing message transmissions in seconds
  26. */
  27. #define MESSAGE_SEND_TIMEOUT(s) \
  28. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
  29. /**
  30. * Test to see if local peers have reached the required quorum of a barrier
  31. */
  32. #define LOCAL_QUORUM_REACHED(barrier) \
  33. ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
  34. #ifdef LOG
  35. #undef LOG
  36. #endif
  37. /**
  38. * Logging shorthand
  39. */
  40. #define LOG(kind,...) \
  41. GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__)
  42. /**
  43. * Barrier
  44. */
  45. struct Barrier;
  46. /**
  47. * Context to be associated with each client
  48. */
  49. struct ClientCtx
  50. {
  51. /**
  52. * The barrier this client is waiting for
  53. */
  54. struct Barrier *barrier;
  55. /**
  56. * DLL next ptr
  57. */
  58. struct ClientCtx *next;
  59. /**
  60. * DLL prev ptr
  61. */
  62. struct ClientCtx *prev;
  63. /**
  64. * The client handle
  65. */
  66. struct GNUNET_SERVICE_Client *client;
  67. };
  68. /**
  69. * Wrapper around Barrier handle
  70. */
  71. struct WBarrier
  72. {
  73. /**
  74. * DLL next pointer
  75. */
  76. struct WBarrier *next;
  77. /**
  78. * DLL prev pointer
  79. */
  80. struct WBarrier *prev;
  81. /**
  82. * The local barrier associated with the creation of this wrapper
  83. */
  84. struct Barrier *barrier;
  85. /**
  86. * Handle to the slave controller where this wrapper creates a barrier
  87. */
  88. struct GNUNET_TESTBED_Controller *controller;
  89. /**
  90. * The barrier handle from API
  91. */
  92. struct GNUNET_TESTBED_Barrier *hbarrier;
  93. /**
  94. * Has this barrier been crossed?
  95. */
  96. uint8_t reached;
  97. };
  98. /**
  99. * Barrier
  100. */
  101. struct Barrier
  102. {
  103. /**
  104. * The hashcode of the barrier name
  105. */
  106. struct GNUNET_HashCode hash;
  107. /**
  108. * The client handle to the master controller
  109. */
  110. struct GNUNET_SERVICE_Client *mc;
  111. /**
  112. * The name of the barrier
  113. */
  114. char *name;
  115. /**
  116. * DLL head for the list of clients waiting for this barrier
  117. */
  118. struct ClientCtx *head;
  119. /**
  120. * DLL tail for the list of clients waiting for this barrier
  121. */
  122. struct ClientCtx *tail;
  123. /**
  124. * DLL head for the list of barrier handles
  125. */
  126. struct WBarrier *whead;
  127. /**
  128. * DLL tail for the list of barrier handles
  129. */
  130. struct WBarrier *wtail;
  131. /**
  132. * Identifier for the timeout task
  133. */
  134. struct GNUNET_SCHEDULER_Task *tout_task;
  135. /**
  136. * The status of this barrier
  137. */
  138. enum GNUNET_TESTBED_BarrierStatus status;
  139. /**
  140. * Number of barriers wrapped in the above DLL
  141. */
  142. unsigned int num_wbarriers;
  143. /**
  144. * Number of wrapped barriers reached so far
  145. */
  146. unsigned int num_wbarriers_reached;
  147. /**
  148. * Number of wrapped barrier initialised so far
  149. */
  150. unsigned int num_wbarriers_inited;
  151. /**
  152. * Number of peers which have reached this barrier
  153. */
  154. unsigned int nreached;
  155. /**
  156. * Number of slaves we have initialised this barrier
  157. */
  158. unsigned int nslaves;
  159. /**
  160. * Quorum percentage to be reached
  161. */
  162. uint8_t quorum;
  163. };
  164. /**
  165. * Hashtable handle for storing initialised barriers
  166. */
  167. static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
  168. /**
  169. * Service context
  170. */
  171. static struct GNUNET_SERVICE_Handle *ctx;
  172. /**
  173. * Function to remove a barrier from the barrier map and cleanup resources
  174. * occupied by a barrier
  175. *
  176. * @param barrier the barrier handle
  177. */
  178. static void
  179. remove_barrier (struct Barrier *barrier)
  180. {
  181. struct ClientCtx *ctx;
  182. GNUNET_assert (GNUNET_YES ==
  183. GNUNET_CONTAINER_multihashmap_remove (barrier_map,
  184. &barrier->hash,
  185. barrier));
  186. while (NULL != (ctx = barrier->head))
  187. {
  188. GNUNET_CONTAINER_DLL_remove (barrier->head,
  189. barrier->tail,
  190. ctx);
  191. ctx->barrier = NULL;
  192. }
  193. GNUNET_free (barrier->name);
  194. GNUNET_free (barrier);
  195. }
  196. /**
  197. * Cancels all subcontroller barrier handles
  198. *
  199. * @param barrier the local barrier
  200. */
  201. static void
  202. cancel_wrappers (struct Barrier *barrier)
  203. {
  204. struct WBarrier *wrapper;
  205. while (NULL != (wrapper = barrier->whead))
  206. {
  207. GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
  208. GNUNET_CONTAINER_DLL_remove (barrier->whead,
  209. barrier->wtail,
  210. wrapper);
  211. GNUNET_free (wrapper);
  212. }
  213. }
  214. /**
  215. * Send a status message about a barrier to the given client
  216. *
  217. * @param client the client to send the message to
  218. * @param name the barrier name
  219. * @param status the status of the barrier
  220. * @param emsg the error message; should be non-NULL for
  221. * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
  222. */
  223. static void
  224. send_client_status_msg (struct GNUNET_SERVICE_Client *client,
  225. const char *name,
  226. enum GNUNET_TESTBED_BarrierStatus status,
  227. const char *emsg)
  228. {
  229. struct GNUNET_MQ_Envelope *env;
  230. struct GNUNET_TESTBED_BarrierStatusMsg *msg;
  231. size_t name_len;
  232. size_t err_len;
  233. GNUNET_assert ( (NULL == emsg) ||
  234. (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status) );
  235. name_len = strlen (name) + 1;
  236. err_len = ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
  237. env = GNUNET_MQ_msg_extra (msg,
  238. name_len + err_len,
  239. GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
  240. msg->status = htons (status);
  241. msg->name_len = htons ((uint16_t) name_len - 1);
  242. GNUNET_memcpy (msg->data,
  243. name,
  244. name_len);
  245. GNUNET_memcpy (msg->data + name_len,
  246. emsg,
  247. err_len);
  248. GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
  249. env);
  250. }
  251. /**
  252. * Sends a barrier failed message
  253. *
  254. * @param barrier the corresponding barrier
  255. * @param emsg the error message; should be non-NULL for
  256. * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
  257. */
  258. static void
  259. send_barrier_status_msg (struct Barrier *barrier,
  260. const char *emsg)
  261. {
  262. GNUNET_assert (0 != barrier->status);
  263. send_client_status_msg (barrier->mc,
  264. barrier->name,
  265. barrier->status,
  266. emsg);
  267. }
  268. /**
  269. * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.
  270. *
  271. * @param cls identification of the client
  272. * @param message the actual message
  273. */
  274. static int
  275. check_barrier_wait (void *cls,
  276. const struct GNUNET_TESTBED_BarrierWait *msg)
  277. {
  278. return GNUNET_OK; /* always well-formed */
  279. }
  280. /**
  281. * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages. This
  282. * message should come from peers or a shared helper service using the
  283. * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
  284. *
  285. * This handler is queued in the main service and will handle the messages sent
  286. * either from the testbed driver or from a high level controller
  287. *
  288. * @param cls identification of the client
  289. * @param message the actual message
  290. */
  291. static void
  292. handle_barrier_wait (void *cls,
  293. const struct GNUNET_TESTBED_BarrierWait *msg)
  294. {
  295. struct ClientCtx *client_ctx = cls;
  296. struct Barrier *barrier;
  297. char *name;
  298. struct GNUNET_HashCode key;
  299. size_t name_len;
  300. uint16_t msize;
  301. msize = ntohs (msg->header.size);
  302. if (NULL == barrier_map)
  303. {
  304. GNUNET_break (0);
  305. GNUNET_SERVICE_client_drop (client_ctx->client);
  306. return;
  307. }
  308. name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
  309. name = GNUNET_malloc (name_len + 1);
  310. name[name_len] = '\0';
  311. GNUNET_memcpy (name,
  312. msg->name,
  313. name_len);
  314. LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n",
  315. name);
  316. GNUNET_CRYPTO_hash (name,
  317. name_len,
  318. &key);
  319. GNUNET_free (name);
  320. if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
  321. {
  322. GNUNET_break (0);
  323. GNUNET_SERVICE_client_drop (client_ctx->client);
  324. return;
  325. }
  326. if (NULL != client_ctx->barrier)
  327. {
  328. GNUNET_break (0);
  329. GNUNET_SERVICE_client_drop (client_ctx->client);
  330. return;
  331. }
  332. client_ctx->barrier = barrier;
  333. GNUNET_CONTAINER_DLL_insert_tail (barrier->head,
  334. barrier->tail,
  335. client_ctx);
  336. barrier->nreached++;
  337. if ( (barrier->num_wbarriers_reached == barrier->num_wbarriers) &&
  338. (LOCAL_QUORUM_REACHED (barrier)) )
  339. {
  340. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
  341. send_barrier_status_msg (barrier,
  342. NULL);
  343. }
  344. GNUNET_SERVICE_client_continue (client_ctx->client);
  345. }
  346. /**
  347. * Function called when a client connects to the testbed-barrier service.
  348. *
  349. * @param cls NULL
  350. * @param client the connecting client
  351. * @param mq queue to talk to @a client
  352. * @return our `struct ClientCtx`
  353. */
  354. static void *
  355. connect_cb (void *cls,
  356. struct GNUNET_SERVICE_Client *client,
  357. struct GNUNET_MQ_Handle *mq)
  358. {
  359. struct ClientCtx *client_ctx;
  360. LOG_DEBUG ("Client connected to testbed-barrier service\n");
  361. client_ctx = GNUNET_new (struct ClientCtx);
  362. client_ctx->client = client;
  363. return client_ctx;
  364. }
  365. /**
  366. * Functions with this signature are called whenever a client
  367. * is disconnected on the network level.
  368. *
  369. * @param cls closure
  370. * @param client identification of the client; NULL
  371. * for the last call when the server is destroyed
  372. */
  373. static void
  374. disconnect_cb (void *cls,
  375. struct GNUNET_SERVICE_Client *client,
  376. void *app_ctx)
  377. {
  378. struct ClientCtx *client_ctx = app_ctx;
  379. struct Barrier *barrier = client_ctx->barrier;
  380. if (NULL != barrier)
  381. {
  382. GNUNET_CONTAINER_DLL_remove (barrier->head,
  383. barrier->tail,
  384. client_ctx);
  385. client_ctx->barrier = NULL;
  386. }
  387. GNUNET_free (client_ctx);
  388. LOG_DEBUG ("Client disconnected from testbed-barrier service\n");
  389. }
  390. /**
  391. * Function to initialise barrriers component
  392. *
  393. * @param cfg the configuration to use for initialisation
  394. */
  395. void
  396. GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
  397. {
  398. struct GNUNET_MQ_MessageHandler message_handlers[] = {
  399. GNUNET_MQ_hd_var_size (barrier_wait,
  400. GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT,
  401. struct GNUNET_TESTBED_BarrierWait,
  402. NULL),
  403. GNUNET_MQ_handler_end ()
  404. };
  405. LOG_DEBUG ("Launching testbed-barrier service\n");
  406. barrier_map = GNUNET_CONTAINER_multihashmap_create (3,
  407. GNUNET_YES);
  408. ctx = GNUNET_SERVICE_start ("testbed-barrier",
  409. cfg,
  410. &connect_cb,
  411. &disconnect_cb,
  412. NULL,
  413. message_handlers);
  414. }
  415. /**
  416. * Iterator over hash map entries.
  417. *
  418. * @param cls closure
  419. * @param key current key code
  420. * @param value value in the hash map
  421. * @return #GNUNET_YES if we should continue to
  422. * iterate,
  423. * #GNUNET_NO if not.
  424. */
  425. static int
  426. barrier_destroy_iterator (void *cls,
  427. const struct GNUNET_HashCode *key,
  428. void *value)
  429. {
  430. struct Barrier *barrier = value;
  431. GNUNET_assert (NULL != barrier);
  432. cancel_wrappers (barrier);
  433. remove_barrier (barrier);
  434. return GNUNET_YES;
  435. }
  436. /**
  437. * Function to stop the barrier service
  438. */
  439. void
  440. GST_barriers_destroy ()
  441. {
  442. GNUNET_assert (NULL != barrier_map);
  443. GNUNET_assert (GNUNET_SYSERR !=
  444. GNUNET_CONTAINER_multihashmap_iterate (barrier_map,
  445. &barrier_destroy_iterator,
  446. NULL));
  447. GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
  448. GNUNET_assert (NULL != ctx);
  449. GNUNET_SERVICE_stop (ctx);
  450. }
  451. /**
  452. * Functions of this type are to be given as callback argument to
  453. * GNUNET_TESTBED_barrier_init(). The callback will be called when status
  454. * information is available for the barrier.
  455. *
  456. * @param cls the closure given to GNUNET_TESTBED_barrier_init()
  457. * @param name the name of the barrier
  458. * @param b_ the barrier handle
  459. * @param status status of the barrier; #GNUNET_OK if the barrier is crossed;
  460. * #GNUNET_SYSERR upon error
  461. * @param emsg if the status were to be #GNUNET_SYSERR, this parameter has the
  462. * error messsage
  463. */
  464. static void
  465. wbarrier_status_cb (void *cls,
  466. const char *name,
  467. struct GNUNET_TESTBED_Barrier *b_,
  468. enum GNUNET_TESTBED_BarrierStatus status,
  469. const char *emsg)
  470. {
  471. struct WBarrier *wrapper = cls;
  472. struct Barrier *barrier = wrapper->barrier;
  473. GNUNET_assert (b_ == wrapper->hbarrier);
  474. switch (status)
  475. {
  476. case GNUNET_TESTBED_BARRIERSTATUS_ERROR:
  477. LOG (GNUNET_ERROR_TYPE_ERROR,
  478. "Initialising barrier `%s' failed at a sub-controller: %s\n",
  479. barrier->name,
  480. (NULL != emsg) ? emsg : "NULL");
  481. cancel_wrappers (barrier);
  482. if (NULL == emsg)
  483. emsg = "Initialisation failed at a sub-controller";
  484. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
  485. send_barrier_status_msg (barrier, emsg);
  486. return;
  487. case GNUNET_TESTBED_BARRIERSTATUS_CROSSED:
  488. if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED != barrier->status)
  489. {
  490. GNUNET_break_op (0);
  491. return;
  492. }
  493. barrier->num_wbarriers_reached++;
  494. if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
  495. && (LOCAL_QUORUM_REACHED (barrier)))
  496. {
  497. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
  498. send_barrier_status_msg (barrier, NULL);
  499. }
  500. return;
  501. case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED:
  502. if (0 != barrier->status)
  503. {
  504. GNUNET_break_op (0);
  505. return;
  506. }
  507. barrier->num_wbarriers_inited++;
  508. if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
  509. {
  510. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
  511. send_barrier_status_msg (barrier, NULL);
  512. }
  513. return;
  514. }
  515. }
  516. /**
  517. * Function called upon timeout while waiting for a response from the
  518. * subcontrollers to barrier init message
  519. *
  520. * @param cls barrier
  521. */
  522. static void
  523. fwd_tout_barrier_init (void *cls)
  524. {
  525. struct Barrier *barrier = cls;
  526. cancel_wrappers (barrier);
  527. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
  528. send_barrier_status_msg (barrier,
  529. "Timedout while propagating barrier initialisation\n");
  530. remove_barrier (barrier);
  531. }
  532. /**
  533. * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.
  534. *
  535. * @param cls identification of the client
  536. * @param msg the actual message
  537. * @return #GNUNET_OK if @a msg is well-formed
  538. */
  539. int
  540. check_barrier_init (void *cls,
  541. const struct GNUNET_TESTBED_BarrierInit *msg)
  542. {
  543. return GNUNET_OK; /* always well-formed */
  544. }
  545. /**
  546. * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This
  547. * message should always come from a parent controller or the testbed API if we
  548. * are the root controller.
  549. *
  550. * This handler is queued in the main service and will handle the messages sent
  551. * either from the testbed driver or from a high level controller
  552. *
  553. * @param cls identification of the client
  554. * @param msg the actual message
  555. */
  556. void
  557. handle_barrier_init (void *cls,
  558. const struct GNUNET_TESTBED_BarrierInit *msg)
  559. {
  560. struct GNUNET_SERVICE_Client *client = cls;
  561. char *name;
  562. struct Barrier *barrier;
  563. struct Slave *slave;
  564. struct WBarrier *wrapper;
  565. struct GNUNET_HashCode hash;
  566. size_t name_len;
  567. unsigned int cnt;
  568. uint16_t msize;
  569. if (NULL == GST_context)
  570. {
  571. GNUNET_break_op (0);
  572. GNUNET_SERVICE_client_drop (client);
  573. return;
  574. }
  575. if (client != GST_context->client)
  576. {
  577. GNUNET_break_op (0);
  578. GNUNET_SERVICE_client_drop (client);
  579. return;
  580. }
  581. msize = ntohs (msg->header.size);
  582. name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
  583. name = GNUNET_malloc (name_len + 1);
  584. GNUNET_memcpy (name, msg->name, name_len);
  585. GNUNET_CRYPTO_hash (name, name_len, &hash);
  586. LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n",
  587. name);
  588. if (GNUNET_YES ==
  589. GNUNET_CONTAINER_multihashmap_contains (barrier_map,
  590. &hash))
  591. {
  592. send_client_status_msg (client,
  593. name,
  594. GNUNET_TESTBED_BARRIERSTATUS_ERROR,
  595. "A barrier with the same name already exists");
  596. GNUNET_free (name);
  597. GNUNET_SERVICE_client_continue (client);
  598. return;
  599. }
  600. barrier = GNUNET_new (struct Barrier);
  601. barrier->hash = hash;
  602. barrier->quorum = msg->quorum;
  603. barrier->name = name;
  604. barrier->mc = client;
  605. GNUNET_assert (GNUNET_OK ==
  606. GNUNET_CONTAINER_multihashmap_put (barrier_map,
  607. &barrier->hash,
  608. barrier,
  609. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
  610. GNUNET_SERVICE_client_continue (client);
  611. /* Propagate barrier init to subcontrollers */
  612. for (cnt = 0; cnt < GST_slave_list_size; cnt++)
  613. {
  614. if (NULL == (slave = GST_slave_list[cnt]))
  615. continue;
  616. if (NULL == slave->controller)
  617. {
  618. GNUNET_break (0);/* May happen when we are connecting to the controller */
  619. continue;
  620. }
  621. wrapper = GNUNET_new (struct WBarrier);
  622. wrapper->barrier = barrier;
  623. wrapper->controller = slave->controller;
  624. GNUNET_CONTAINER_DLL_insert_tail (barrier->whead,
  625. barrier->wtail,
  626. wrapper);
  627. barrier->num_wbarriers++;
  628. wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (wrapper->controller,
  629. barrier->name,
  630. barrier->quorum,
  631. &wbarrier_status_cb,
  632. wrapper,
  633. GNUNET_NO);
  634. }
  635. if (NULL == barrier->whead) /* No further propagation */
  636. {
  637. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
  638. LOG_DEBUG ("Sending GNUNET_TESTBED_BARRIERSTATUS_INITIALISED for barrier `%s'\n",
  639. barrier->name);
  640. send_barrier_status_msg (barrier, NULL);
  641. }else
  642. barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
  643. &fwd_tout_barrier_init,
  644. barrier);
  645. }
  646. /**
  647. * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.
  648. *
  649. * @param cls identification of the client
  650. * @param msg the actual message
  651. * @return #GNUNET_OK if @a msg is well-formed
  652. */
  653. int
  654. check_barrier_cancel (void *cls,
  655. const struct GNUNET_TESTBED_BarrierCancel *msg)
  656. {
  657. return GNUNET_OK; /* all are well-formed */
  658. }
  659. /**
  660. * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages. This
  661. * message should always come from a parent controller or the testbed API if we
  662. * are the root controller.
  663. *
  664. * This handler is queued in the main service and will handle the messages sent
  665. * either from the testbed driver or from a high level controller
  666. *
  667. * @param cls identification of the client
  668. * @param msg the actual message
  669. */
  670. void
  671. handle_barrier_cancel (void *cls,
  672. const struct GNUNET_TESTBED_BarrierCancel *msg)
  673. {
  674. struct GNUNET_SERVICE_Client *client = cls;
  675. char *name;
  676. struct Barrier *barrier;
  677. struct GNUNET_HashCode hash;
  678. size_t name_len;
  679. uint16_t msize;
  680. if (NULL == GST_context)
  681. {
  682. GNUNET_break_op (0);
  683. GNUNET_SERVICE_client_drop (client);
  684. return;
  685. }
  686. if (client != GST_context->client)
  687. {
  688. GNUNET_break_op (0);
  689. GNUNET_SERVICE_client_drop (client);
  690. return;
  691. }
  692. msize = ntohs (msg->header.size);
  693. name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
  694. name = GNUNET_malloc (name_len + 1);
  695. GNUNET_memcpy (name,
  696. msg->name,
  697. name_len);
  698. LOG_DEBUG ("Received BARRIER_CANCEL for barrier `%s'\n",
  699. name);
  700. GNUNET_CRYPTO_hash (name,
  701. name_len,
  702. &hash);
  703. if (GNUNET_NO ==
  704. GNUNET_CONTAINER_multihashmap_contains (barrier_map,
  705. &hash))
  706. {
  707. GNUNET_break_op (0);
  708. GNUNET_SERVICE_client_drop (client);
  709. return;
  710. }
  711. barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map,
  712. &hash);
  713. GNUNET_assert (NULL != barrier);
  714. cancel_wrappers (barrier);
  715. remove_barrier (barrier);
  716. GNUNET_SERVICE_client_continue (client);
  717. }
  718. /**
  719. * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
  720. *
  721. * @param cls identification of the client
  722. * @param msg the actual message
  723. * @return #GNUNET_OK if @a msg is well-formed
  724. */
  725. int
  726. check_barrier_status (void *cls,
  727. const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
  728. {
  729. uint16_t msize;
  730. uint16_t name_len;
  731. const char *name;
  732. enum GNUNET_TESTBED_BarrierStatus status;
  733. msize = ntohs (msg->header.size) - sizeof (*msg);
  734. status = ntohs (msg->status);
  735. if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status)
  736. {
  737. GNUNET_break_op (0); /* current we only expect BARRIER_CROSSED
  738. status message this way */
  739. return GNUNET_SYSERR;
  740. }
  741. name = msg->data;
  742. name_len = ntohs (msg->name_len);
  743. if ((name_len + 1) != msize)
  744. {
  745. GNUNET_break_op (0);
  746. return GNUNET_SYSERR;
  747. }
  748. if ('\0' != name[name_len])
  749. {
  750. GNUNET_break_op (0);
  751. return GNUNET_SYSERR;
  752. }
  753. return GNUNET_OK;
  754. }
  755. /**
  756. * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
  757. * This handler is queued in the main service and will handle the messages sent
  758. * either from the testbed driver or from a high level controller
  759. *
  760. * @param cls identification of the client
  761. * @param msg the actual message
  762. */
  763. void
  764. handle_barrier_status (void *cls,
  765. const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
  766. {
  767. struct GNUNET_SERVICE_Client *client = cls;
  768. struct Barrier *barrier;
  769. struct ClientCtx *client_ctx;
  770. struct WBarrier *wrapper;
  771. const char *name;
  772. struct GNUNET_HashCode key;
  773. uint16_t name_len;
  774. struct GNUNET_MQ_Envelope *env;
  775. if (NULL == GST_context)
  776. {
  777. GNUNET_break_op (0);
  778. GNUNET_SERVICE_client_drop (client);
  779. return;
  780. }
  781. if (client != GST_context->client)
  782. {
  783. GNUNET_break_op (0);
  784. GNUNET_SERVICE_client_drop (client);
  785. return;
  786. }
  787. name = msg->data;
  788. name_len = ntohs (msg->name_len);
  789. LOG_DEBUG ("Received BARRIER_STATUS for barrier `%s'\n",
  790. name);
  791. GNUNET_CRYPTO_hash (name,
  792. name_len,
  793. &key);
  794. barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map,
  795. &key);
  796. if (NULL == barrier)
  797. {
  798. GNUNET_break_op (0);
  799. GNUNET_SERVICE_client_drop (client);
  800. return;
  801. }
  802. GNUNET_SERVICE_client_continue (client);
  803. for(client_ctx = barrier->head; NULL != client_ctx; client_ctx = client_ctx->next) /* Notify peers */
  804. {
  805. env = GNUNET_MQ_msg_copy (&msg->header);
  806. GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client_ctx->client),
  807. env);
  808. }
  809. /**
  810. * The wrapper barriers do not echo the barrier status, so we have to do it
  811. * here
  812. */
  813. for (wrapper = barrier->whead; NULL != wrapper; wrapper = wrapper->next)
  814. {
  815. GNUNET_TESTBED_queue_message_ (wrapper->controller,
  816. GNUNET_copy_message (&msg->header));
  817. }
  818. }
  819. /* end of gnunet-service-testbed_barriers.c */