gnunet-service-core_sessions.c 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2009-2014, 2016 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file core/gnunet-service-core_sessions.c
  18. * @brief code for managing of 'encrypted' sessions (key exchange done)
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet-service-core.h"
  23. #include "gnunet-service-core_kx.h"
  24. #include "gnunet-service-core_typemap.h"
  25. #include "gnunet-service-core_sessions.h"
  26. #include "gnunet_constants.h"
  27. #include "core.h"
  28. /**
  29. * How many encrypted messages do we queue at most?
  30. * Needed to bound memory consumption.
  31. */
  32. #define MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE 4
  33. /**
  34. * Message ready for encryption. This struct is followed by the
  35. * actual content of the message.
  36. */
  37. struct SessionMessageEntry
  38. {
  39. /**
  40. * We keep messages in a doubly linked list.
  41. */
  42. struct SessionMessageEntry *next;
  43. /**
  44. * We keep messages in a doubly linked list.
  45. */
  46. struct SessionMessageEntry *prev;
  47. /**
  48. * How important is this message.
  49. */
  50. enum GNUNET_MQ_PriorityPreferences priority;
  51. /**
  52. * Flag set to #GNUNET_YES if this is a typemap message.
  53. */
  54. int is_typemap;
  55. /**
  56. * Flag set to #GNUNET_YES if this is a typemap confirmation message.
  57. */
  58. int is_typemap_confirm;
  59. /**
  60. * Deadline for transmission, 1s after we received it (if we
  61. * are not corking), otherwise "now". Note that this message
  62. * does NOT expire past its deadline.
  63. */
  64. struct GNUNET_TIME_Absolute deadline;
  65. /**
  66. * How long is the message? (number of bytes following the `struct
  67. * MessageEntry`, but not including the size of `struct
  68. * MessageEntry` itself!)
  69. */
  70. size_t size;
  71. };
  72. /**
  73. * Data kept per session.
  74. */
  75. struct Session
  76. {
  77. /**
  78. * Identity of the other peer.
  79. */
  80. const struct GNUNET_PeerIdentity *peer;
  81. /**
  82. * Key exchange state for this peer.
  83. */
  84. struct GSC_KeyExchangeInfo *kx;
  85. /**
  86. * Head of list of requests from clients for transmission to
  87. * this peer.
  88. */
  89. struct GSC_ClientActiveRequest *active_client_request_head;
  90. /**
  91. * Tail of list of requests from clients for transmission to
  92. * this peer.
  93. */
  94. struct GSC_ClientActiveRequest *active_client_request_tail;
  95. /**
  96. * Head of list of messages ready for encryption.
  97. */
  98. struct SessionMessageEntry *sme_head;
  99. /**
  100. * Tail of list of messages ready for encryption.
  101. */
  102. struct SessionMessageEntry *sme_tail;
  103. /**
  104. * Current type map for this peer.
  105. */
  106. struct GSC_TypeMap *tmap;
  107. /**
  108. * Task to transmit corked messages with a delay.
  109. */
  110. struct GNUNET_SCHEDULER_Task *cork_task;
  111. /**
  112. * Task to transmit our type map.
  113. */
  114. struct GNUNET_SCHEDULER_Task *typemap_task;
  115. /**
  116. * Retransmission delay we currently use for the typemap
  117. * transmissions (if not confirmed).
  118. */
  119. struct GNUNET_TIME_Relative typemap_delay;
  120. /**
  121. * Is this the first time we're sending the typemap? If so,
  122. * we want to send it a bit faster the second time. 0 if
  123. * we are sending for the first time, 1 if not.
  124. */
  125. int first_typemap;
  126. };
  127. GNUNET_NETWORK_STRUCT_BEGIN
  128. /**
  129. * Message sent to confirm that a typemap was received.
  130. */
  131. struct TypeMapConfirmationMessage
  132. {
  133. /**
  134. * Header with type #GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP.
  135. */
  136. struct GNUNET_MessageHeader header;
  137. /**
  138. * Reserved, always zero.
  139. */
  140. uint32_t reserved GNUNET_PACKED;
  141. /**
  142. * Hash of the (decompressed) type map that was received.
  143. */
  144. struct GNUNET_HashCode tm_hash;
  145. };
  146. GNUNET_NETWORK_STRUCT_END
  147. /**
  148. * Map of peer identities to `struct Session`.
  149. */
  150. static struct GNUNET_CONTAINER_MultiPeerMap *sessions;
  151. /**
  152. * Find the session for the given peer.
  153. *
  154. * @param peer identity of the peer
  155. * @return NULL if we are not connected, otherwise the
  156. * session handle
  157. */
  158. static struct Session *
  159. find_session (const struct GNUNET_PeerIdentity *peer)
  160. {
  161. if (NULL == sessions)
  162. return NULL;
  163. return GNUNET_CONTAINER_multipeermap_get (sessions, peer);
  164. }
  165. /**
  166. * End the session with the given peer (we are no longer
  167. * connected).
  168. *
  169. * @param pid identity of peer to kill session with
  170. */
  171. void
  172. GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
  173. {
  174. struct Session *session;
  175. struct GSC_ClientActiveRequest *car;
  176. struct SessionMessageEntry *sme;
  177. session = find_session (pid);
  178. if (NULL == session)
  179. return;
  180. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  181. "Destroying session for peer `%s'\n",
  182. GNUNET_i2s (session->peer));
  183. if (NULL != session->cork_task)
  184. {
  185. GNUNET_SCHEDULER_cancel (session->cork_task);
  186. session->cork_task = NULL;
  187. }
  188. while (NULL != (car = session->active_client_request_head))
  189. {
  190. GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
  191. session->active_client_request_tail,
  192. car);
  193. GSC_CLIENTS_reject_request (car, GNUNET_NO);
  194. }
  195. while (NULL != (sme = session->sme_head))
  196. {
  197. GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, sme);
  198. GNUNET_free (sme);
  199. }
  200. if (NULL != session->typemap_task)
  201. {
  202. GNUNET_SCHEDULER_cancel (session->typemap_task);
  203. session->typemap_task = NULL;
  204. }
  205. GSC_CLIENTS_notify_clients_about_neighbour (session->peer,
  206. session->tmap,
  207. NULL);
  208. GNUNET_assert (
  209. GNUNET_YES ==
  210. GNUNET_CONTAINER_multipeermap_remove (sessions, session->peer, session));
  211. GNUNET_STATISTICS_set (GSC_stats,
  212. gettext_noop ("# peers connected"),
  213. GNUNET_CONTAINER_multipeermap_size (sessions),
  214. GNUNET_NO);
  215. GSC_TYPEMAP_destroy (session->tmap);
  216. session->tmap = NULL;
  217. GNUNET_free (session);
  218. }
  219. /**
  220. * Transmit our current typemap message to the other peer.
  221. * (Done periodically until the typemap is confirmed).
  222. *
  223. * @param cls the `struct Session *`
  224. */
  225. static void
  226. transmit_typemap_task (void *cls)
  227. {
  228. struct Session *session = cls;
  229. struct GNUNET_MessageHeader *hdr;
  230. struct GNUNET_TIME_Relative delay;
  231. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  232. "Sending TYPEMAP to %s\n",
  233. GNUNET_i2s (session->peer));
  234. session->typemap_delay = GNUNET_TIME_STD_BACKOFF (session->typemap_delay);
  235. delay = session->typemap_delay;
  236. /* randomize a bit to avoid spont. sync */
  237. delay.rel_value_us +=
  238. GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000 * 1000);
  239. session->typemap_task =
  240. GNUNET_SCHEDULER_add_delayed (delay, &transmit_typemap_task, session);
  241. GNUNET_STATISTICS_update (GSC_stats,
  242. gettext_noop ("# type map refreshes sent"),
  243. 1,
  244. GNUNET_NO);
  245. hdr = GSC_TYPEMAP_compute_type_map_message ();
  246. GSC_KX_encrypt_and_transmit (session->kx, hdr, ntohs (hdr->size));
  247. GNUNET_free (hdr);
  248. }
  249. /**
  250. * Restart the typemap task for the given session.
  251. *
  252. * @param session session to restart typemap transmission for
  253. */
  254. static void
  255. start_typemap_task (struct Session *session)
  256. {
  257. if (NULL != session->typemap_task)
  258. GNUNET_SCHEDULER_cancel (session->typemap_task);
  259. session->typemap_delay = GNUNET_TIME_UNIT_SECONDS;
  260. session->typemap_task = GNUNET_SCHEDULER_add_delayed (session->typemap_delay,
  261. &transmit_typemap_task,
  262. session);
  263. }
  264. /**
  265. * Create a session, a key exchange was just completed.
  266. *
  267. * @param peer peer that is now connected
  268. * @param kx key exchange that completed
  269. */
  270. void
  271. GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer,
  272. struct GSC_KeyExchangeInfo *kx)
  273. {
  274. struct Session *session;
  275. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  276. "Creating session for peer `%s'\n",
  277. GNUNET_i2s (peer));
  278. session = GNUNET_new (struct Session);
  279. session->tmap = GSC_TYPEMAP_create ();
  280. session->peer = peer;
  281. session->kx = kx;
  282. GNUNET_assert (GNUNET_OK ==
  283. GNUNET_CONTAINER_multipeermap_put (
  284. sessions,
  285. session->peer,
  286. session,
  287. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  288. GNUNET_STATISTICS_set (GSC_stats,
  289. gettext_noop ("# peers connected"),
  290. GNUNET_CONTAINER_multipeermap_size (sessions),
  291. GNUNET_NO);
  292. GSC_CLIENTS_notify_clients_about_neighbour (peer, NULL, session->tmap);
  293. start_typemap_task (session);
  294. }
  295. /**
  296. * The other peer has indicated that it 'lost' the session
  297. * (KX down), reinitialize the session on our end, in particular
  298. * this means to restart the typemap transmission.
  299. *
  300. * @param peer peer that is now connected
  301. */
  302. void
  303. GSC_SESSIONS_reinit (const struct GNUNET_PeerIdentity *peer)
  304. {
  305. struct Session *session;
  306. session = find_session (peer);
  307. if (NULL == session)
  308. {
  309. /* KX/session is new for both sides; thus no need to restart what
  310. has not yet begun */
  311. return;
  312. }
  313. start_typemap_task (session);
  314. }
  315. /**
  316. * The other peer has confirmed receiving our type map,
  317. * check if it is current and if so, stop retransmitting it.
  318. *
  319. * @param peer peer that confirmed the type map
  320. * @param msg confirmation message we received
  321. */
  322. void
  323. GSC_SESSIONS_confirm_typemap (const struct GNUNET_PeerIdentity *peer,
  324. const struct GNUNET_MessageHeader *msg)
  325. {
  326. const struct TypeMapConfirmationMessage *cmsg;
  327. struct Session *session;
  328. session = find_session (peer);
  329. if (NULL == session)
  330. {
  331. GNUNET_break (0);
  332. return;
  333. }
  334. if (ntohs (msg->size) != sizeof(struct TypeMapConfirmationMessage))
  335. {
  336. GNUNET_break_op (0);
  337. return;
  338. }
  339. cmsg = (const struct TypeMapConfirmationMessage *) msg;
  340. if (GNUNET_YES != GSC_TYPEMAP_check_hash (&cmsg->tm_hash))
  341. {
  342. /* our typemap has changed in the meantime, do not
  343. accept confirmation */
  344. GNUNET_STATISTICS_update (GSC_stats,
  345. gettext_noop (
  346. "# outdated typemap confirmations received"),
  347. 1,
  348. GNUNET_NO);
  349. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  350. "Got outdated typemap confirmated from peer `%s'\n",
  351. GNUNET_i2s (session->peer));
  352. return;
  353. }
  354. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  355. "Got typemap confirmation from peer `%s'\n",
  356. GNUNET_i2s (session->peer));
  357. if (NULL != session->typemap_task)
  358. {
  359. GNUNET_SCHEDULER_cancel (session->typemap_task);
  360. session->typemap_task = NULL;
  361. }
  362. GNUNET_STATISTICS_update (GSC_stats,
  363. gettext_noop (
  364. "# valid typemap confirmations received"),
  365. 1,
  366. GNUNET_NO);
  367. }
  368. /**
  369. * Notify the given client about the session (client is new).
  370. *
  371. * @param cls the `struct GSC_Client`
  372. * @param key peer identity
  373. * @param value the `struct Session`
  374. * @return #GNUNET_OK (continue to iterate)
  375. */
  376. static int
  377. notify_client_about_session (void *cls,
  378. const struct GNUNET_PeerIdentity *key,
  379. void *value)
  380. {
  381. struct GSC_Client *client = cls;
  382. struct Session *session = value;
  383. GSC_CLIENTS_notify_client_about_neighbour (client,
  384. session->peer,
  385. NULL, /* old TMAP: none */
  386. session->tmap);
  387. return GNUNET_OK;
  388. }
  389. /**
  390. * We have a new client, notify it about all current sessions.
  391. *
  392. * @param client the new client
  393. */
  394. void
  395. GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
  396. {
  397. /* notify new client about existing sessions */
  398. GNUNET_CONTAINER_multipeermap_iterate (sessions,
  399. &notify_client_about_session,
  400. client);
  401. }
  402. /**
  403. * Try to perform a transmission on the given session. Will solicit
  404. * additional messages if the 'sme' queue is not full enough.
  405. *
  406. * @param session session to transmit messages from
  407. */
  408. static void
  409. try_transmission (struct Session *session);
  410. /**
  411. * Queue a request from a client for transmission to a particular peer.
  412. *
  413. * @param car request to queue; this handle is then shared between
  414. * the caller (CLIENTS subsystem) and SESSIONS and must not
  415. * be released by either until either #GSC_SESSIONS_dequeue(),
  416. * #GSC_SESSIONS_transmit() or #GSC_CLIENTS_failed()
  417. * have been invoked on it
  418. */
  419. void
  420. GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
  421. {
  422. struct Session *session;
  423. session = find_session (&car->target);
  424. if (NULL == session)
  425. {
  426. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  427. "Dropped client request for transmission (am disconnected)\n");
  428. GNUNET_break (0); /* should have been rejected earlier */
  429. GSC_CLIENTS_reject_request (car, GNUNET_NO);
  430. return;
  431. }
  432. if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
  433. {
  434. GNUNET_break (0);
  435. GSC_CLIENTS_reject_request (car, GNUNET_YES);
  436. return;
  437. }
  438. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  439. "Received client transmission request. queueing\n");
  440. GNUNET_CONTAINER_DLL_insert_tail (session->active_client_request_head,
  441. session->active_client_request_tail,
  442. car);
  443. try_transmission (session);
  444. }
  445. /**
  446. * Dequeue a request from a client from transmission to a particular peer.
  447. *
  448. * @param car request to dequeue; this handle will then be 'owned' by
  449. * the caller (CLIENTS sysbsystem)
  450. */
  451. void
  452. GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
  453. {
  454. struct Session *session;
  455. if (0 == memcmp (&car->target,
  456. &GSC_my_identity,
  457. sizeof(struct GNUNET_PeerIdentity)))
  458. return;
  459. session = find_session (&car->target);
  460. GNUNET_assert (NULL != session);
  461. GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
  462. session->active_client_request_tail,
  463. car);
  464. /* dequeueing of 'high' priority messages may unblock
  465. transmission for lower-priority messages, so we also
  466. need to try in this case. */
  467. try_transmission (session);
  468. }
  469. /**
  470. * Solicit messages for transmission, starting with those of the highest
  471. * priority.
  472. *
  473. * @param session session to solict messages for
  474. * @param msize how many bytes do we have already
  475. */
  476. static void
  477. solicit_messages (struct Session *session, size_t msize)
  478. {
  479. struct GSC_ClientActiveRequest *car;
  480. struct GSC_ClientActiveRequest *nxt;
  481. size_t so_size;
  482. enum GNUNET_MQ_PriorityPreferences pmax;
  483. so_size = msize;
  484. pmax = GNUNET_MQ_PRIO_BACKGROUND;
  485. for (car = session->active_client_request_head; NULL != car; car = car->next)
  486. {
  487. if (GNUNET_YES == car->was_solicited)
  488. continue;
  489. pmax = GNUNET_MAX (pmax, car->priority & GNUNET_MQ_PRIORITY_MASK);
  490. }
  491. nxt = session->active_client_request_head;
  492. while (NULL != (car = nxt))
  493. {
  494. nxt = car->next;
  495. if (car->priority < pmax)
  496. continue;
  497. if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
  498. break;
  499. so_size += car->msize;
  500. if (GNUNET_YES == car->was_solicited)
  501. continue;
  502. car->was_solicited = GNUNET_YES;
  503. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  504. "Soliciting message with priority %u\n",
  505. car->priority);
  506. GSC_CLIENTS_solicit_request (car);
  507. /* The above call may *dequeue* requests and thereby
  508. clobber 'nxt'. Hence we need to restart from the
  509. head of the list. */
  510. nxt = session->active_client_request_head;
  511. so_size = msize;
  512. }
  513. }
  514. /**
  515. * Some messages were delayed (corked), but the timeout has now expired.
  516. * Send them now.
  517. *
  518. * @param cls `struct Session` with the messages to transmit now
  519. */
  520. static void
  521. pop_cork_task (void *cls)
  522. {
  523. struct Session *session = cls;
  524. session->cork_task = NULL;
  525. try_transmission (session);
  526. }
  527. /**
  528. * Try to perform a transmission on the given session. Will solicit
  529. * additional messages if the 'sme' queue is not full enough or has
  530. * only low-priority messages.
  531. *
  532. * @param session session to transmit messages from
  533. */
  534. static void
  535. try_transmission (struct Session *session)
  536. {
  537. struct SessionMessageEntry *pos;
  538. size_t msize;
  539. struct GNUNET_TIME_Absolute now;
  540. struct GNUNET_TIME_Absolute min_deadline;
  541. enum GNUNET_MQ_PriorityPreferences maxp;
  542. enum GNUNET_MQ_PriorityPreferences maxpc;
  543. struct GSC_ClientActiveRequest *car;
  544. int excess;
  545. msize = 0;
  546. min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
  547. /* if the peer has excess bandwidth, background traffic is allowed,
  548. otherwise not */
  549. if (MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE <=
  550. GSC_NEIGHBOURS_get_queue_length (session->kx))
  551. {
  552. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  553. "Transmission queue already very long, waiting...\n");
  554. return; /* queue already too long */
  555. }
  556. excess = GSC_NEIGHBOURS_check_excess_bandwidth (session->kx);
  557. if (GNUNET_YES == excess)
  558. maxp = GNUNET_MQ_PRIO_BACKGROUND;
  559. else
  560. maxp = GNUNET_MQ_PRIO_BEST_EFFORT;
  561. /* determine highest priority of 'ready' messages we already solicited from clients */
  562. pos = session->sme_head;
  563. while ((NULL != pos) &&
  564. (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE))
  565. {
  566. GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
  567. msize += pos->size;
  568. maxp = GNUNET_MAX (maxp, pos->priority & GNUNET_MQ_PRIORITY_MASK);
  569. min_deadline = GNUNET_TIME_absolute_min (min_deadline, pos->deadline);
  570. pos = pos->next;
  571. }
  572. GNUNET_log (
  573. GNUNET_ERROR_TYPE_DEBUG,
  574. "Calculating transmission set with %u priority (%s) and %s earliest deadline\n",
  575. maxp,
  576. (GNUNET_YES == excess) ? "excess bandwidth" : "limited bandwidth",
  577. GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (
  578. min_deadline),
  579. GNUNET_YES));
  580. if (maxp < GNUNET_MQ_PRIO_CRITICAL_CONTROL)
  581. {
  582. /* if highest already solicited priority from clients is not critical,
  583. check if there are higher-priority messages to be solicited from clients */
  584. if (GNUNET_YES == excess)
  585. maxpc = GNUNET_MQ_PRIO_BACKGROUND;
  586. else
  587. maxpc = GNUNET_MQ_PRIO_BEST_EFFORT;
  588. for (car = session->active_client_request_head; NULL != car;
  589. car = car->next)
  590. {
  591. if (GNUNET_YES == car->was_solicited)
  592. continue;
  593. maxpc = GNUNET_MAX (maxpc, car->priority & GNUNET_MQ_PRIORITY_MASK);
  594. }
  595. if (maxpc > maxp)
  596. {
  597. /* we have messages waiting for solicitation that have a higher
  598. priority than those that we already accepted; solicit the
  599. high-priority messages first */
  600. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  601. "Soliciting messages based on priority (%u > %u)\n",
  602. maxpc,
  603. maxp);
  604. solicit_messages (session, 0);
  605. return;
  606. }
  607. }
  608. else
  609. {
  610. /* never solicit more, we have critical messages to process */
  611. excess = GNUNET_NO;
  612. maxpc = GNUNET_MQ_PRIO_BACKGROUND;
  613. }
  614. now = GNUNET_TIME_absolute_get ();
  615. if (((GNUNET_YES == excess) || (maxpc >= GNUNET_MQ_PRIO_BEST_EFFORT)) &&
  616. ((0 == msize) ||
  617. ((msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) &&
  618. (min_deadline.abs_value_us > now.abs_value_us))))
  619. {
  620. /* not enough ready yet (tiny message & cork possible), or no messages at all,
  621. and either excess bandwidth or best-effort or higher message waiting at
  622. client; in this case, we try to solicit more */
  623. GNUNET_log (
  624. GNUNET_ERROR_TYPE_DEBUG,
  625. "Soliciting messages (excess %d, maxpc %d, message size %u, deadline %s)\n",
  626. excess,
  627. maxpc,
  628. (unsigned int) msize,
  629. GNUNET_STRINGS_relative_time_to_string (
  630. GNUNET_TIME_absolute_get_remaining (
  631. min_deadline),
  632. GNUNET_YES));
  633. solicit_messages (session, msize);
  634. if (msize > 0)
  635. {
  636. /* if there is data to send, just not yet, make sure we do transmit
  637. * it once the deadline is reached */
  638. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  639. "Corking until %s\n",
  640. GNUNET_STRINGS_relative_time_to_string (
  641. GNUNET_TIME_absolute_get_remaining (min_deadline),
  642. GNUNET_YES));
  643. if (NULL != session->cork_task)
  644. GNUNET_SCHEDULER_cancel (session->cork_task);
  645. session->cork_task =
  646. GNUNET_SCHEDULER_add_at (min_deadline, &pop_cork_task, session);
  647. }
  648. else
  649. {
  650. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  651. "Queue empty, waiting for solicitations\n");
  652. }
  653. return;
  654. }
  655. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  656. "Building combined plaintext buffer to transmit message!\n");
  657. /* create plaintext buffer of all messages (that fit), encrypt and
  658. transmit */
  659. {
  660. static unsigned long long total_bytes;
  661. static unsigned int total_msgs;
  662. char pbuf[msize]; /* plaintext */
  663. size_t used;
  664. used = 0;
  665. while ((NULL != (pos = session->sme_head)) && (used + pos->size <= msize))
  666. {
  667. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  668. "Adding message of type %d (%d/%d) to payload for %s\n",
  669. ntohs (((const struct GNUNET_MessageHeader *) &pos[1])->type),
  670. pos->is_typemap,
  671. pos->is_typemap_confirm,
  672. GNUNET_i2s (session->peer));
  673. GNUNET_memcpy (&pbuf[used], &pos[1], pos->size);
  674. used += pos->size;
  675. GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, pos);
  676. GNUNET_free (pos);
  677. }
  678. /* compute average payload size */
  679. total_bytes += used;
  680. total_msgs++;
  681. if (0 == total_msgs)
  682. {
  683. /* 2^32 messages, wrap around... */
  684. total_msgs = 1;
  685. total_bytes = used;
  686. }
  687. GNUNET_STATISTICS_set (GSC_stats,
  688. "# avg payload per encrypted message",
  689. total_bytes / total_msgs,
  690. GNUNET_NO);
  691. /* now actually transmit... */
  692. GSC_KX_encrypt_and_transmit (session->kx, pbuf, used);
  693. }
  694. }
  695. /**
  696. * Send an updated typemap message to the neighbour now,
  697. * and restart typemap transmissions.
  698. *
  699. * @param cls the message
  700. * @param key neighbour's identity
  701. * @param value `struct Neighbour` of the target
  702. * @return always #GNUNET_OK
  703. */
  704. static int
  705. do_restart_typemap_message (void *cls,
  706. const struct GNUNET_PeerIdentity *key,
  707. void *value)
  708. {
  709. const struct GNUNET_MessageHeader *hdr = cls;
  710. struct Session *session = value;
  711. struct SessionMessageEntry *sme;
  712. uint16_t size;
  713. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  714. "Restarting sending TYPEMAP to %s\n",
  715. GNUNET_i2s (session->peer));
  716. size = ntohs (hdr->size);
  717. for (sme = session->sme_head; NULL != sme; sme = sme->next)
  718. {
  719. if (GNUNET_YES == sme->is_typemap)
  720. {
  721. GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, sme);
  722. GNUNET_free (sme);
  723. break;
  724. }
  725. }
  726. sme = GNUNET_malloc (sizeof(struct SessionMessageEntry) + size);
  727. sme->is_typemap = GNUNET_YES;
  728. GNUNET_memcpy (&sme[1], hdr, size);
  729. sme->size = size;
  730. sme->priority = GNUNET_MQ_PRIO_CRITICAL_CONTROL;
  731. GNUNET_CONTAINER_DLL_insert (session->sme_head, session->sme_tail, sme);
  732. try_transmission (session);
  733. start_typemap_task (session);
  734. return GNUNET_OK;
  735. }
  736. /**
  737. * Broadcast an updated typemap message to all neighbours.
  738. * Restarts the retransmissions until the typemaps are confirmed.
  739. *
  740. * @param msg message to transmit
  741. */
  742. void
  743. GSC_SESSIONS_broadcast_typemap (const struct GNUNET_MessageHeader *msg)
  744. {
  745. if (NULL == sessions)
  746. return;
  747. GNUNET_CONTAINER_multipeermap_iterate (sessions,
  748. &do_restart_typemap_message,
  749. (void *) msg);
  750. }
  751. /**
  752. * Traffic is being solicited for the given peer. This means that the
  753. * message queue on the transport-level (NEIGHBOURS subsystem) is now
  754. * empty and it is now OK to transmit another (non-control) message.
  755. *
  756. * @param pid identity of peer ready to receive data
  757. */
  758. void
  759. GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid)
  760. {
  761. struct Session *session;
  762. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  763. "Transport solicits for %s\n",
  764. GNUNET_i2s (pid));
  765. session = find_session (pid);
  766. if (NULL == session)
  767. return;
  768. try_transmission (session);
  769. }
  770. /**
  771. * Transmit a message to a particular peer.
  772. *
  773. * @param car original request that was queued and then solicited;
  774. * this handle will now be 'owned' by the SESSIONS subsystem
  775. * @param msg message to transmit
  776. * @param priority how important is this message
  777. */
  778. void
  779. GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
  780. const struct GNUNET_MessageHeader *msg,
  781. enum GNUNET_MQ_PriorityPreferences priority)
  782. {
  783. struct Session *session;
  784. struct SessionMessageEntry *sme;
  785. struct SessionMessageEntry *pos;
  786. size_t msize;
  787. session = find_session (&car->target);
  788. if (NULL == session)
  789. return;
  790. msize = ntohs (msg->size);
  791. sme = GNUNET_malloc (sizeof(struct SessionMessageEntry) + msize);
  792. GNUNET_memcpy (&sme[1], msg, msize);
  793. sme->size = msize;
  794. sme->priority = priority;
  795. if (0 != (GNUNET_MQ_PREF_CORK_ALLOWED & priority))
  796. {
  797. sme->deadline =
  798. GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY);
  799. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  800. "Mesage corked, delaying transmission\n");
  801. }
  802. pos = session->sme_head;
  803. while ((NULL != pos) && (pos->priority >= sme->priority))
  804. pos = pos->next;
  805. if (NULL == pos)
  806. GNUNET_CONTAINER_DLL_insert_tail (session->sme_head,
  807. session->sme_tail,
  808. sme);
  809. else
  810. GNUNET_CONTAINER_DLL_insert_after (session->sme_head,
  811. session->sme_tail,
  812. pos->prev,
  813. sme);
  814. try_transmission (session);
  815. }
  816. /**
  817. * We have received a typemap message from a peer, update ours.
  818. * Notifies clients about the session.
  819. *
  820. * @param peer peer this is about
  821. * @param msg typemap update message
  822. */
  823. void
  824. GSC_SESSIONS_set_typemap (const struct GNUNET_PeerIdentity *peer,
  825. const struct GNUNET_MessageHeader *msg)
  826. {
  827. struct Session *session;
  828. struct GSC_TypeMap *nmap;
  829. struct SessionMessageEntry *sme;
  830. struct TypeMapConfirmationMessage *tmc;
  831. nmap = GSC_TYPEMAP_get_from_message (msg);
  832. if (NULL == nmap)
  833. {
  834. GNUNET_break_op (0);
  835. return; /* malformed */
  836. }
  837. session = find_session (peer);
  838. if (NULL == session)
  839. {
  840. GSC_TYPEMAP_destroy (nmap);
  841. GNUNET_break (0);
  842. return;
  843. }
  844. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  845. "Received TYPEMAP from %s\n",
  846. GNUNET_i2s (session->peer));
  847. for (sme = session->sme_head; NULL != sme; sme = sme->next)
  848. {
  849. if (GNUNET_YES == sme->is_typemap_confirm)
  850. {
  851. GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, sme);
  852. GNUNET_free (sme);
  853. break;
  854. }
  855. }
  856. sme = GNUNET_malloc (sizeof(struct SessionMessageEntry)
  857. + sizeof(struct TypeMapConfirmationMessage));
  858. sme->deadline = GNUNET_TIME_absolute_get ();
  859. sme->size = sizeof(struct TypeMapConfirmationMessage);
  860. sme->priority = GNUNET_MQ_PRIO_CRITICAL_CONTROL;
  861. sme->is_typemap_confirm = GNUNET_YES;
  862. tmc = (struct TypeMapConfirmationMessage *) &sme[1];
  863. tmc->header.size = htons (sizeof(struct TypeMapConfirmationMessage));
  864. tmc->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP);
  865. tmc->reserved = htonl (0);
  866. GSC_TYPEMAP_hash (nmap, &tmc->tm_hash);
  867. GNUNET_CONTAINER_DLL_insert (session->sme_head, session->sme_tail, sme);
  868. try_transmission (session);
  869. GSC_CLIENTS_notify_clients_about_neighbour (peer, session->tmap, nmap);
  870. GSC_TYPEMAP_destroy (session->tmap);
  871. session->tmap = nmap;
  872. }
  873. /**
  874. * The given peer send a message of the specified type. Make sure the
  875. * respective bit is set in its type-map and that clients are notified
  876. * about the session.
  877. *
  878. * @param peer peer this is about
  879. * @param type type of the message
  880. */
  881. void
  882. GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer,
  883. uint16_t type)
  884. {
  885. struct Session *session;
  886. struct GSC_TypeMap *nmap;
  887. if (0 == memcmp (peer, &GSC_my_identity, sizeof(struct GNUNET_PeerIdentity)))
  888. return;
  889. session = find_session (peer);
  890. GNUNET_assert (NULL != session);
  891. if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap, &type, 1))
  892. return; /* already in it */
  893. nmap = GSC_TYPEMAP_extend (session->tmap, &type, 1);
  894. GSC_CLIENTS_notify_clients_about_neighbour (peer, session->tmap, nmap);
  895. GSC_TYPEMAP_destroy (session->tmap);
  896. session->tmap = nmap;
  897. }
  898. /**
  899. * Initialize sessions subsystem.
  900. */
  901. void
  902. GSC_SESSIONS_init ()
  903. {
  904. sessions = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
  905. }
  906. /**
  907. * Helper function for #GSC_SESSIONS_done() to free all
  908. * active sessions.
  909. *
  910. * @param cls NULL
  911. * @param key identity of the connected peer
  912. * @param value the `struct Session` for the peer
  913. * @return #GNUNET_OK (continue to iterate)
  914. */
  915. static int
  916. free_session_helper (void *cls,
  917. const struct GNUNET_PeerIdentity *key,
  918. void *value)
  919. {
  920. /* struct Session *session = value; */
  921. GSC_SESSIONS_end (key);
  922. return GNUNET_OK;
  923. }
  924. /**
  925. * Shutdown sessions subsystem.
  926. */
  927. void
  928. GSC_SESSIONS_done ()
  929. {
  930. if (NULL != sessions)
  931. {
  932. GNUNET_CONTAINER_multipeermap_iterate (sessions,
  933. &free_session_helper,
  934. NULL);
  935. GNUNET_CONTAINER_multipeermap_destroy (sessions);
  936. sessions = NULL;
  937. }
  938. }
  939. /* end of gnunet-service-core_sessions.c */