gnunet-service-testbed_barriers.c 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2008--2016 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file testbed/gnunet-service-testbed_barriers.c
  18. * @brief barrier handling at the testbed controller
  19. * @author Sree Harsha Totakura <sreeharsha@totakura.in>
  20. */
  21. #include "gnunet-service-testbed.h"
  22. #include "gnunet-service-testbed_barriers.h"
  23. #include "testbed_api.h"
  /**
   * Timeout for outgoing message transmissions.
   *
   * @param s length of the timeout in seconds
   */
  #define MESSAGE_SEND_TIMEOUT(s) \
    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, (s))
  /**
   * Test to see if local peers have reached the required quorum of a barrier.
   *
   * The comparison is done on cross-multiplied integers
   * (quorum * peers <= nreached * 100), so no division is needed.
   *
   * @param barrier pointer expression yielding the barrier to test; it is
   *        evaluated twice, so it must not have side effects
   * @return non-zero if the quorum is reached, 0 otherwise
   */
  #define LOCAL_QUORUM_REACHED(barrier) \
    (((barrier)->quorum * GST_num_local_peers) <= ((barrier)->nreached * 100))
  /* Discard any LOG definition made before this point; #undef on a name that
     is not defined is a no-op, so no #ifdef guard is needed. */
  #undef LOG

  /**
   * Logging shorthand: logs under the "testbed-barriers" component.
   */
  #define LOG(kind, ...) \
    GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__)
  /* Forward declaration; the full definition of the barrier follows below. */
  struct Barrier;


  /**
   * Per-client state; one instance is associated with each client.
   */
  struct ClientCtx
  {
    /** Barrier this client is waiting for; NULL while it waits for none. */
    struct Barrier *barrier;

    /** Next element in the barrier's doubly-linked list of waiting clients. */
    struct ClientCtx *next;

    /** Previous element in the barrier's doubly-linked list of waiting clients. */
    struct ClientCtx *prev;

    /** Handle of the client this context belongs to. */
    struct GNUNET_SERVICE_Client *client;
  };
  /**
   * Wrapper around a barrier handle created at a slave controller.
   */
  struct WBarrier
  {
    /** Next wrapper in the owning barrier's doubly-linked list. */
    struct WBarrier *next;

    /** Previous wrapper in the owning barrier's doubly-linked list. */
    struct WBarrier *prev;

    /** Local barrier on whose behalf this wrapper was created. */
    struct Barrier *barrier;

    /** Slave controller at which this wrapper creates a barrier. */
    struct GNUNET_TESTBED_Controller *controller;

    /** Barrier handle obtained from the testbed API. */
    struct GNUNET_TESTBED_Barrier *hbarrier;

    /** Has this barrier been crossed? */
    uint8_t reached;
  };
  98. /**
  99. * Barrier
  100. */
  101. struct Barrier
  102. {
  103. /**
  104. * The hashcode of the barrier name
  105. */
  106. struct GNUNET_HashCode hash;
  107. /**
  108. * The client handle to the master controller
  109. */
  110. struct GNUNET_SERVICE_Client *mc;
  111. /**
  112. * The name of the barrier
  113. */
  114. char *name;
  115. /**
  116. * DLL head for the list of clients waiting for this barrier
  117. */
  118. struct ClientCtx *head;
  119. /**
  120. * DLL tail for the list of clients waiting for this barrier
  121. */
  122. struct ClientCtx *tail;
  123. /**
  124. * DLL head for the list of barrier handles
  125. */
  126. struct WBarrier *whead;
  127. /**
  128. * DLL tail for the list of barrier handles
  129. */
  130. struct WBarrier *wtail;
  131. /**
  132. * Identifier for the timeout task
  133. */
  134. struct GNUNET_SCHEDULER_Task *tout_task;
  135. /**
  136. * The status of this barrier
  137. */
  138. enum GNUNET_TESTBED_BarrierStatus status;
  139. /**
  140. * Number of barriers wrapped in the above DLL
  141. */
  142. unsigned int num_wbarriers;
  143. /**
  144. * Number of wrapped barriers reached so far
  145. */
  146. unsigned int num_wbarriers_reached;
  147. /**
  148. * Number of wrapped barrier initialised so far
  149. */
  150. unsigned int num_wbarriers_inited;
  151. /**
  152. * Number of peers which have reached this barrier
  153. */
  154. unsigned int nreached;
  155. /**
  156. * Number of slaves we have initialised this barrier
  157. */
  158. unsigned int nslaves;
  159. /**
  160. * Quorum percentage to be reached
  161. */
  162. uint8_t quorum;
  163. };
  /**
   * Map from barrier-name hash to `struct Barrier` holding all initialised
   * barriers.
   */
  static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;

  /**
   * Handle of the "testbed-barrier" service.
   */
  static struct GNUNET_SERVICE_Handle *ctx;
  172. /**
  173. * Function to remove a barrier from the barrier map and cleanup resources
  174. * occupied by a barrier
  175. *
  176. * @param barrier the barrier handle
  177. */
  178. static void
  179. remove_barrier (struct Barrier *barrier)
  180. {
  181. struct ClientCtx *ctx;
  182. GNUNET_assert (GNUNET_YES ==
  183. GNUNET_CONTAINER_multihashmap_remove (barrier_map,
  184. &barrier->hash,
  185. barrier));
  186. while (NULL != (ctx = barrier->head))
  187. {
  188. GNUNET_CONTAINER_DLL_remove (barrier->head,
  189. barrier->tail,
  190. ctx);
  191. ctx->barrier = NULL;
  192. }
  193. GNUNET_free (barrier->name);
  194. GNUNET_free (barrier);
  195. }
  196. /**
  197. * Cancels all subcontroller barrier handles
  198. *
  199. * @param barrier the local barrier
  200. */
  201. static void
  202. cancel_wrappers (struct Barrier *barrier)
  203. {
  204. struct WBarrier *wrapper;
  205. while (NULL != (wrapper = barrier->whead))
  206. {
  207. GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
  208. GNUNET_CONTAINER_DLL_remove (barrier->whead,
  209. barrier->wtail,
  210. wrapper);
  211. GNUNET_free (wrapper);
  212. }
  213. }
  214. /**
  215. * Send a status message about a barrier to the given client
  216. *
  217. * @param client the client to send the message to
  218. * @param name the barrier name
  219. * @param status the status of the barrier
  220. * @param emsg the error message; should be non-NULL for
  221. * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
  222. */
  223. static void
  224. send_client_status_msg (struct GNUNET_SERVICE_Client *client,
  225. const char *name,
  226. enum GNUNET_TESTBED_BarrierStatus status,
  227. const char *emsg)
  228. {
  229. struct GNUNET_MQ_Envelope *env;
  230. struct GNUNET_TESTBED_BarrierStatusMsg *msg;
  231. size_t name_len;
  232. size_t err_len;
  233. GNUNET_assert ((NULL == emsg) ||
  234. (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status));
  235. name_len = strlen (name) + 1;
  236. err_len = ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
  237. env = GNUNET_MQ_msg_extra (msg,
  238. name_len + err_len,
  239. GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
  240. msg->status = htons (status);
  241. msg->name_len = htons ((uint16_t) name_len - 1);
  242. GNUNET_memcpy (msg->data,
  243. name,
  244. name_len);
  245. GNUNET_memcpy (msg->data + name_len,
  246. emsg,
  247. err_len);
  248. GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client),
  249. env);
  250. }
  251. /**
  252. * Sends a barrier failed message
  253. *
  254. * @param barrier the corresponding barrier
  255. * @param emsg the error message; should be non-NULL for
  256. * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
  257. */
  258. static void
  259. send_barrier_status_msg (struct Barrier *barrier,
  260. const char *emsg)
  261. {
  262. GNUNET_assert (0 != barrier->status);
  263. send_client_status_msg (barrier->mc,
  264. barrier->name,
  265. barrier->status,
  266. emsg);
  267. }
  268. /**
  269. * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages.
  270. *
  271. * @param cls identification of the client
  272. * @param message the actual message
  273. */
  274. static int
  275. check_barrier_wait (void *cls,
  276. const struct GNUNET_TESTBED_BarrierWait *msg)
  277. {
  278. return GNUNET_OK; /* always well-formed */
  279. }
  280. /**
  281. * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages. This
  282. * message should come from peers or a shared helper service using the
  283. * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
  284. *
  285. * This handler is queued in the main service and will handle the messages sent
  286. * either from the testbed driver or from a high level controller
  287. *
  288. * @param cls identification of the client
  289. * @param message the actual message
  290. */
  291. static void
  292. handle_barrier_wait (void *cls,
  293. const struct GNUNET_TESTBED_BarrierWait *msg)
  294. {
  295. struct ClientCtx *client_ctx = cls;
  296. struct Barrier *barrier;
  297. char *name;
  298. struct GNUNET_HashCode key;
  299. size_t name_len;
  300. uint16_t msize;
  301. msize = ntohs (msg->header.size);
  302. if (NULL == barrier_map)
  303. {
  304. GNUNET_break (0);
  305. GNUNET_SERVICE_client_drop (client_ctx->client);
  306. return;
  307. }
  308. name_len = msize - sizeof(struct GNUNET_TESTBED_BarrierWait);
  309. name = GNUNET_malloc (name_len + 1);
  310. name[name_len] = '\0';
  311. GNUNET_memcpy (name,
  312. msg->name,
  313. name_len);
  314. LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n",
  315. name);
  316. GNUNET_CRYPTO_hash (name,
  317. name_len,
  318. &key);
  319. GNUNET_free (name);
  320. if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
  321. {
  322. GNUNET_break (0);
  323. GNUNET_SERVICE_client_drop (client_ctx->client);
  324. return;
  325. }
  326. if (NULL != client_ctx->barrier)
  327. {
  328. GNUNET_break (0);
  329. GNUNET_SERVICE_client_drop (client_ctx->client);
  330. return;
  331. }
  332. client_ctx->barrier = barrier;
  333. GNUNET_CONTAINER_DLL_insert_tail (barrier->head,
  334. barrier->tail,
  335. client_ctx);
  336. barrier->nreached++;
  337. if ((barrier->num_wbarriers_reached == barrier->num_wbarriers) &&
  338. (LOCAL_QUORUM_REACHED (barrier)))
  339. {
  340. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
  341. send_barrier_status_msg (barrier,
  342. NULL);
  343. }
  344. GNUNET_SERVICE_client_continue (client_ctx->client);
  345. }
  346. /**
  347. * Function called when a client connects to the testbed-barrier service.
  348. *
  349. * @param cls NULL
  350. * @param client the connecting client
  351. * @param mq queue to talk to @a client
  352. * @return our `struct ClientCtx`
  353. */
  354. static void *
  355. connect_cb (void *cls,
  356. struct GNUNET_SERVICE_Client *client,
  357. struct GNUNET_MQ_Handle *mq)
  358. {
  359. struct ClientCtx *client_ctx;
  360. LOG_DEBUG ("Client connected to testbed-barrier service\n");
  361. client_ctx = GNUNET_new (struct ClientCtx);
  362. client_ctx->client = client;
  363. return client_ctx;
  364. }
  365. /**
  366. * Functions with this signature are called whenever a client
  367. * is disconnected on the network level.
  368. *
  369. * @param cls closure
  370. * @param client identification of the client; NULL
  371. * for the last call when the server is destroyed
  372. */
  373. static void
  374. disconnect_cb (void *cls,
  375. struct GNUNET_SERVICE_Client *client,
  376. void *app_ctx)
  377. {
  378. struct ClientCtx *client_ctx = app_ctx;
  379. struct Barrier *barrier = client_ctx->barrier;
  380. if (NULL != barrier)
  381. {
  382. GNUNET_CONTAINER_DLL_remove (barrier->head,
  383. barrier->tail,
  384. client_ctx);
  385. client_ctx->barrier = NULL;
  386. }
  387. GNUNET_free (client_ctx);
  388. LOG_DEBUG ("Client disconnected from testbed-barrier service\n");
  389. }
  390. /**
  391. * Function to initialise barrriers component
  392. *
  393. * @param cfg the configuration to use for initialisation
  394. */
  395. void
  396. GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
  397. {
  398. struct GNUNET_MQ_MessageHandler message_handlers[] = {
  399. GNUNET_MQ_hd_var_size (barrier_wait,
  400. GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT,
  401. struct GNUNET_TESTBED_BarrierWait,
  402. NULL),
  403. GNUNET_MQ_handler_end ()
  404. };
  405. LOG_DEBUG ("Launching testbed-barrier service\n");
  406. barrier_map = GNUNET_CONTAINER_multihashmap_create (3,
  407. GNUNET_YES);
  408. ctx = GNUNET_SERVICE_start ("testbed-barrier",
  409. cfg,
  410. &connect_cb,
  411. &disconnect_cb,
  412. NULL,
  413. message_handlers);
  414. }
  415. /**
  416. * Iterator over hash map entries.
  417. *
  418. * @param cls closure
  419. * @param key current key code
  420. * @param value value in the hash map
  421. * @return #GNUNET_YES if we should continue to
  422. * iterate,
  423. * #GNUNET_NO if not.
  424. */
  425. static int
  426. barrier_destroy_iterator (void *cls,
  427. const struct GNUNET_HashCode *key,
  428. void *value)
  429. {
  430. struct Barrier *barrier = value;
  431. GNUNET_assert (NULL != barrier);
  432. cancel_wrappers (barrier);
  433. remove_barrier (barrier);
  434. return GNUNET_YES;
  435. }
  436. /**
  437. * Function to stop the barrier service
  438. */
  439. void
  440. GST_barriers_destroy ()
  441. {
  442. GNUNET_assert (NULL != barrier_map);
  443. GNUNET_assert (GNUNET_SYSERR !=
  444. GNUNET_CONTAINER_multihashmap_iterate (barrier_map,
  445. &
  446. barrier_destroy_iterator,
  447. NULL));
  448. GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
  449. GNUNET_assert (NULL != ctx);
  450. GNUNET_SERVICE_stop (ctx);
  451. }
  452. /**
  453. * Functions of this type are to be given as callback argument to
  454. * GNUNET_TESTBED_barrier_init(). The callback will be called when status
  455. * information is available for the barrier.
  456. *
  457. * @param cls the closure given to GNUNET_TESTBED_barrier_init()
  458. * @param name the name of the barrier
  459. * @param b_ the barrier handle
  460. * @param status status of the barrier; #GNUNET_OK if the barrier is crossed;
  461. * #GNUNET_SYSERR upon error
  462. * @param emsg if the status were to be #GNUNET_SYSERR, this parameter has the
  463. * error messsage
  464. */
  465. static void
  466. wbarrier_status_cb (void *cls,
  467. const char *name,
  468. struct GNUNET_TESTBED_Barrier *b_,
  469. enum GNUNET_TESTBED_BarrierStatus status,
  470. const char *emsg)
  471. {
  472. struct WBarrier *wrapper = cls;
  473. struct Barrier *barrier = wrapper->barrier;
  474. GNUNET_assert (b_ == wrapper->hbarrier);
  475. switch (status)
  476. {
  477. case GNUNET_TESTBED_BARRIERSTATUS_ERROR:
  478. LOG (GNUNET_ERROR_TYPE_ERROR,
  479. "Initialising barrier `%s' failed at a sub-controller: %s\n",
  480. barrier->name,
  481. (NULL != emsg) ? emsg : "NULL");
  482. cancel_wrappers (barrier);
  483. if (NULL == emsg)
  484. emsg = "Initialisation failed at a sub-controller";
  485. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
  486. send_barrier_status_msg (barrier, emsg);
  487. return;
  488. case GNUNET_TESTBED_BARRIERSTATUS_CROSSED:
  489. if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED != barrier->status)
  490. {
  491. GNUNET_break_op (0);
  492. return;
  493. }
  494. barrier->num_wbarriers_reached++;
  495. if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
  496. && (LOCAL_QUORUM_REACHED (barrier)))
  497. {
  498. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
  499. send_barrier_status_msg (barrier, NULL);
  500. }
  501. return;
  502. case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED:
  503. if (0 != barrier->status)
  504. {
  505. GNUNET_break_op (0);
  506. return;
  507. }
  508. barrier->num_wbarriers_inited++;
  509. if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
  510. {
  511. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
  512. send_barrier_status_msg (barrier, NULL);
  513. }
  514. return;
  515. }
  516. }
  517. /**
  518. * Function called upon timeout while waiting for a response from the
  519. * subcontrollers to barrier init message
  520. *
  521. * @param cls barrier
  522. */
  523. static void
  524. fwd_tout_barrier_init (void *cls)
  525. {
  526. struct Barrier *barrier = cls;
  527. cancel_wrappers (barrier);
  528. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
  529. send_barrier_status_msg (barrier,
  530. "Timedout while propagating barrier initialisation\n");
  531. remove_barrier (barrier);
  532. }
  533. /**
  534. * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages.
  535. *
  536. * @param cls identification of the client
  537. * @param msg the actual message
  538. * @return #GNUNET_OK if @a msg is well-formed
  539. */
  540. int
  541. check_barrier_init (void *cls,
  542. const struct GNUNET_TESTBED_BarrierInit *msg)
  543. {
  544. return GNUNET_OK; /* always well-formed */
  545. }
  546. /**
  547. * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This
  548. * message should always come from a parent controller or the testbed API if we
  549. * are the root controller.
  550. *
  551. * This handler is queued in the main service and will handle the messages sent
  552. * either from the testbed driver or from a high level controller
  553. *
  554. * @param cls identification of the client
  555. * @param msg the actual message
  556. */
  557. void
  558. handle_barrier_init (void *cls,
  559. const struct GNUNET_TESTBED_BarrierInit *msg)
  560. {
  561. struct GNUNET_SERVICE_Client *client = cls;
  562. char *name;
  563. struct Barrier *barrier;
  564. struct Slave *slave;
  565. struct WBarrier *wrapper;
  566. struct GNUNET_HashCode hash;
  567. size_t name_len;
  568. unsigned int cnt;
  569. uint16_t msize;
  570. if (NULL == GST_context)
  571. {
  572. GNUNET_break_op (0);
  573. GNUNET_SERVICE_client_drop (client);
  574. return;
  575. }
  576. if (client != GST_context->client)
  577. {
  578. GNUNET_break_op (0);
  579. GNUNET_SERVICE_client_drop (client);
  580. return;
  581. }
  582. msize = ntohs (msg->header.size);
  583. name_len = (size_t) msize - sizeof(struct GNUNET_TESTBED_BarrierInit);
  584. name = GNUNET_malloc (name_len + 1);
  585. GNUNET_memcpy (name, msg->name, name_len);
  586. GNUNET_CRYPTO_hash (name, name_len, &hash);
  587. LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n",
  588. name);
  589. if (GNUNET_YES ==
  590. GNUNET_CONTAINER_multihashmap_contains (barrier_map,
  591. &hash))
  592. {
  593. send_client_status_msg (client,
  594. name,
  595. GNUNET_TESTBED_BARRIERSTATUS_ERROR,
  596. "A barrier with the same name already exists");
  597. GNUNET_free (name);
  598. GNUNET_SERVICE_client_continue (client);
  599. return;
  600. }
  601. barrier = GNUNET_new (struct Barrier);
  602. barrier->hash = hash;
  603. barrier->quorum = msg->quorum;
  604. barrier->name = name;
  605. barrier->mc = client;
  606. GNUNET_assert (GNUNET_OK ==
  607. GNUNET_CONTAINER_multihashmap_put (barrier_map,
  608. &barrier->hash,
  609. barrier,
  610. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
  611. GNUNET_SERVICE_client_continue (client);
  612. /* Propagate barrier init to subcontrollers */
  613. for (cnt = 0; cnt < GST_slave_list_size; cnt++)
  614. {
  615. if (NULL == (slave = GST_slave_list[cnt]))
  616. continue;
  617. if (NULL == slave->controller)
  618. {
  619. GNUNET_break (0); /* May happen when we are connecting to the controller */
  620. continue;
  621. }
  622. wrapper = GNUNET_new (struct WBarrier);
  623. wrapper->barrier = barrier;
  624. wrapper->controller = slave->controller;
  625. GNUNET_CONTAINER_DLL_insert_tail (barrier->whead,
  626. barrier->wtail,
  627. wrapper);
  628. barrier->num_wbarriers++;
  629. wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (wrapper->controller,
  630. barrier->name,
  631. barrier->quorum,
  632. &wbarrier_status_cb,
  633. wrapper,
  634. GNUNET_NO);
  635. }
  636. if (NULL == barrier->whead) /* No further propagation */
  637. {
  638. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
  639. LOG_DEBUG (
  640. "Sending GNUNET_TESTBED_BARRIERSTATUS_INITIALISED for barrier `%s'\n",
  641. barrier->name);
  642. send_barrier_status_msg (barrier, NULL);
  643. }
  644. else
  645. barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (
  646. 30),
  647. &fwd_tout_barrier_init,
  648. barrier);
  649. }
  650. /**
  651. * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages.
  652. *
  653. * @param cls identification of the client
  654. * @param msg the actual message
  655. * @return #GNUNET_OK if @a msg is well-formed
  656. */
  657. int
  658. check_barrier_cancel (void *cls,
  659. const struct GNUNET_TESTBED_BarrierCancel *msg)
  660. {
  661. return GNUNET_OK; /* all are well-formed */
  662. }
  663. /**
  664. * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages. This
  665. * message should always come from a parent controller or the testbed API if we
  666. * are the root controller.
  667. *
  668. * This handler is queued in the main service and will handle the messages sent
  669. * either from the testbed driver or from a high level controller
  670. *
  671. * @param cls identification of the client
  672. * @param msg the actual message
  673. */
  674. void
  675. handle_barrier_cancel (void *cls,
  676. const struct GNUNET_TESTBED_BarrierCancel *msg)
  677. {
  678. struct GNUNET_SERVICE_Client *client = cls;
  679. char *name;
  680. struct Barrier *barrier;
  681. struct GNUNET_HashCode hash;
  682. size_t name_len;
  683. uint16_t msize;
  684. if (NULL == GST_context)
  685. {
  686. GNUNET_break_op (0);
  687. GNUNET_SERVICE_client_drop (client);
  688. return;
  689. }
  690. if (client != GST_context->client)
  691. {
  692. GNUNET_break_op (0);
  693. GNUNET_SERVICE_client_drop (client);
  694. return;
  695. }
  696. msize = ntohs (msg->header.size);
  697. name_len = msize - sizeof(struct GNUNET_TESTBED_BarrierCancel);
  698. name = GNUNET_malloc (name_len + 1);
  699. GNUNET_memcpy (name,
  700. msg->name,
  701. name_len);
  702. LOG_DEBUG ("Received BARRIER_CANCEL for barrier `%s'\n",
  703. name);
  704. GNUNET_CRYPTO_hash (name,
  705. name_len,
  706. &hash);
  707. if (GNUNET_NO ==
  708. GNUNET_CONTAINER_multihashmap_contains (barrier_map,
  709. &hash))
  710. {
  711. GNUNET_break_op (0);
  712. GNUNET_SERVICE_client_drop (client);
  713. return;
  714. }
  715. barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map,
  716. &hash);
  717. GNUNET_assert (NULL != barrier);
  718. cancel_wrappers (barrier);
  719. remove_barrier (barrier);
  720. GNUNET_SERVICE_client_continue (client);
  721. }
  722. /**
  723. * Check #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
  724. *
  725. * @param cls identification of the client
  726. * @param msg the actual message
  727. * @return #GNUNET_OK if @a msg is well-formed
  728. */
  729. int
  730. check_barrier_status (void *cls,
  731. const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
  732. {
  733. uint16_t msize;
  734. uint16_t name_len;
  735. const char *name;
  736. enum GNUNET_TESTBED_BarrierStatus status;
  737. msize = ntohs (msg->header.size) - sizeof(*msg);
  738. status = ntohs (msg->status);
  739. if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status)
  740. {
  741. GNUNET_break_op (0); /* current we only expect BARRIER_CROSSED
  742. status message this way */
  743. return GNUNET_SYSERR;
  744. }
  745. name = msg->data;
  746. name_len = ntohs (msg->name_len);
  747. if ((name_len + 1) != msize)
  748. {
  749. GNUNET_break_op (0);
  750. return GNUNET_SYSERR;
  751. }
  752. if ('\0' != name[name_len])
  753. {
  754. GNUNET_break_op (0);
  755. return GNUNET_SYSERR;
  756. }
  757. return GNUNET_OK;
  758. }
  759. /**
  760. * Message handler for #GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
  761. * This handler is queued in the main service and will handle the messages sent
  762. * either from the testbed driver or from a high level controller
  763. *
  764. * @param cls identification of the client
  765. * @param msg the actual message
  766. */
  767. void
  768. handle_barrier_status (void *cls,
  769. const struct GNUNET_TESTBED_BarrierStatusMsg *msg)
  770. {
  771. struct GNUNET_SERVICE_Client *client = cls;
  772. struct Barrier *barrier;
  773. struct ClientCtx *client_ctx;
  774. struct WBarrier *wrapper;
  775. const char *name;
  776. struct GNUNET_HashCode key;
  777. uint16_t name_len;
  778. struct GNUNET_MQ_Envelope *env;
  779. if (NULL == GST_context)
  780. {
  781. GNUNET_break_op (0);
  782. GNUNET_SERVICE_client_drop (client);
  783. return;
  784. }
  785. if (client != GST_context->client)
  786. {
  787. GNUNET_break_op (0);
  788. GNUNET_SERVICE_client_drop (client);
  789. return;
  790. }
  791. name = msg->data;
  792. name_len = ntohs (msg->name_len);
  793. LOG_DEBUG ("Received BARRIER_STATUS for barrier `%s'\n",
  794. name);
  795. GNUNET_CRYPTO_hash (name,
  796. name_len,
  797. &key);
  798. barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map,
  799. &key);
  800. if (NULL == barrier)
  801. {
  802. GNUNET_break_op (0);
  803. GNUNET_SERVICE_client_drop (client);
  804. return;
  805. }
  806. GNUNET_SERVICE_client_continue (client);
  807. for (client_ctx = barrier->head; NULL != client_ctx; client_ctx =
  808. client_ctx->next) /* Notify peers */
  809. {
  810. env = GNUNET_MQ_msg_copy (&msg->header);
  811. GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client_ctx->client),
  812. env);
  813. }
  814. /**
  815. * The wrapper barriers do not echo the barrier status, so we have to do it
  816. * here
  817. */
  818. for (wrapper = barrier->whead; NULL != wrapper; wrapper = wrapper->next)
  819. {
  820. GNUNET_TESTBED_queue_message_ (wrapper->controller,
  821. GNUNET_copy_message (&msg->header));
  822. }
  823. }
  824. /* end of gnunet-service-testbed_barriers.c */