gnunet-service-testbed_barriers.c 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936
  1. /*
  2. This file is part of GNUnet.
  3. (C) 2008--2013 Christian Grothoff (and other contributing authors)
  4. GNUnet is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published
  6. by the Free Software Foundation; either version 3, or (at your
  7. option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with GNUnet; see the file COPYING. If not, write to the
  14. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  15. Boston, MA 02111-1307, USA.
  16. */
  17. /**
  18. * @file testbed/gnunet-service-testbed_barriers.c
  19. * @brief barrier handling at the testbed controller
  20. * @author Sree Harsha Totakura <sreeharsha@totakura.in>
  21. */
  22. #include "gnunet-service-testbed.h"
  23. #include "gnunet-service-testbed_barriers.h"
  24. #include "testbed_api_barriers.h"
  25. /**
  26. * timeout for outgoing message transmissions in seconds
  27. */
  28. #define MESSAGE_SEND_TIMEOUT(s) \
  29. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, s)
  30. /**
  31. * Test to see if local peers have reached the required quorum of a barrier
  32. */
  33. #define LOCAL_QUORUM_REACHED(barrier) \
  34. ((barrier->quorum * GST_num_local_peers) <= (barrier->nreached * 100))
  35. #ifdef LOG
  36. #undef LOG
  37. #endif
  38. /**
  39. * Logging shorthand
  40. */
  41. #define LOG(kind,...) \
  42. GNUNET_log_from (kind, "testbed-barriers", __VA_ARGS__)
  43. /**
  44. * Barrier
  45. */
  46. struct Barrier;
  47. /**
  48. * Message queue for transmitting messages
  49. */
  50. struct MessageQueue
  51. {
  52. /**
  53. * next pointer for DLL
  54. */
  55. struct MessageQueue *next;
  56. /**
  57. * prev pointer for DLL
  58. */
  59. struct MessageQueue *prev;
  60. /**
  61. * The message to be sent
  62. */
  63. struct GNUNET_MessageHeader *msg;
  64. };
  65. /**
  66. * Context to be associated with each client
  67. */
  68. struct ClientCtx
  69. {
  70. /**
  71. * The barrier this client is waiting for
  72. */
  73. struct Barrier *barrier;
  74. /**
  75. * DLL next ptr
  76. */
  77. struct ClientCtx *next;
  78. /**
  79. * DLL prev ptr
  80. */
  81. struct ClientCtx *prev;
  82. /**
  83. * The client handle
  84. */
  85. struct GNUNET_SERVER_Client *client;
  86. /**
  87. * the transmission handle
  88. */
  89. struct GNUNET_SERVER_TransmitHandle *tx;
  90. /**
  91. * message queue head
  92. */
  93. struct MessageQueue *mq_head;
  94. /**
  95. * message queue tail
  96. */
  97. struct MessageQueue *mq_tail;
  98. };
  99. /**
  100. * Wrapper around Barrier handle
  101. */
  102. struct WBarrier
  103. {
  104. /**
  105. * DLL next pointer
  106. */
  107. struct WBarrier *next;
  108. /**
  109. * DLL prev pointer
  110. */
  111. struct WBarrier *prev;
  112. /**
  113. * The local barrier associated with the creation of this wrapper
  114. */
  115. struct Barrier *barrier;
  116. /**
  117. * The barrier handle from API
  118. */
  119. struct GNUNET_TESTBED_Barrier *hbarrier;
  120. /**
  121. * Has this barrier been crossed?
  122. */
  123. uint8_t reached;
  124. };
  125. /**
  126. * Barrier
  127. */
  128. struct Barrier
  129. {
  130. /**
  131. * The hashcode of the barrier name
  132. */
  133. struct GNUNET_HashCode hash;
  134. /**
  135. * The client handle to the master controller
  136. */
  137. struct GNUNET_SERVER_Client *mc;
  138. /**
  139. * The name of the barrier
  140. */
  141. char *name;
  142. /**
  143. * DLL head for the list of clients waiting for this barrier
  144. */
  145. struct ClientCtx *head;
  146. /**
  147. * DLL tail for the list of clients waiting for this barrier
  148. */
  149. struct ClientCtx *tail;
  150. /**
  151. * DLL head for the list of barrier handles
  152. */
  153. struct WBarrier *whead;
  154. /**
  155. * DLL tail for the list of barrier handles
  156. */
  157. struct WBarrier *wtail;
  158. /**
  159. * Identifier for the timeout task
  160. */
  161. GNUNET_SCHEDULER_TaskIdentifier tout_task;
  162. /**
  163. * The status of this barrier
  164. */
  165. enum GNUNET_TESTBED_BarrierStatus status;
  166. /**
  167. * Number of barriers wrapped in the above DLL
  168. */
  169. unsigned int num_wbarriers;
  170. /**
  171. * Number of wrapped barriers reached so far
  172. */
  173. unsigned int num_wbarriers_reached;
  174. /**
  175. * Number of wrapped barrier initialised so far
  176. */
  177. unsigned int num_wbarriers_inited;
  178. /**
  179. * Number of peers which have reached this barrier
  180. */
  181. unsigned int nreached;
  182. /**
  183. * Number of slaves we have initialised this barrier
  184. */
  185. unsigned int nslaves;
  186. /**
  187. * Quorum percentage to be reached
  188. */
  189. uint8_t quorum;
  190. };
  191. /**
  192. * Hashtable handle for storing initialised barriers
  193. */
  194. static struct GNUNET_CONTAINER_MultiHashMap *barrier_map;
  195. /**
  196. * Service context
  197. */
  198. static struct GNUNET_SERVICE_Context *ctx;
  199. /**
  200. * Function called to notify a client about the connection
  201. * begin ready to queue more data. "buf" will be
  202. * NULL and "size" zero if the connection was closed for
  203. * writing in the meantime.
  204. *
  205. * @param cls client context
  206. * @param size number of bytes available in buf
  207. * @param buf where the callee should write the message
  208. * @return number of bytes written to buf
  209. */
  210. static size_t
  211. transmit_ready_cb (void *cls, size_t size, void *buf)
  212. {
  213. struct ClientCtx *ctx = cls;
  214. struct GNUNET_SERVER_Client *client = ctx->client;
  215. struct MessageQueue *mq;
  216. struct GNUNET_MessageHeader *msg;
  217. size_t wrote;
  218. ctx->tx = NULL;
  219. if ((0 == size) || (NULL == buf))
  220. {
  221. GNUNET_assert (NULL != ctx->client);
  222. GNUNET_SERVER_client_drop (ctx->client);
  223. ctx->client = NULL;
  224. return 0;
  225. }
  226. mq = ctx->mq_head;
  227. msg = mq->msg;
  228. wrote = ntohs (msg->size);
  229. GNUNET_assert (size >= wrote);
  230. (void) memcpy (buf, msg, wrote);
  231. GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
  232. GNUNET_free (mq->msg);
  233. GNUNET_free (mq);
  234. if (NULL != (mq = ctx->mq_head))
  235. ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
  236. MESSAGE_SEND_TIMEOUT (30),
  237. &transmit_ready_cb, ctx);
  238. return wrote;
  239. }
  240. /**
  241. * Queue a message into a clients message queue
  242. *
  243. * @param ctx the context associated with the client
  244. * @param msg the message to queue. Will be consumed
  245. */
  246. static void
  247. queue_message (struct ClientCtx *ctx, struct GNUNET_MessageHeader *msg)
  248. {
  249. struct MessageQueue *mq;
  250. struct GNUNET_SERVER_Client *client = ctx->client;
  251. mq = GNUNET_new (struct MessageQueue);
  252. mq->msg = msg;
  253. LOG_DEBUG ("Queueing message of type %u, size %u for sending\n",
  254. ntohs (msg->type), ntohs (msg->size));
  255. GNUNET_CONTAINER_DLL_insert_tail (ctx->mq_head, ctx->mq_tail, mq);
  256. if (NULL == ctx->tx)
  257. ctx->tx = GNUNET_SERVER_notify_transmit_ready (client, ntohs (msg->size),
  258. MESSAGE_SEND_TIMEOUT (30),
  259. &transmit_ready_cb, ctx);
  260. }
  261. /**
  262. * Function to cleanup client context data structure
  263. *
  264. * @param ctx the client context data structure
  265. */
  266. static void
  267. cleanup_clientctx (struct ClientCtx *ctx)
  268. {
  269. struct MessageQueue *mq;
  270. if (NULL != ctx->client)
  271. {
  272. GNUNET_SERVER_client_set_user_context_ (ctx->client, NULL, 0);
  273. GNUNET_SERVER_client_drop (ctx->client);
  274. }
  275. if (NULL != ctx->tx)
  276. GNUNET_SERVER_notify_transmit_ready_cancel (ctx->tx);
  277. if (NULL != (mq = ctx->mq_head))
  278. {
  279. GNUNET_CONTAINER_DLL_remove (ctx->mq_head, ctx->mq_tail, mq);
  280. GNUNET_free (mq->msg);
  281. GNUNET_free (mq);
  282. }
  283. GNUNET_free (ctx);
  284. }
  285. /**
  286. * Function to remove a barrier from the barrier map and cleanup resources
  287. * occupied by a barrier
  288. *
  289. * @param barrier the barrier handle
  290. */
  291. static void
  292. remove_barrier (struct Barrier *barrier)
  293. {
  294. struct ClientCtx *ctx;
  295. GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (barrier_map,
  296. &barrier->hash,
  297. barrier));
  298. while (NULL != (ctx = barrier->head))
  299. {
  300. GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, ctx);
  301. cleanup_clientctx (ctx);
  302. }
  303. GNUNET_free (barrier->name);
  304. GNUNET_SERVER_client_drop (barrier->mc);
  305. GNUNET_free (barrier);
  306. }
  307. /**
  308. * Cancels all subcontroller barrier handles
  309. *
  310. * @param barrier the local barrier
  311. */
  312. static void
  313. cancel_wrappers (struct Barrier *barrier)
  314. {
  315. struct WBarrier *wrapper;
  316. while (NULL != (wrapper = barrier->whead))
  317. {
  318. GNUNET_TESTBED_barrier_cancel (wrapper->hbarrier);
  319. GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
  320. GNUNET_free (wrapper);
  321. }
  322. }
  323. /**
  324. * Send a status message about a barrier to the given client
  325. *
  326. * @param client the client to send the message to
  327. * @param name the barrier name
  328. * @param status the status of the barrier
  329. * @param emsg the error message; should be non-NULL for
  330. * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
  331. */
  332. static void
  333. send_client_status_msg (struct GNUNET_SERVER_Client *client,
  334. const char *name,
  335. enum GNUNET_TESTBED_BarrierStatus status,
  336. const char *emsg)
  337. {
  338. struct GNUNET_TESTBED_BarrierStatusMsg *msg;
  339. size_t name_len;
  340. uint16_t msize;
  341. GNUNET_assert ((NULL == emsg) || (GNUNET_TESTBED_BARRIERSTATUS_ERROR == status));
  342. name_len = strlen (name);
  343. msize = sizeof (struct GNUNET_TESTBED_BarrierStatusMsg)
  344. + (name_len + 1)
  345. + ((NULL == emsg) ? 0 : (strlen (emsg) + 1));
  346. msg = GNUNET_malloc (msize);
  347. msg->header.size = htons (msize);
  348. msg->header.type = htons (GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS);
  349. msg->status = htons (status);
  350. msg->name_len = htons ((uint16_t) name_len);
  351. (void) memcpy (msg->data, name, name_len);
  352. if (NULL != emsg)
  353. (void) memcpy (msg->data + name_len + 1, emsg, strlen (emsg));
  354. GST_queue_message (client, &msg->header);
  355. }
  356. /**
  357. * Sends a barrier failed message
  358. *
  359. * @param barrier the corresponding barrier
  360. * @param emsg the error message; should be non-NULL for
  361. * status=GNUNET_TESTBED_BARRIERSTATUS_ERROR
  362. */
  363. static void
  364. send_barrier_status_msg (struct Barrier *barrier, const char *emsg)
  365. {
  366. GNUNET_assert (0 != barrier->status);
  367. send_client_status_msg (barrier->mc, barrier->name, barrier->status, emsg);
  368. }
  369. /**
  370. * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT messages. This
  371. * message should come from peers or a shared helper service using the
  372. * testbed-barrier client API (@see gnunet_testbed_barrier_service.h)
  373. *
  374. * This handler is queued in the main service and will handle the messages sent
  375. * either from the testbed driver or from a high level controller
  376. *
  377. * @param cls NULL
  378. * @param client identification of the client
  379. * @param message the actual message
  380. */
  381. static void
  382. handle_barrier_wait (void *cls, struct GNUNET_SERVER_Client *client,
  383. const struct GNUNET_MessageHeader *message)
  384. {
  385. const struct GNUNET_TESTBED_BarrierWait *msg;
  386. struct Barrier *barrier;
  387. char *name;
  388. struct ClientCtx *client_ctx;
  389. struct GNUNET_HashCode key;
  390. size_t name_len;
  391. uint16_t msize;
  392. msize = ntohs (message->size);
  393. if (msize <= sizeof (struct GNUNET_TESTBED_BarrierWait))
  394. {
  395. GNUNET_break_op (0);
  396. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  397. return;
  398. }
  399. if (NULL == barrier_map)
  400. {
  401. GNUNET_break (0);
  402. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  403. return;
  404. }
  405. msg = (const struct GNUNET_TESTBED_BarrierWait *) message;
  406. name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierWait);
  407. name = GNUNET_malloc (name_len + 1);
  408. name[name_len] = '\0';
  409. (void) memcpy (name, msg->name, name_len);
  410. LOG_DEBUG ("Received BARRIER_WAIT for barrier `%s'\n", name);
  411. GNUNET_CRYPTO_hash (name, name_len, &key);
  412. GNUNET_free (name);
  413. if (NULL == (barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key)))
  414. {
  415. GNUNET_break (0);
  416. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  417. return;
  418. }
  419. client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
  420. if (NULL == client_ctx)
  421. {
  422. client_ctx = GNUNET_new (struct ClientCtx);
  423. client_ctx->client = client;
  424. GNUNET_SERVER_client_keep (client);
  425. client_ctx->barrier = barrier;
  426. GNUNET_CONTAINER_DLL_insert_tail (barrier->head, barrier->tail, client_ctx);
  427. GNUNET_SERVER_client_set_user_context (client, client_ctx);
  428. }
  429. barrier->nreached++;
  430. if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
  431. && (LOCAL_QUORUM_REACHED (barrier)))
  432. {
  433. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
  434. send_barrier_status_msg (barrier, NULL);
  435. }
  436. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  437. }
  438. /**
  439. * Functions with this signature are called whenever a client
  440. * is disconnected on the network level.
  441. *
  442. * @param cls closure
  443. * @param client identification of the client; NULL
  444. * for the last call when the server is destroyed
  445. */
  446. static void
  447. disconnect_cb (void *cls, struct GNUNET_SERVER_Client *client)
  448. {
  449. struct ClientCtx *client_ctx;
  450. if (NULL == client)
  451. return;
  452. client_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientCtx);
  453. if (NULL == client_ctx)
  454. return;
  455. cleanup_clientctx (client_ctx);
  456. }
  457. /**
  458. * Function to initialise barrriers component
  459. *
  460. * @param cfg the configuration to use for initialisation
  461. */
  462. void
  463. GST_barriers_init (struct GNUNET_CONFIGURATION_Handle *cfg)
  464. {
  465. static const struct GNUNET_SERVER_MessageHandler message_handlers[] = {
  466. {&handle_barrier_wait, NULL, GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_WAIT, 0},
  467. {NULL, NULL, 0, 0}
  468. };
  469. struct GNUNET_SERVER_Handle *srv;
  470. barrier_map = GNUNET_CONTAINER_multihashmap_create (3, GNUNET_YES);
  471. ctx = GNUNET_SERVICE_start ("testbed-barrier", cfg,
  472. GNUNET_SERVICE_OPTION_MANUAL_SHUTDOWN);
  473. srv = GNUNET_SERVICE_get_server (ctx);
  474. GNUNET_SERVER_add_handlers (srv, message_handlers);
  475. GNUNET_SERVER_disconnect_notify (srv, &disconnect_cb, NULL);
  476. }
  477. /**
  478. * Iterator over hash map entries.
  479. *
  480. * @param cls closure
  481. * @param key current key code
  482. * @param value value in the hash map
  483. * @return #GNUNET_YES if we should continue to
  484. * iterate,
  485. * #GNUNET_NO if not.
  486. */
  487. static int
  488. barrier_destroy_iterator (void *cls,
  489. const struct GNUNET_HashCode *key,
  490. void *value)
  491. {
  492. struct Barrier *barrier = value;
  493. GNUNET_assert (NULL != barrier);
  494. cancel_wrappers (barrier);
  495. remove_barrier (barrier);
  496. return GNUNET_YES;
  497. }
  498. /**
  499. * Function to stop the barrier service
  500. */
  501. void
  502. GST_barriers_destroy ()
  503. {
  504. GNUNET_assert (NULL != barrier_map);
  505. GNUNET_assert (GNUNET_SYSERR !=
  506. GNUNET_CONTAINER_multihashmap_iterate (barrier_map,
  507. &barrier_destroy_iterator,
  508. NULL));
  509. GNUNET_CONTAINER_multihashmap_destroy (barrier_map);
  510. GNUNET_assert (NULL != ctx);
  511. GNUNET_SERVICE_stop (ctx);
  512. }
  513. /**
  514. * Functions of this type are to be given as callback argument to
  515. * GNUNET_TESTBED_barrier_init(). The callback will be called when status
  516. * information is available for the barrier.
  517. *
  518. * @param cls the closure given to GNUNET_TESTBED_barrier_init()
  519. * @param name the name of the barrier
  520. * @param b_ the barrier handle
  521. * @param status status of the barrier; GNUNET_OK if the barrier is crossed;
  522. * GNUNET_SYSERR upon error
  523. * @param emsg if the status were to be GNUNET_SYSERR, this parameter has the
  524. * error messsage
  525. */
  526. static void
  527. wbarrier_status_cb (void *cls, const char *name,
  528. struct GNUNET_TESTBED_Barrier *b_,
  529. enum GNUNET_TESTBED_BarrierStatus status,
  530. const char *emsg)
  531. {
  532. struct WBarrier *wrapper = cls;
  533. struct Barrier *barrier = wrapper->barrier;
  534. GNUNET_assert (b_ == wrapper->hbarrier);
  535. wrapper->hbarrier = NULL;
  536. GNUNET_CONTAINER_DLL_remove (barrier->whead, barrier->wtail, wrapper);
  537. GNUNET_free (wrapper);
  538. switch (status)
  539. {
  540. case GNUNET_TESTBED_BARRIERSTATUS_ERROR:
  541. LOG (GNUNET_ERROR_TYPE_ERROR,
  542. "Initialising barrier `%s' failed at a sub-controller: %s\n",
  543. barrier->name, (NULL != emsg) ? emsg : "NULL");
  544. cancel_wrappers (barrier);
  545. if (NULL == emsg)
  546. emsg = "Initialisation failed at a sub-controller";
  547. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
  548. send_barrier_status_msg (barrier, emsg);
  549. return;
  550. case GNUNET_TESTBED_BARRIERSTATUS_CROSSED:
  551. if (GNUNET_TESTBED_BARRIERSTATUS_INITIALISED != barrier->status)
  552. {
  553. GNUNET_break_op (0);
  554. return;
  555. }
  556. barrier->num_wbarriers_reached++;
  557. if ((barrier->num_wbarriers_reached == barrier->num_wbarriers)
  558. && (LOCAL_QUORUM_REACHED (barrier)))
  559. {
  560. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_CROSSED;
  561. send_barrier_status_msg (barrier, NULL);
  562. }
  563. return;
  564. case GNUNET_TESTBED_BARRIERSTATUS_INITIALISED:
  565. if (0 != barrier->status)
  566. {
  567. GNUNET_break_op (0);
  568. return;
  569. }
  570. barrier->num_wbarriers_inited++;
  571. if (barrier->num_wbarriers_inited == barrier->num_wbarriers)
  572. {
  573. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
  574. send_barrier_status_msg (barrier, NULL);
  575. }
  576. return;
  577. }
  578. }
  579. /**
  580. * Function called upon timeout while waiting for a response from the
  581. * subcontrollers to barrier init message
  582. *
  583. * @param cls barrier
  584. * @param tc scheduler task context
  585. */
  586. static void
  587. fwd_tout_barrier_init (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  588. {
  589. struct Barrier *barrier = cls;
  590. cancel_wrappers (barrier);
  591. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_ERROR;
  592. send_barrier_status_msg (barrier,
  593. "Timedout while propagating barrier initialisation\n");
  594. remove_barrier (barrier);
  595. }
  596. /**
  597. * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_INIT messages. This
  598. * message should always come from a parent controller or the testbed API if we
  599. * are the root controller.
  600. *
  601. * This handler is queued in the main service and will handle the messages sent
  602. * either from the testbed driver or from a high level controller
  603. *
  604. * @param cls NULL
  605. * @param client identification of the client
  606. * @param message the actual message
  607. */
  608. void
  609. GST_handle_barrier_init (void *cls, struct GNUNET_SERVER_Client *client,
  610. const struct GNUNET_MessageHeader *message)
  611. {
  612. const struct GNUNET_TESTBED_BarrierInit *msg;
  613. char *name;
  614. struct Barrier *barrier;
  615. struct Slave *slave;
  616. struct WBarrier *wrapper;
  617. struct GNUNET_HashCode hash;
  618. size_t name_len;
  619. unsigned int cnt;
  620. uint16_t msize;
  621. if (NULL == GST_context)
  622. {
  623. GNUNET_break_op (0);
  624. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  625. return;
  626. }
  627. if (client != GST_context->client)
  628. {
  629. GNUNET_break_op (0);
  630. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  631. return;
  632. }
  633. msize = ntohs (message->size);
  634. if (msize <= sizeof (struct GNUNET_TESTBED_BarrierInit))
  635. {
  636. GNUNET_break_op (0);
  637. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  638. return;
  639. }
  640. msg = (const struct GNUNET_TESTBED_BarrierInit *) message;
  641. name_len = (size_t) msize - sizeof (struct GNUNET_TESTBED_BarrierInit);
  642. name = GNUNET_malloc (name_len + 1);
  643. (void) memcpy (name, msg->name, name_len);
  644. GNUNET_CRYPTO_hash (name, name_len, &hash);
  645. LOG_DEBUG ("Received BARRIER_INIT for barrier `%s'\n", name);
  646. if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
  647. {
  648. send_client_status_msg (client, name, GNUNET_TESTBED_BARRIERSTATUS_ERROR,
  649. "A barrier with the same name already exists");
  650. GNUNET_free (name);
  651. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  652. return;
  653. }
  654. barrier = GNUNET_new (struct Barrier);
  655. (void) memcpy (&barrier->hash, &hash, sizeof (struct GNUNET_HashCode));
  656. barrier->quorum = msg->quorum;
  657. barrier->name = name;
  658. barrier->mc = client;
  659. GNUNET_SERVER_client_keep (client);
  660. GNUNET_assert (GNUNET_OK ==
  661. GNUNET_CONTAINER_multihashmap_put (barrier_map,
  662. &barrier->hash,
  663. barrier,
  664. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
  665. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  666. /* Propagate barrier init to subcontrollers */
  667. for (cnt = 0; cnt < GST_slave_list_size; cnt++)
  668. {
  669. if (NULL == (slave = GST_slave_list[cnt]))
  670. continue;
  671. if (NULL == slave->controller)
  672. {
  673. GNUNET_break (0);/* May happen when we are connecting to the controller */
  674. continue;
  675. }
  676. wrapper = GNUNET_new (struct WBarrier);
  677. wrapper->barrier = barrier;
  678. GNUNET_CONTAINER_DLL_insert_tail (barrier->whead, barrier->wtail, wrapper);
  679. wrapper->hbarrier = GNUNET_TESTBED_barrier_init_ (slave->controller,
  680. barrier->name,
  681. barrier->quorum,
  682. &wbarrier_status_cb,
  683. wrapper,
  684. GNUNET_NO);
  685. }
  686. if (NULL == barrier->whead) /* No further propagation */
  687. {
  688. barrier->status = GNUNET_TESTBED_BARRIERSTATUS_INITIALISED;
  689. LOG_DEBUG ("Sending GNUNET_TESTBED_BARRIERSTATUS_INITIALISED for barrier `%s'\n",
  690. barrier->name);
  691. send_barrier_status_msg (barrier, NULL);
  692. }else
  693. barrier->tout_task = GNUNET_SCHEDULER_add_delayed (MESSAGE_SEND_TIMEOUT (30),
  694. &fwd_tout_barrier_init,
  695. barrier);
  696. }
  697. /**
  698. * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_CANCEL messages. This
  699. * message should always come from a parent controller or the testbed API if we
  700. * are the root controller.
  701. *
  702. * This handler is queued in the main service and will handle the messages sent
  703. * either from the testbed driver or from a high level controller
  704. *
  705. * @param cls NULL
  706. * @param client identification of the client
  707. * @param message the actual message
  708. */
  709. void
  710. GST_handle_barrier_cancel (void *cls, struct GNUNET_SERVER_Client *client,
  711. const struct GNUNET_MessageHeader *message)
  712. {
  713. const struct GNUNET_TESTBED_BarrierCancel *msg;
  714. char *name;
  715. struct Barrier *barrier;
  716. struct GNUNET_HashCode hash;
  717. size_t name_len;
  718. uint16_t msize;
  719. if (NULL == GST_context)
  720. {
  721. GNUNET_break_op (0);
  722. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  723. return;
  724. }
  725. if (client != GST_context->client)
  726. {
  727. GNUNET_break_op (0);
  728. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  729. return;
  730. }
  731. msize = ntohs (message->size);
  732. if (msize <= sizeof (struct GNUNET_TESTBED_BarrierCancel))
  733. {
  734. GNUNET_break_op (0);
  735. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  736. return;
  737. }
  738. msg = (const struct GNUNET_TESTBED_BarrierCancel *) message;
  739. name_len = msize - sizeof (struct GNUNET_TESTBED_BarrierCancel);
  740. name = GNUNET_malloc (name_len + 1);
  741. (void) memcpy (name, msg->name, name_len);
  742. GNUNET_CRYPTO_hash (name, name_len, &hash);
  743. if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_contains (barrier_map, &hash))
  744. {
  745. GNUNET_break_op (0);
  746. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  747. return;
  748. }
  749. barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &hash);
  750. GNUNET_assert (NULL != barrier);
  751. cancel_wrappers (barrier);
  752. remove_barrier (barrier);
  753. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  754. }
  755. /**
  756. * Message handler for GNUNET_MESSAGE_TYPE_TESTBED_BARRIER_STATUS messages.
  757. * This handler is queued in the main service and will handle the messages sent
  758. * either from the testbed driver or from a high level controller
  759. *
  760. * @param cls NULL
  761. * @param client identification of the client
  762. * @param message the actual message
  763. */
  764. void
  765. GST_handle_barrier_status (void *cls, struct GNUNET_SERVER_Client *client,
  766. const struct GNUNET_MessageHeader *message)
  767. {
  768. const struct GNUNET_TESTBED_BarrierStatusMsg *msg;
  769. struct Barrier *barrier;
  770. struct ClientCtx *client_ctx;
  771. const char *name;
  772. struct GNUNET_HashCode key;
  773. enum GNUNET_TESTBED_BarrierStatus status;
  774. uint16_t msize;
  775. uint16_t name_len;
  776. if (NULL == GST_context)
  777. {
  778. GNUNET_break_op (0);
  779. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  780. return;
  781. }
  782. if (client != GST_context->client)
  783. {
  784. GNUNET_break_op (0);
  785. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  786. return;
  787. }
  788. msize = ntohs (message->size);
  789. if (msize <= sizeof (struct GNUNET_TESTBED_BarrierStatusMsg))
  790. {
  791. GNUNET_break_op (0);
  792. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  793. return;
  794. }
  795. msg = (const struct GNUNET_TESTBED_BarrierStatusMsg *) message;
  796. status = ntohs (msg->status);
  797. if (GNUNET_TESTBED_BARRIERSTATUS_CROSSED != status)
  798. {
  799. GNUNET_break_op (0); /* current we only expect BARRIER_CROSSED
  800. status message this way */
  801. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  802. return;
  803. }
  804. name = msg->data;
  805. name_len = ntohs (msg->name_len);
  806. if ((sizeof (struct GNUNET_TESTBED_BarrierStatusMsg) + name_len + 1) != msize)
  807. {
  808. GNUNET_break_op (0);
  809. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  810. return;
  811. }
  812. if ('\0' != name[name_len])
  813. {
  814. GNUNET_break_op (0);
  815. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  816. return;
  817. }
  818. GNUNET_CRYPTO_hash (name, name_len, &key);
  819. barrier = GNUNET_CONTAINER_multihashmap_get (barrier_map, &key);
  820. if (NULL == barrier)
  821. {
  822. GNUNET_break_op (0);
  823. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  824. return;
  825. }
  826. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  827. while (NULL != (client_ctx = barrier->head)) /* Notify peers */
  828. {
  829. queue_message (client_ctx, GNUNET_copy_message (message));
  830. GNUNET_CONTAINER_DLL_remove (barrier->head, barrier->tail, client_ctx);
  831. }
  832. }
  833. /* end of gnunet-service-testbed_barriers.c */