ats_api2_transport.c 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2010-2015, 2018 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file ats/ats_api2_transport.c
  18. * @brief address suggestions and bandwidth allocation
  19. * @author Christian Grothoff
  20. * @author Matthias Wachs
  21. */
  22. #include "platform.h"
  23. #include "gnunet_ats_transport_service.h"
  24. #include "ats2.h"
  25. #define LOG(kind, ...) GNUNET_log_from(kind, "ats-transport-api", __VA_ARGS__)
  26. /**
  27. * Information we track per session, incoming or outgoing. It also
  28. * doesn't matter if we have a session, any session that ATS is
  29. * allowed to suggest right now should be tracked.
  30. */
  31. struct GNUNET_ATS_SessionRecord {
  32. /**
  33. * Transport handle this session record belongs to.
  34. */
  35. struct GNUNET_ATS_TransportHandle *ath;
  36. /**
  37. * Address data.
  38. */
  39. const char *address;
  40. /**
  41. * Session handle, NULL if inbound-only (also implies we cannot
  42. * actually control inbound traffic via transport!). So if
  43. * @e session is NULL, the @e properties are informative for
  44. * ATS (connection exists, utilization) but ATS cannot directly
  45. * influence it (and should thus not call the
  46. * #GNUNET_ATS_AllocationCallback for this @e session, which is
  47. * obvious as NULL is not a meaningful session to allocation
  48. * resources to).
  49. */
  50. struct GNUNET_ATS_Session *session;
  51. /**
  52. * Identity of the peer reached at @e address.
  53. */
  54. struct GNUNET_PeerIdentity pid;
  55. /**
  56. * Performance data about the @e session.
  57. */
  58. struct GNUNET_ATS_Properties properties;
  59. /**
  60. * Unique ID to identify this session at this @a pid in IPC
  61. * messages.
  62. */
  63. uint32_t slot;
  64. };
  65. /**
  66. * Handle to the ATS subsystem for bandwidth/transport transport information.
  67. */
  68. struct GNUNET_ATS_TransportHandle {
  69. /**
  70. * Our configuration.
  71. */
  72. const struct GNUNET_CONFIGURATION_Handle *cfg;
  73. /**
  74. * Callback to invoke on suggestions.
  75. */
  76. GNUNET_ATS_SuggestionCallback suggest_cb;
  77. /**
  78. * Closure for @e suggest_cb.
  79. */
  80. void *suggest_cb_cls;
  81. /**
  82. * Callback to invoke on allocations.
  83. */
  84. GNUNET_ATS_AllocationCallback alloc_cb;
  85. /**
  86. * Closure for @e alloc_cb.
  87. */
  88. void *alloc_cb_cls;
  89. /**
  90. * Message queue for sending requests to the ATS service.
  91. */
  92. struct GNUNET_MQ_Handle *mq;
  93. /**
  94. * Task to trigger reconnect.
  95. */
  96. struct GNUNET_SCHEDULER_Task *task;
  97. /**
  98. * Hash map mapping PIDs to session records.
  99. */
  100. struct GNUNET_CONTAINER_MultiPeerMap *records;
  101. /**
  102. * Reconnect backoff delay.
  103. */
  104. struct GNUNET_TIME_Relative backoff;
  105. };
  106. /**
  107. * Convert ATS properties from host to network byte order.
  108. *
  109. * @param nbo[OUT] value written
  110. * @param hbo value read
  111. */
  112. static void
  113. properties_hton(struct PropertiesNBO *nbo,
  114. const struct GNUNET_ATS_Properties *hbo)
  115. {
  116. nbo->delay = GNUNET_TIME_relative_hton(hbo->delay);
  117. nbo->goodput_out = htonl(hbo->goodput_out);
  118. nbo->goodput_in = htonl(hbo->goodput_in);
  119. nbo->utilization_out = htonl(hbo->utilization_out);
  120. nbo->utilization_in = htonl(hbo->utilization_in);
  121. nbo->distance = htonl(hbo->distance);
  122. nbo->mtu = htonl(hbo->mtu);
  123. nbo->nt = htonl((uint32_t)hbo->nt);
  124. nbo->cc = htonl((uint32_t)hbo->cc);
  125. }
  126. /**
  127. * Re-establish the connection to the ATS service.
  128. *
  129. * @param ath handle to use to re-connect.
  130. */
  131. static void
  132. reconnect(struct GNUNET_ATS_TransportHandle *ath);
  133. /**
  134. * Re-establish the connection to the ATS service.
  135. *
  136. * @param cls handle to use to re-connect.
  137. */
  138. static void
  139. reconnect_task(void *cls)
  140. {
  141. struct GNUNET_ATS_TransportHandle *ath = cls;
  142. ath->task = NULL;
  143. reconnect(ath);
  144. }
  145. /**
  146. * Disconnect from ATS and then reconnect.
  147. *
  148. * @param ath our handle
  149. */
  150. static void
  151. force_reconnect(struct GNUNET_ATS_TransportHandle *ath)
  152. {
  153. if (NULL != ath->mq)
  154. {
  155. GNUNET_MQ_destroy(ath->mq);
  156. ath->mq = NULL;
  157. }
  158. /* FIXME: do we tell transport service about disconnect events? CON:
  159. initially ATS will have a really screwed picture of the world and
  160. the rapid change would be bad. PRO: if we don't, ATS and
  161. transport may disagree about the allocation for a while...
  162. For now: lazy: do nothing. */
  163. ath->backoff = GNUNET_TIME_STD_BACKOFF(ath->backoff);
  164. ath->task = GNUNET_SCHEDULER_add_delayed(ath->backoff,
  165. &reconnect_task,
  166. ath);
  167. }
  168. /**
  169. * Check format of address suggestion message from the service.
  170. *
  171. * @param cls the `struct GNUNET_ATS_TransportHandle`
  172. * @param m message received
  173. */
  174. static int
  175. check_ats_address_suggestion(void *cls,
  176. const struct AddressSuggestionMessage *m)
  177. {
  178. (void)cls;
  179. GNUNET_MQ_check_zero_termination(m);
  180. return GNUNET_SYSERR;
  181. }
  182. /**
  183. * We received an address suggestion message from the service.
  184. *
  185. * @param cls the `struct GNUNET_ATS_TransportHandle`
  186. * @param m message received
  187. */
  188. static void
  189. handle_ats_address_suggestion(void *cls,
  190. const struct AddressSuggestionMessage *m)
  191. {
  192. struct GNUNET_ATS_TransportHandle *ath = cls;
  193. const char *address = (const char *)&m[1];
  194. ath->suggest_cb(ath->suggest_cb_cls,
  195. &m->peer,
  196. address);
  197. }
  198. /**
  199. * Closure for #match_session_cb.
  200. */
  201. struct FindContext {
  202. /**
  203. * Key to look for.
  204. */
  205. uint32_t session_id;
  206. /**
  207. * Where to store the result.
  208. */
  209. struct GNUNET_ATS_SessionRecord *sr;
  210. };
  211. /**
  212. * Finds matching session record.
  213. *
  214. * @param cls a `struct FindContext`
  215. * @param pid peer identity (unused)
  216. * @param value a `struct GNUNET_ATS_SessionRecord`
  217. * @return #GNUNET_NO if match found, #GNUNET_YES to continue searching
  218. */
  219. static int
  220. match_session_cb(void *cls,
  221. const struct GNUNET_PeerIdentity *pid,
  222. void *value)
  223. {
  224. struct FindContext *fc = cls;
  225. struct GNUNET_ATS_SessionRecord *sr = value;
  226. (void)pid;
  227. if (fc->session_id == sr->slot)
  228. {
  229. fc->sr = sr;
  230. return GNUNET_NO;
  231. }
  232. return GNUNET_YES;
  233. }
  234. /**
  235. * Find session record for peer @a pid and session @a session_id
  236. *
  237. * @param ath transport handle to search
  238. * @param session_id session ID to match
  239. * @param pid peer to search under
  240. * @return NULL if no such record exists
  241. */
  242. static struct GNUNET_ATS_SessionRecord *
  243. find_session(struct GNUNET_ATS_TransportHandle *ath,
  244. uint32_t session_id,
  245. const struct GNUNET_PeerIdentity *pid)
  246. {
  247. struct FindContext fc = {
  248. .session_id = session_id,
  249. .sr = NULL
  250. };
  251. GNUNET_CONTAINER_multipeermap_get_multiple(ath->records,
  252. pid,
  253. &match_session_cb,
  254. &fc);
  255. return fc.sr;
  256. }
  257. /**
  258. * We received a session allocation message from the service.
  259. *
  260. * @param cls the `struct GNUNET_ATS_TransportHandle`
  261. * @param m message received
  262. */
  263. static void
  264. handle_ats_session_allocation(void *cls,
  265. const struct SessionAllocationMessage *m)
  266. {
  267. struct GNUNET_ATS_TransportHandle *ath = cls;
  268. struct GNUNET_ATS_SessionRecord *ar;
  269. uint32_t session_id;
  270. session_id = ntohl(m->session_id);
  271. ar = find_session(ath,
  272. session_id,
  273. &m->peer);
  274. if (NULL == ar)
  275. {
  276. /* this can (rarely) happen if ATS changes an sessiones allocation
  277. just when the transport service deleted it */
  278. GNUNET_log(GNUNET_ERROR_TYPE_INFO,
  279. "Allocation ignored, session unknown\n");
  280. return;
  281. }
  282. ath->backoff = GNUNET_TIME_UNIT_ZERO;
  283. LOG(GNUNET_ERROR_TYPE_DEBUG,
  284. "ATS allocates bandwidth for peer `%s' using address %s\n",
  285. GNUNET_i2s(&ar->pid),
  286. ar->address);
  287. ath->alloc_cb(ath->alloc_cb_cls,
  288. ar->session,
  289. m->bandwidth_out,
  290. m->bandwidth_in);
  291. }
  292. /**
  293. * We encountered an error handling the MQ to the ATS service.
  294. * Reconnect.
  295. *
  296. * @param cls the `struct GNUNET_ATS_TransportHandle`
  297. * @param error details about the error
  298. */
  299. static void
  300. error_handler(void *cls,
  301. enum GNUNET_MQ_Error error)
  302. {
  303. struct GNUNET_ATS_TransportHandle *ath = cls;
  304. LOG(GNUNET_ERROR_TYPE_DEBUG,
  305. "ATS connection died (code %d), reconnecting\n",
  306. (int)error);
  307. force_reconnect(ath);
  308. }
  309. /**
  310. * Generate and transmit the `struct SessionAddMessage` for the given
  311. * session record.
  312. *
  313. * @param ar the session to inform the ATS service about
  314. */
  315. static void
  316. send_add_session_message(const struct GNUNET_ATS_SessionRecord *ar)
  317. {
  318. struct GNUNET_ATS_TransportHandle *ath = ar->ath;
  319. struct GNUNET_MQ_Envelope *ev;
  320. struct SessionAddMessage *m;
  321. size_t alen;
  322. if (NULL == ath->mq)
  323. return; /* disconnected, skip for now */
  324. alen = strlen(ar->address) + 1;
  325. ev = GNUNET_MQ_msg_extra(m,
  326. alen,
  327. (NULL == ar->session)
  328. ? GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD_INBOUND_ONLY
  329. : GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD);
  330. m->peer = ar->pid;
  331. m->session_id = htonl(ar->slot);
  332. properties_hton(&m->properties,
  333. &ar->properties);
  334. GNUNET_memcpy(&m[1],
  335. ar->address,
  336. alen);
  337. LOG(GNUNET_ERROR_TYPE_DEBUG,
  338. "Adding address `%s' for peer `%s'\n",
  339. ar->address,
  340. GNUNET_i2s(&ar->pid));
  341. GNUNET_MQ_send(ath->mq,
  342. ev);
  343. }
  344. /**
  345. * Send ATS information about the session record.
  346. *
  347. * @param cls our `struct GNUNET_ATS_TransportHandle *`, unused
  348. * @param pid unused
  349. * @param value the `struct GNUNET_ATS_SessionRecord *` to add
  350. * @return #GNUNET_OK
  351. */
  352. static int
  353. send_add_session_cb(void *cls,
  354. const struct GNUNET_PeerIdentity *pid,
  355. void *value)
  356. {
  357. struct GNUNET_ATS_SessionRecord *ar = value;
  358. (void)cls;
  359. (void)pid;
  360. send_add_session_message(ar);
  361. return GNUNET_OK;
  362. }
  363. /**
  364. * Re-establish the connection to the ATS service.
  365. *
  366. * @param ath handle to use to re-connect.
  367. */
  368. static void
  369. reconnect(struct GNUNET_ATS_TransportHandle *ath)
  370. {
  371. struct GNUNET_MQ_MessageHandler handlers[] = {
  372. GNUNET_MQ_hd_var_size(ats_address_suggestion,
  373. GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION,
  374. struct AddressSuggestionMessage,
  375. ath),
  376. GNUNET_MQ_hd_fixed_size(ats_session_allocation,
  377. GNUNET_MESSAGE_TYPE_ATS_SESSION_ALLOCATION,
  378. struct SessionAllocationMessage,
  379. ath),
  380. GNUNET_MQ_handler_end()
  381. };
  382. struct GNUNET_MQ_Envelope *ev;
  383. struct GNUNET_MessageHeader *init;
  384. GNUNET_assert(NULL == ath->mq);
  385. ath->mq = GNUNET_CLIENT_connect(ath->cfg,
  386. "ats",
  387. handlers,
  388. &error_handler,
  389. ath);
  390. if (NULL == ath->mq)
  391. {
  392. GNUNET_break(0);
  393. force_reconnect(ath);
  394. return;
  395. }
  396. ev = GNUNET_MQ_msg(init,
  397. GNUNET_MESSAGE_TYPE_ATS_START);
  398. GNUNET_MQ_send(ath->mq,
  399. ev);
  400. if (NULL == ath->mq)
  401. return;
  402. GNUNET_CONTAINER_multipeermap_iterate(ath->records,
  403. &send_add_session_cb,
  404. ath);
  405. }
  406. /**
  407. * Initialize the ATS subsystem.
  408. *
  409. * @param cfg configuration to use
  410. * @param alloc_cb notification to call whenever the allocation changed
  411. * @param alloc_cb_cls closure for @a alloc_cb
  412. * @param suggest_cb notification to call whenever the suggestation is made
  413. * @param suggest_cb_cls closure for @a suggest_cb
  414. * @return ats context
  415. */
  416. struct GNUNET_ATS_TransportHandle *
  417. GNUNET_ATS_transport_init(const struct GNUNET_CONFIGURATION_Handle *cfg,
  418. GNUNET_ATS_AllocationCallback alloc_cb,
  419. void *alloc_cb_cls,
  420. GNUNET_ATS_SuggestionCallback suggest_cb,
  421. void *suggest_cb_cls)
  422. {
  423. struct GNUNET_ATS_TransportHandle *ath;
  424. ath = GNUNET_new(struct GNUNET_ATS_TransportHandle);
  425. ath->cfg = cfg;
  426. ath->suggest_cb = suggest_cb;
  427. ath->suggest_cb_cls = suggest_cb_cls;
  428. ath->alloc_cb = alloc_cb;
  429. ath->alloc_cb_cls = alloc_cb_cls;
  430. ath->records = GNUNET_CONTAINER_multipeermap_create(128,
  431. GNUNET_YES);
  432. reconnect(ath);
  433. return ath;
  434. }
  435. /**
  436. * Release memory associated with the session record.
  437. *
  438. * @param cls NULL
  439. * @param pid unused
  440. * @param value a `struct GNUNET_ATS_SessionRecord`
  441. * @return #GNUNET_OK
  442. */
  443. static int
  444. free_record(void *cls,
  445. const struct GNUNET_PeerIdentity *pid,
  446. void *value)
  447. {
  448. struct GNUNET_ATS_SessionRecord *ar = value;
  449. (void)cls;
  450. (void)pid;
  451. GNUNET_free(ar);
  452. return GNUNET_OK;
  453. }
  454. /**
  455. * Client is done with ATS transport, release resources.
  456. *
  457. * @param ath handle to release
  458. */
  459. void
  460. GNUNET_ATS_transport_done(struct GNUNET_ATS_TransportHandle *ath)
  461. {
  462. if (NULL != ath->mq)
  463. {
  464. GNUNET_MQ_destroy(ath->mq);
  465. ath->mq = NULL;
  466. }
  467. if (NULL != ath->task)
  468. {
  469. GNUNET_SCHEDULER_cancel(ath->task);
  470. ath->task = NULL;
  471. }
  472. GNUNET_CONTAINER_multipeermap_iterate(ath->records,
  473. &free_record,
  474. NULL);
  475. GNUNET_CONTAINER_multipeermap_destroy(ath->records);
  476. GNUNET_free(ath);
  477. }
  478. /**
  479. * We have a new session ATS should know. Sessiones have to be added
  480. * with this function before they can be: updated, set in use and
  481. * destroyed.
  482. *
  483. * @param ath handle
  484. * @param pid peer we connected to
  485. * @param address the address (human readable version)
  486. * @param session transport-internal handle for the session/queue, NULL if
  487. * the session is inbound-only
  488. * @param prop performance data for the session
  489. * @return handle to the session representation inside ATS, NULL
  490. * on error (i.e. ATS knows this exact session already)
  491. */
  492. struct GNUNET_ATS_SessionRecord *
  493. GNUNET_ATS_session_add(struct GNUNET_ATS_TransportHandle *ath,
  494. const struct GNUNET_PeerIdentity *pid,
  495. const char *address,
  496. struct GNUNET_ATS_Session *session,
  497. const struct GNUNET_ATS_Properties *prop)
  498. {
  499. struct GNUNET_ATS_SessionRecord *ar;
  500. uint32_t s;
  501. size_t alen;
  502. if (NULL == address)
  503. {
  504. /* we need a valid address */
  505. GNUNET_break(0);
  506. return NULL;
  507. }
  508. alen = strlen(address) + 1;
  509. if ((alen + sizeof(struct SessionAddMessage) >= GNUNET_MAX_MESSAGE_SIZE) ||
  510. (alen >= GNUNET_MAX_MESSAGE_SIZE))
  511. {
  512. /* address too large for us, this should not happen */
  513. GNUNET_break(0);
  514. return NULL;
  515. }
  516. /* Spin 's' until we find an unused session ID for this pid */
  517. for (s = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
  518. UINT32_MAX);
  519. NULL != find_session(ath,
  520. s,
  521. pid);
  522. s++)
  523. ;
  524. alen = strlen(address) + 1;
  525. ar = GNUNET_malloc(sizeof(struct GNUNET_ATS_SessionRecord) + alen);
  526. ar->ath = ath;
  527. ar->slot = s;
  528. ar->session = session;
  529. ar->address = (const char *)&ar[1];
  530. ar->pid = *pid;
  531. ar->properties = *prop;
  532. memcpy(&ar[1],
  533. address,
  534. alen);
  535. (void)GNUNET_CONTAINER_multipeermap_put(ath->records,
  536. &ar->pid,
  537. ar,
  538. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
  539. send_add_session_message(ar);
  540. return ar;
  541. }
  542. /**
  543. * We have updated performance statistics for a given session. Note
  544. * that this function can be called for sessiones that are currently
  545. * in use as well as sessiones that are valid but not actively in use.
  546. * Furthermore, the peer may not even be connected to us right now (in
  547. * which case the call may be ignored or the information may be stored
  548. * for later use). Update bandwidth assignments.
  549. *
  550. * @param ar session record to update information for
  551. * @param prop performance data for the session
  552. */
  553. void
  554. GNUNET_ATS_session_update(struct GNUNET_ATS_SessionRecord *ar,
  555. const struct GNUNET_ATS_Properties *prop)
  556. {
  557. struct GNUNET_ATS_TransportHandle *ath = ar->ath;
  558. struct GNUNET_MQ_Envelope *ev;
  559. struct SessionUpdateMessage *m;
  560. LOG(GNUNET_ERROR_TYPE_DEBUG,
  561. "Updating address `%s' for peer `%s'\n",
  562. ar->address,
  563. GNUNET_i2s(&ar->pid));
  564. ar->properties = *prop;
  565. if (NULL == ath->mq)
  566. return; /* disconnected, skip for now */
  567. ev = GNUNET_MQ_msg(m,
  568. GNUNET_MESSAGE_TYPE_ATS_SESSION_UPDATE);
  569. m->session_id = htonl(ar->slot);
  570. m->peer = ar->pid;
  571. properties_hton(&m->properties,
  572. &ar->properties);
  573. GNUNET_MQ_send(ath->mq,
  574. ev);
  575. }
  576. /**
  577. * A session was destroyed, ATS should now schedule and
  578. * allocate under the assumption that this @a ar is no
  579. * longer in use.
  580. *
  581. * @param ar session record to drop
  582. */
  583. void
  584. GNUNET_ATS_session_del(struct GNUNET_ATS_SessionRecord *ar)
  585. {
  586. struct GNUNET_ATS_TransportHandle *ath = ar->ath;
  587. struct GNUNET_MQ_Envelope *ev;
  588. struct SessionDelMessage *m;
  589. LOG(GNUNET_ERROR_TYPE_DEBUG,
  590. "Deleting address `%s' for peer `%s'\n",
  591. ar->address,
  592. GNUNET_i2s(&ar->pid));
  593. if (NULL == ath->mq)
  594. return;
  595. ev = GNUNET_MQ_msg(m,
  596. GNUNET_MESSAGE_TYPE_ATS_SESSION_DEL);
  597. m->session_id = htonl(ar->slot);
  598. m->peer = ar->pid;
  599. GNUNET_MQ_send(ath->mq,
  600. ev);
  601. }
  602. /* end of ats_api2_transport.c */