transport_api_monitor_plugins.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2014, 2016 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file transport/transport_api_monitor_plugins.c
  18. * @brief montoring api for transport plugin session status
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet_util_lib.h"
  23. #include "gnunet_arm_service.h"
  24. #include "gnunet_hello_lib.h"
  25. #include "gnunet_protocols.h"
  26. #include "gnunet_transport_service.h"
  27. #include "transport.h"
  28. /**
  29. * Handle for a plugin session state monitor.
  30. */
  31. struct GNUNET_TRANSPORT_PluginMonitor
  32. {
  33. /**
  34. * Connection to the service.
  35. */
  36. struct GNUNET_MQ_Handle *mq;
  37. /**
  38. * Our configuration.
  39. */
  40. const struct GNUNET_CONFIGURATION_Handle *cfg;
  41. /**
  42. * Callback to call.
  43. */
  44. GNUNET_TRANSPORT_SessionMonitorCallback cb;
  45. /**
  46. * Closure for @e cb
  47. */
  48. void *cb_cls;
  49. /**
  50. * Map of session_ids (reduced to 32-bits) to
  51. * `struct GNUNET_TRANSPORT_PluginSession` objects.
  52. */
  53. struct GNUNET_CONTAINER_MultiHashMap32 *sessions;
  54. /**
  55. * Backoff for reconnect.
  56. */
  57. struct GNUNET_TIME_Relative backoff;
  58. /**
  59. * Task ID for reconnect.
  60. */
  61. struct GNUNET_SCHEDULER_Task *reconnect_task;
  62. };
  63. /**
  64. * Abstract representation of a plugin's session.
  65. * Corresponds to the `struct GNUNET_ATS_Session` within the TRANSPORT service.
  66. */
  67. struct GNUNET_TRANSPORT_PluginSession
  68. {
  69. /**
  70. * Unique session identifier.
  71. */
  72. uint64_t session_id;
  73. /**
  74. * Location for the client to store "data".
  75. */
  76. void *client_ctx;
  77. };
  78. /**
  79. * Task run to re-establish the connection.
  80. *
  81. * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
  82. */
  83. static void
  84. do_plugin_connect (void *cls);
  85. /**
  86. * Free the session entry and notify the callback about its demise.
  87. *
  88. * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor`
  89. * @param key key of the session in the map
  90. * @param value the session to free
  91. * @return #GNUNET_OK (continue to iterate)
  92. */
  93. static int
  94. free_entry (void *cls,
  95. uint32_t key,
  96. void *value)
  97. {
  98. struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
  99. struct GNUNET_TRANSPORT_PluginSession *ps = value;
  100. pm->cb (pm->cb_cls,
  101. ps,
  102. &ps->client_ctx,
  103. NULL);
  104. GNUNET_break (GNUNET_YES ==
  105. GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
  106. key,
  107. ps));
  108. GNUNET_break (NULL == ps->client_ctx);
  109. GNUNET_free (ps);
  110. return GNUNET_OK;
  111. }
  112. /**
  113. * Cut the existing connection and reconnect.
  114. *
  115. * @param pm our context
  116. */
  117. static void
  118. reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm)
  119. {
  120. GNUNET_MQ_destroy (pm->mq);
  121. pm->mq = NULL;
  122. GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
  123. &free_entry,
  124. pm);
  125. pm->backoff = GNUNET_TIME_STD_BACKOFF (pm->backoff);
  126. pm->reconnect_task = GNUNET_SCHEDULER_add_delayed (pm->backoff,
  127. &do_plugin_connect,
  128. pm);
  129. }
  130. /**
  131. * Convert 64-bit session ID to 32-bit index for hash map.
  132. *
  133. * @param id 64-bit session ID
  134. * @return 32-bit hash map index
  135. */
  136. static uint32_t
  137. wrap_id (uint64_t id)
  138. {
  139. return ((uint32_t) id) ^ ((uint32_t) (id >> 32));
  140. }
  141. /**
  142. * Context for #locate_by_id().
  143. */
  144. struct SearchContext
  145. {
  146. /**
  147. * Result.
  148. */
  149. struct GNUNET_TRANSPORT_PluginSession *ps;
  150. /**
  151. * ID to locate.
  152. */
  153. uint64_t session_id;
  154. };
  155. /**
  156. * Locate a session entry.
  157. *
  158. * @param cls our `struct SearchContext`
  159. * @param key key of the session in the map
  160. * @param value a session
  161. * @return #GNUNET_OK (continue to iterate), or #GNUNET_SYSERR (match found)
  162. */
  163. static int
  164. locate_by_id (void *cls,
  165. uint32_t key,
  166. void *value)
  167. {
  168. struct SearchContext *sc = cls;
  169. struct GNUNET_TRANSPORT_PluginSession *ps = value;
  170. if (sc->session_id == ps->session_id)
  171. {
  172. sc->ps = ps;
  173. return GNUNET_SYSERR;
  174. }
  175. return GNUNET_OK;
  176. }
  177. /**
  178. * Function called with responses from the service.
  179. *
  180. * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
  181. * @paramm tpmm message with event data
  182. * @return #GNUNET_Ok if message is well-formed
  183. */
  184. static int
  185. check_event (void *cls,
  186. const struct TransportPluginMonitorMessage *tpmm)
  187. {
  188. const char *pname;
  189. size_t pname_len;
  190. size_t paddr_len;
  191. pname = (const char *) &tpmm[1];
  192. pname_len = ntohs (tpmm->plugin_name_len);
  193. paddr_len = ntohs (tpmm->plugin_address_len);
  194. if ((pname_len
  195. + paddr_len
  196. + sizeof(struct TransportPluginMonitorMessage) != ntohs (
  197. tpmm->header.size)) ||
  198. ((0 != pname_len) &&
  199. ('\0' != pname[pname_len - 1])))
  200. {
  201. GNUNET_break (0);
  202. return GNUNET_SYSERR;
  203. }
  204. return GNUNET_OK;
  205. }
  206. /**
  207. * Function called with responses from the service.
  208. *
  209. * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
  210. * @paramm tpmm message with event data
  211. */
  212. static void
  213. handle_event (void *cls,
  214. const struct TransportPluginMonitorMessage *tpmm)
  215. {
  216. struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
  217. struct GNUNET_TRANSPORT_PluginSession *ps;
  218. const char *pname;
  219. const void *paddr;
  220. enum GNUNET_TRANSPORT_SessionState ss;
  221. size_t pname_len;
  222. size_t paddr_len;
  223. struct GNUNET_TRANSPORT_SessionInfo info;
  224. struct GNUNET_HELLO_Address addr;
  225. struct SearchContext rv;
  226. pname = (const char *) &tpmm[1];
  227. pname_len = ntohs (tpmm->plugin_name_len);
  228. paddr_len = ntohs (tpmm->plugin_address_len);
  229. paddr = &pname[pname_len];
  230. ps = NULL;
  231. ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state);
  232. if (GNUNET_TRANSPORT_SS_INIT == ss)
  233. {
  234. ps = GNUNET_new (struct GNUNET_TRANSPORT_PluginSession);
  235. ps->session_id = tpmm->session_id;
  236. (void) GNUNET_CONTAINER_multihashmap32_put (pm->sessions,
  237. wrap_id (tpmm->session_id),
  238. ps,
  239. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
  240. }
  241. else
  242. {
  243. rv.session_id = tpmm->session_id;
  244. rv.ps = NULL;
  245. (void) GNUNET_CONTAINER_multihashmap32_get_multiple (pm->sessions,
  246. wrap_id (
  247. tpmm->session_id),
  248. &locate_by_id,
  249. &rv);
  250. ps = rv.ps;
  251. if (NULL == ps)
  252. {
  253. GNUNET_break (0);
  254. reconnect_plugin_ctx (pm);
  255. return;
  256. }
  257. }
  258. info.state = ss;
  259. info.is_inbound = (int16_t) ntohs (tpmm->is_inbound);
  260. info.num_msg_pending = ntohl (tpmm->msgs_pending);
  261. info.num_bytes_pending = ntohl (tpmm->bytes_pending);
  262. info.receive_delay = GNUNET_TIME_absolute_ntoh (tpmm->delay);
  263. info.session_timeout = GNUNET_TIME_absolute_ntoh (tpmm->timeout);
  264. info.address = &addr;
  265. addr.peer = tpmm->peer;
  266. addr.address = (0 == paddr_len) ? NULL : paddr;
  267. addr.address_length = paddr_len;
  268. addr.transport_name = (0 == pname_len) ? NULL : pname;
  269. addr.local_info = GNUNET_HELLO_ADDRESS_INFO_NONE;
  270. pm->cb (pm->cb_cls,
  271. ps,
  272. &ps->client_ctx,
  273. &info);
  274. if (GNUNET_TRANSPORT_SS_DONE == ss)
  275. {
  276. GNUNET_break (NULL == ps->client_ctx);
  277. GNUNET_assert (GNUNET_YES ==
  278. GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
  279. wrap_id (
  280. tpmm->session_id),
  281. ps));
  282. GNUNET_free (ps);
  283. }
  284. }
  285. /**
  286. * Function called with sync responses from the service.
  287. *
  288. * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
  289. * @param msg message from the service
  290. */
  291. static void
  292. handle_sync (void *cls,
  293. const struct GNUNET_MessageHeader *msg)
  294. {
  295. struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
  296. /* we are in sync, notify callback */
  297. pm->cb (pm->cb_cls,
  298. NULL,
  299. NULL,
  300. NULL);
  301. }
  302. /**
  303. * Generic error handler, called with the appropriate
  304. * error code and the same closure specified at the creation of
  305. * the message queue.
  306. * Not every message queue implementation supports an error handler.
  307. *
  308. * @param cls closure with the `struct GNUNET_NSE_Handle *`
  309. * @param error error code
  310. */
  311. static void
  312. mq_error_handler (void *cls,
  313. enum GNUNET_MQ_Error error)
  314. {
  315. struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
  316. reconnect_plugin_ctx (pm);
  317. }
  318. /**
  319. * Task run to re-establish the connection.
  320. *
  321. * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
  322. */
  323. static void
  324. do_plugin_connect (void *cls)
  325. {
  326. struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
  327. struct GNUNET_MQ_MessageHandler handlers[] = {
  328. GNUNET_MQ_hd_var_size (event,
  329. GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT,
  330. struct TransportPluginMonitorMessage,
  331. pm),
  332. GNUNET_MQ_hd_fixed_size (sync,
  333. GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC,
  334. struct GNUNET_MessageHeader,
  335. pm),
  336. GNUNET_MQ_handler_end ()
  337. };
  338. struct GNUNET_MessageHeader *msg;
  339. struct GNUNET_MQ_Envelope *env;
  340. pm->reconnect_task = NULL;
  341. pm->mq = GNUNET_CLIENT_connect (pm->cfg,
  342. "transport",
  343. handlers,
  344. &mq_error_handler,
  345. pm);
  346. if (NULL == pm->mq)
  347. return;
  348. env = GNUNET_MQ_msg (msg,
  349. GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
  350. GNUNET_MQ_send (pm->mq,
  351. env);
  352. }
  353. /**
  354. * Install a plugin session state monitor callback. The callback
  355. * will be notified whenever the session changes.
  356. *
  357. * @param cfg configuration to use
  358. * @param cb callback to invoke on events
  359. * @param cb_cls closure for @a cb
  360. * @return NULL on error, otherwise handle for cancellation
  361. */
  362. struct GNUNET_TRANSPORT_PluginMonitor *
  363. GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
  364. GNUNET_TRANSPORT_SessionMonitorCallback cb,
  365. void *cb_cls)
  366. {
  367. struct GNUNET_TRANSPORT_PluginMonitor *pm;
  368. pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor);
  369. pm->cb = cb;
  370. pm->cb_cls = cb_cls;
  371. pm->cfg = cfg;
  372. do_plugin_connect (pm);
  373. if (NULL == pm->mq)
  374. {
  375. GNUNET_free (pm);
  376. return NULL;
  377. }
  378. pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128);
  379. return pm;
  380. }
  381. /**
  382. * Cancel monitoring the plugin session state. The callback will
  383. * be called once for each session that is up with the information
  384. * #GNUNET_TRANSPORT_SS_FINI (even though the session may stay up;
  385. * this is just to enable client-side cleanup).
  386. *
  387. * @param pm handle of the request that is to be cancelled
  388. */
  389. void
  390. GNUNET_TRANSPORT_monitor_plugins_cancel (struct
  391. GNUNET_TRANSPORT_PluginMonitor *pm)
  392. {
  393. if (NULL != pm->mq)
  394. {
  395. GNUNET_MQ_destroy (pm->mq);
  396. pm->mq = NULL;
  397. }
  398. if (NULL != pm->reconnect_task)
  399. {
  400. GNUNET_SCHEDULER_cancel (pm->reconnect_task);
  401. pm->reconnect_task = NULL;
  402. }
  403. GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
  404. &free_entry,
  405. pm);
  406. GNUNET_CONTAINER_multihashmap32_destroy (pm->sessions);
  407. GNUNET_free (pm);
  408. }
  409. /* end of transport_api_monitor_plugins.c */