gnunet-service-cadet_channel.c 63 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2013 Christian Grothoff (and other contributing authors)
  4. GNUnet is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published
  6. by the Free Software Foundation; either version 3, or (at your
  7. option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with GNUnet; see the file COPYING. If not, write to the
  14. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  15. Boston, MA 02111-1307, USA.
  16. */
  17. #include "platform.h"
  18. #include "gnunet_util_lib.h"
  19. #include "gnunet_statistics_service.h"
  20. #include "cadet.h"
  21. #include "cadet_protocol.h"
  22. #include "gnunet-service-cadet_channel.h"
  23. #include "gnunet-service-cadet_local.h"
  24. #include "gnunet-service-cadet_tunnel.h"
  25. #include "gnunet-service-cadet_peer.h"
  26. #define LOG(level, ...) GNUNET_log_from(level,"cadet-chn",__VA_ARGS__)
  27. #define CADET_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(\
  28. GNUNET_TIME_UNIT_MILLISECONDS, 250)
  29. #define CADET_RETRANSMIT_MARGIN 4
  30. /**
  31. * All the states a connection can be in.
  32. */
  33. enum CadetChannelState
  34. {
  35. /**
  36. * Uninitialized status, should never appear in operation.
  37. */
  38. CADET_CHANNEL_NEW,
  39. /**
  40. * Connection create message sent, waiting for ACK.
  41. */
  42. CADET_CHANNEL_SENT,
  43. /**
  44. * Connection confirmed, ready to carry traffic.
  45. */
  46. CADET_CHANNEL_READY,
  47. };
  48. /**
  49. * Info holder for channel messages in queues.
  50. */
  51. struct CadetChannelQueue
  52. {
  53. /**
  54. * Tunnel Queue.
  55. */
  56. struct CadetTunnelQueue *tq;
  57. /**
  58. * Message type (DATA/DATA_ACK)
  59. */
  60. uint16_t type;
  61. /**
  62. * Message copy (for DATAs, to start retransmission timer)
  63. */
  64. struct CadetReliableMessage *copy;
  65. /**
  66. * Reliability (for DATA_ACKs, to access rel->ack_q)
  67. */
  68. struct CadetChannelReliability *rel;
  69. };
  70. /**
  71. * Info needed to retry a message in case it gets lost.
  72. */
  73. struct CadetReliableMessage
  74. {
  75. /**
  76. * Double linked list, FIFO style
  77. */
  78. struct CadetReliableMessage *next;
  79. struct CadetReliableMessage *prev;
  80. /**
  81. * Type of message (payload, channel management).
  82. */
  83. int16_t type;
  84. /**
  85. * Tunnel Reliability queue this message is in.
  86. */
  87. struct CadetChannelReliability *rel;
  88. /**
  89. * ID of the message (ACK needed to free)
  90. */
  91. uint32_t mid;
  92. /**
  93. * Tunnel Queue.
  94. */
  95. struct CadetChannelQueue *chq;
  96. /**
  97. * When was this message issued (to calculate ACK delay)
  98. */
  99. struct GNUNET_TIME_Absolute timestamp;
  100. /* struct GNUNET_CADET_Data with payload */
  101. };
  102. /**
  103. * Info about the traffic state for a client in a channel.
  104. */
  105. struct CadetChannelReliability
  106. {
  107. /**
  108. * Channel this is about.
  109. */
  110. struct CadetChannel *ch;
  111. /**
  112. * DLL of messages sent and not yet ACK'd.
  113. */
  114. struct CadetReliableMessage *head_sent;
  115. struct CadetReliableMessage *tail_sent;
  116. /**
  117. * DLL of messages received out of order.
  118. */
  119. struct CadetReliableMessage *head_recv;
  120. struct CadetReliableMessage *tail_recv;
  121. /**
  122. * Messages received.
  123. */
  124. unsigned int n_recv;
  125. /**
  126. * Next MID to use for outgoing traffic.
  127. */
  128. uint32_t mid_send;
  129. /**
  130. * Next MID expected for incoming traffic.
  131. */
  132. uint32_t mid_recv;
  133. /**
  134. * Handle for queued unique data CREATE, DATA_ACK.
  135. */
  136. struct CadetChannelQueue *uniq;
  137. /**
  138. * Can we send data to the client?
  139. */
  140. int client_ready;
  141. /**
  142. * Can the client send data to us?
  143. */
  144. int client_allowed;
  145. /**
  146. * Task to resend/poll in case no ACK is received.
  147. */
  148. struct GNUNET_SCHEDULER_Task * retry_task;
  149. /**
  150. * Counter for exponential backoff.
  151. */
  152. struct GNUNET_TIME_Relative retry_timer;
  153. /**
  154. * How long does it usually take to get an ACK.
  155. */
  156. struct GNUNET_TIME_Relative expected_delay;
  157. };
  158. /**
  159. * Struct containing all information regarding a channel to a remote client.
  160. */
  161. struct CadetChannel
  162. {
  163. /**
  164. * Tunnel this channel is in.
  165. */
  166. struct CadetTunnel *t;
  167. /**
  168. * Destination port of the channel.
  169. */
  170. uint32_t port;
  171. /**
  172. * Global channel number ( < GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
  173. */
  174. CADET_ChannelNumber gid;
  175. /**
  176. * Local tunnel number for root (owner) client.
  177. * ( >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
  178. */
  179. CADET_ChannelNumber lid_root;
  180. /**
  181. * Local tunnel number for local destination clients (incoming number)
  182. * ( >= GNUNET_CADET_LOCAL_CHANNEL_ID_SERV or 0).
  183. */
  184. CADET_ChannelNumber lid_dest;
  185. /**
  186. * Channel state.
  187. */
  188. enum CadetChannelState state;
  189. /**
  190. * Is the tunnel bufferless (minimum latency)?
  191. */
  192. int nobuffer;
  193. /**
  194. * Is the tunnel reliable?
  195. */
  196. int reliable;
  197. /**
  198. * Last time the channel was used
  199. */
  200. struct GNUNET_TIME_Absolute timestamp;
  201. /**
  202. * Client owner of the tunnel, if any
  203. */
  204. struct CadetClient *root;
  205. /**
  206. * Client destination of the tunnel, if any.
  207. */
  208. struct CadetClient *dest;
  209. /**
  210. * Flag to signal the destruction of the channel.
  211. * If this is set GNUNET_YES the channel will be destroyed
  212. * when the queue is empty.
  213. */
  214. int destroy;
  215. /**
  216. * Total (reliable) messages pending ACK for this channel.
  217. */
  218. unsigned int pending_messages;
  219. /**
  220. * Reliability data.
  221. * Only present (non-NULL) at the owner of a tunnel.
  222. */
  223. struct CadetChannelReliability *root_rel;
  224. /**
  225. * Reliability data.
  226. * Only present (non-NULL) at the destination of a tunnel.
  227. */
  228. struct CadetChannelReliability *dest_rel;
  229. };
  230. /******************************************************************************/
  231. /******************************* GLOBALS ***********************************/
  232. /******************************************************************************/
  233. /**
  234. * Global handle to the statistics service.
  235. */
  236. extern struct GNUNET_STATISTICS_Handle *stats;
  237. /**
  238. * Local peer own ID (memory efficient handle).
  239. */
  240. extern GNUNET_PEER_Id myid;
  241. /******************************************************************************/
  242. /******************************** STATIC ***********************************/
  243. /******************************************************************************/
  244. /**
  245. * Destroy a reliable message after it has been acknowledged, either by
  246. * direct mid ACK or bitfield. Updates the appropriate data structures and
  247. * timers and frees all memory.
  248. *
  249. * @param copy Message that is no longer needed: remote peer got it.
  250. * @param update_time Is the timing information relevant?
  251. * If this message is ACK in a batch the timing information
  252. * is skewed by the retransmission, count only for the
  253. * retransmitted message.
  254. *
  255. * @return #GNUNET_YES if channel was destroyed as a result of the call,
  256. * #GNUNET_NO otherwise.
  257. */
  258. static int
  259. rel_message_free (struct CadetReliableMessage *copy, int update_time);
  260. /**
  261. * send a channel create message.
  262. *
  263. * @param ch Channel for which to send.
  264. */
  265. static void
  266. send_create (struct CadetChannel *ch);
  267. /**
  268. * Confirm we got a channel create, FWD ack.
  269. *
  270. * @param ch The channel to confirm.
  271. * @param fwd Should we send a FWD ACK? (going dest->root)
  272. */
  273. static void
  274. send_ack (struct CadetChannel *ch, int fwd);
  275. /**
  276. * Test if the channel is loopback: both root and dest are on the local peer.
  277. *
  278. * @param ch Channel to test.
  279. *
  280. * @return #GNUNET_YES if channel is loopback, #GNUNET_NO otherwise.
  281. */
  282. static int
  283. is_loopback (const struct CadetChannel *ch)
  284. {
  285. if (NULL != ch->t)
  286. return GCT_is_loopback (ch->t);
  287. return (NULL != ch->root && NULL != ch->dest);
  288. }
  289. /**
  290. * Save a copy of the data message for later retransmission.
  291. *
  292. * @param msg Message to copy.
  293. * @param mid Message ID.
  294. * @param rel Reliability data for retransmission.
  295. */
  296. static struct CadetReliableMessage *
  297. copy_message (const struct GNUNET_CADET_Data *msg, uint32_t mid,
  298. struct CadetChannelReliability *rel)
  299. {
  300. struct CadetReliableMessage *copy;
  301. uint16_t size;
  302. size = ntohs (msg->header.size);
  303. copy = GNUNET_malloc (sizeof (*copy) + size);
  304. copy->mid = mid;
  305. copy->rel = rel;
  306. copy->type = GNUNET_MESSAGE_TYPE_CADET_DATA;
  307. memcpy (&copy[1], msg, size);
  308. return copy;
  309. }
  310. /**
  311. * We have received a message out of order, or the client is not ready.
  312. * Buffer it until we receive an ACK from the client or the missing
  313. * message from the channel.
  314. *
  315. * @param msg Message to buffer (MUST be of type CADET_DATA).
  316. * @param rel Reliability data to the corresponding direction.
  317. */
  318. static void
  319. add_buffered_data (const struct GNUNET_CADET_Data *msg,
  320. struct CadetChannelReliability *rel)
  321. {
  322. struct CadetReliableMessage *copy;
  323. struct CadetReliableMessage *prev;
  324. uint32_t mid;
  325. mid = ntohl (msg->mid);
  326. LOG (GNUNET_ERROR_TYPE_DEBUG, "add_buffered_data %u\n", mid);
  327. rel->n_recv++;
  328. // FIXME do something better than O(n), although n < 64...
  329. // FIXME start from the end (most messages are the latest ones)
  330. for (prev = rel->head_recv; NULL != prev; prev = prev->next)
  331. {
  332. LOG (GNUNET_ERROR_TYPE_DEBUG, " prev %u\n", prev->mid);
  333. if (prev->mid == mid)
  334. {
  335. LOG (GNUNET_ERROR_TYPE_DEBUG, " already there!\n");
  336. return;
  337. }
  338. else if (GC_is_pid_bigger (prev->mid, mid))
  339. {
  340. LOG (GNUNET_ERROR_TYPE_DEBUG, " bingo!\n");
  341. copy = copy_message (msg, mid, rel);
  342. GNUNET_CONTAINER_DLL_insert_before (rel->head_recv, rel->tail_recv,
  343. prev, copy);
  344. return;
  345. }
  346. }
  347. copy = copy_message (msg, mid, rel);
  348. LOG (GNUNET_ERROR_TYPE_DEBUG, " insert at tail!\n");
  349. GNUNET_CONTAINER_DLL_insert_tail (rel->head_recv, rel->tail_recv, copy);
  350. LOG (GNUNET_ERROR_TYPE_DEBUG, "add_buffered_data END\n");
  351. }
  352. /**
  353. * Add a destination client to a channel, initializing all data structures
  354. * in the channel and the client.
  355. *
  356. * @param ch Channel to which add the destination.
  357. * @param c Client which to add to the channel.
  358. */
  359. static void
  360. add_destination (struct CadetChannel *ch, struct CadetClient *c)
  361. {
  362. if (NULL != ch->dest)
  363. {
  364. GNUNET_break (0);
  365. return;
  366. }
  367. /* Assign local id as destination */
  368. ch->lid_dest = GML_get_next_chid (c);
  369. /* Store in client's hashmap */
  370. GML_channel_add (c, ch->lid_dest, ch);
  371. GNUNET_break (NULL == ch->dest_rel);
  372. ch->dest_rel = GNUNET_new (struct CadetChannelReliability);
  373. ch->dest_rel->ch = ch;
  374. ch->dest_rel->expected_delay.rel_value_us = 0;
  375. ch->dest_rel->retry_timer = CADET_RETRANSMIT_TIME;
  376. ch->dest = c;
  377. }
  378. /**
  379. * Set options in a channel, extracted from a bit flag field.
  380. *
  381. * @param ch Channel to set options to.
  382. * @param options Bit array in host byte order.
  383. */
  384. static void
  385. channel_set_options (struct CadetChannel *ch, uint32_t options)
  386. {
  387. ch->nobuffer = (options & GNUNET_CADET_OPTION_NOBUFFER) != 0 ?
  388. GNUNET_YES : GNUNET_NO;
  389. ch->reliable = (options & GNUNET_CADET_OPTION_RELIABLE) != 0 ?
  390. GNUNET_YES : GNUNET_NO;
  391. }
  392. /**
  393. * Get a bit flag field with the options of a channel.
  394. *
  395. * @param ch Channel to get options from.
  396. *
  397. * @return Bit array in host byte order.
  398. */
  399. static uint32_t
  400. channel_get_options (struct CadetChannel *ch)
  401. {
  402. uint32_t options;
  403. options = 0;
  404. if (ch->nobuffer)
  405. options |= GNUNET_CADET_OPTION_NOBUFFER;
  406. if (ch->reliable)
  407. options |= GNUNET_CADET_OPTION_RELIABLE;
  408. return options;
  409. }
  410. /**
  411. * Notify a client that the channel is no longer valid.
  412. *
  413. * @param ch Channel that is destroyed.
  414. * @param local_only Should we avoid sending it to other peers?
  415. */
  416. static void
  417. send_destroy (struct CadetChannel *ch, int local_only)
  418. {
  419. struct GNUNET_CADET_ChannelManage msg;
  420. msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY);
  421. msg.header.size = htons (sizeof (msg));
  422. msg.chid = htonl (ch->gid);
  423. /* If root is not NULL, notify.
  424. * If it's NULL, check lid_root. When a local destroy comes in, root
  425. * is set to NULL but lid_root is left untouched. In this case, do nothing,
  426. * the client is the one who requested the channel to be destroyed.
  427. */
  428. if (NULL != ch->root)
  429. GML_send_channel_destroy (ch->root, ch->lid_root);
  430. else if (0 == ch->lid_root && GNUNET_NO == local_only)
  431. GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL);
  432. if (NULL != ch->dest)
  433. GML_send_channel_destroy (ch->dest, ch->lid_dest);
  434. else if (0 == ch->lid_dest && GNUNET_NO == local_only)
  435. GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_YES, NULL);
  436. }
  437. /**
  438. * Notify the destination client that a new incoming channel was created.
  439. *
  440. * @param ch Channel that was created.
  441. */
  442. static void
  443. send_client_create (struct CadetChannel *ch)
  444. {
  445. uint32_t opt;
  446. if (NULL == ch->dest)
  447. return;
  448. opt = 0;
  449. opt |= GNUNET_YES == ch->reliable ? GNUNET_CADET_OPTION_RELIABLE : 0;
  450. opt |= GNUNET_YES == ch->nobuffer ? GNUNET_CADET_OPTION_NOBUFFER : 0;
  451. GML_send_channel_create (ch->dest, ch->lid_dest, ch->port, opt,
  452. GCT_get_destination (ch->t));
  453. }
  454. /**
  455. * Send data to a client.
  456. *
  457. * If the client is ready, send directly, otherwise buffer while listening
  458. * for a local ACK.
  459. *
  460. * @param ch Channel
  461. * @param msg Message.
  462. * @param fwd Is this a fwd (root->dest) message?
  463. */
  464. static void
  465. send_client_data (struct CadetChannel *ch,
  466. const struct GNUNET_CADET_Data *msg,
  467. int fwd)
  468. {
  469. if (fwd)
  470. {
  471. if (ch->dest_rel->client_ready)
  472. {
  473. GML_send_data (ch->dest, msg, ch->lid_dest);
  474. ch->dest_rel->client_ready = GNUNET_NO;
  475. ch->dest_rel->mid_recv++;
  476. }
  477. else
  478. add_buffered_data (msg, ch->dest_rel);
  479. }
  480. else
  481. {
  482. if (ch->root_rel->client_ready)
  483. {
  484. GML_send_data (ch->root, msg, ch->lid_root);
  485. ch->root_rel->client_ready = GNUNET_NO;
  486. ch->root_rel->mid_recv++;
  487. }
  488. else
  489. add_buffered_data (msg, ch->root_rel);
  490. }
  491. }
  492. /**
  493. * Send a buffered message to the client, for in order delivery or
  494. * as result of client ACK.
  495. *
  496. * @param ch Channel on which to empty the message buffer.
  497. * @param c Client to send to.
  498. * @param fwd Is this to send FWD data?.
  499. */
  500. static void
  501. send_client_buffered_data (struct CadetChannel *ch,
  502. struct CadetClient *c,
  503. int fwd)
  504. {
  505. struct CadetReliableMessage *copy;
  506. struct CadetChannelReliability *rel;
  507. LOG (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data\n");
  508. rel = fwd ? ch->dest_rel : ch->root_rel;
  509. if (GNUNET_NO == rel->client_ready)
  510. {
  511. LOG (GNUNET_ERROR_TYPE_DEBUG, "client not ready\n");
  512. return;
  513. }
  514. copy = rel->head_recv;
  515. /* We never buffer channel management messages */
  516. if (NULL != copy)
  517. {
  518. if (copy->mid == rel->mid_recv || GNUNET_NO == ch->reliable)
  519. {
  520. struct GNUNET_CADET_Data *msg = (struct GNUNET_CADET_Data *) &copy[1];
  521. LOG (GNUNET_ERROR_TYPE_DEBUG, " have %u! now expecting %u\n",
  522. copy->mid, rel->mid_recv + 1);
  523. send_client_data (ch, msg, fwd);
  524. rel->n_recv--;
  525. GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
  526. LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE RECV %p\n", copy);
  527. GNUNET_free (copy);
  528. GCCH_send_data_ack (ch, fwd);
  529. }
  530. else
  531. {
  532. LOG (GNUNET_ERROR_TYPE_DEBUG, " reliable && don't have %u, next is %u\n",
  533. rel->mid_recv, copy->mid);
  534. if (GNUNET_YES == ch->destroy)
  535. {
  536. /* We don't have the next data piece and the remote peer has closed the
  537. * channel. We won't receive it anymore, so just destroy the channel.
  538. * FIXME: wait some time to allow other connections to
  539. * deliver missing messages
  540. */
  541. send_destroy (ch, GNUNET_YES);
  542. GCCH_destroy (ch);
  543. }
  544. }
  545. }
  546. LOG (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data END\n");
  547. }
  548. /**
  549. * Allow a client to send more data.
  550. *
  551. * In case the client was already allowed to send data, do nothing.
  552. *
  553. * @param ch Channel.
  554. * @param fwd Is this a FWD ACK? (FWD ACKs are sent to root)
  555. */
  556. static void
  557. send_client_ack (struct CadetChannel *ch, int fwd)
  558. {
  559. struct CadetChannelReliability *rel = fwd ? ch->root_rel : ch->dest_rel;
  560. struct CadetClient *c = fwd ? ch->root : ch->dest;
  561. if (NULL == c)
  562. {
  563. GNUNET_break (GNUNET_NO != ch->destroy);
  564. return;
  565. }
  566. LOG (GNUNET_ERROR_TYPE_DEBUG,
  567. " sending %s ack to client on channel %s\n",
  568. GC_f2s (fwd), GCCH_2s (ch));
  569. if (NULL == rel)
  570. {
  571. GNUNET_break (0);
  572. return;
  573. }
  574. if (GNUNET_YES == rel->client_allowed)
  575. {
  576. LOG (GNUNET_ERROR_TYPE_DEBUG, " already allowed\n");
  577. return;
  578. }
  579. rel->client_allowed = GNUNET_YES;
  580. GML_send_ack (c, fwd ? ch->lid_root : ch->lid_dest);
  581. }
  582. /**
  583. * Notify the root that the destination rejected the channel.
  584. *
  585. * @param ch Rejected channel.
  586. */
  587. static void
  588. send_client_nack (struct CadetChannel *ch)
  589. {
  590. if (NULL == ch->root)
  591. {
  592. GNUNET_break (0);
  593. return;
  594. }
  595. GML_send_channel_nack (ch->root, ch->lid_root);
  596. }
  597. /**
  598. * We haven't received an ACK after a certain time: restransmit the message.
  599. *
  600. * @param cls Closure (CadetChannelReliability with the message to restransmit)
  601. * @param tc TaskContext.
  602. */
  603. static void
  604. channel_retransmit_message (void *cls,
  605. const struct GNUNET_SCHEDULER_TaskContext *tc)
  606. {
  607. struct CadetChannelReliability *rel = cls;
  608. struct CadetReliableMessage *copy;
  609. struct CadetChannel *ch;
  610. struct GNUNET_CADET_Data *payload;
  611. int fwd;
  612. rel->retry_task = NULL;
  613. if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
  614. return;
  615. ch = rel->ch;
  616. copy = rel->head_sent;
  617. if (NULL == copy)
  618. {
  619. GNUNET_break (0); // FIXME tripped in rps testcase
  620. return;
  621. }
  622. payload = (struct GNUNET_CADET_Data *) &copy[1];
  623. fwd = (rel == ch->root_rel);
  624. /* Message not found in the queue that we are going to use. */
  625. LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %u\n", copy->mid);
  626. GCCH_send_prebuilt_message (&payload->header, ch, fwd, copy);
  627. GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
  628. }
  629. /**
  630. * We haven't received an Channel ACK after a certain time: resend the CREATE.
  631. *
  632. * @param cls Closure (CadetChannelReliability of the channel to recreate)
  633. * @param tc TaskContext.
  634. */
  635. static void
  636. channel_recreate (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  637. {
  638. struct CadetChannelReliability *rel = cls;
  639. rel->retry_task = NULL;
  640. if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
  641. return;
  642. LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RE-CREATE\n");
  643. GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
  644. if (rel == rel->ch->root_rel)
  645. {
  646. send_create (rel->ch);
  647. }
  648. else if (rel == rel->ch->dest_rel)
  649. {
  650. send_ack (rel->ch, GNUNET_YES);
  651. }
  652. else
  653. {
  654. GNUNET_break (0);
  655. }
  656. }
  657. /**
  658. * Message has been sent: start retransmission timer.
  659. *
  660. * @param cls Closure (queue structure).
  661. * @param t Tunnel.
  662. * @param q Queue handler (no longer valid).
  663. * @param type Type of message.
  664. * @param size Size of the message.
  665. */
  666. static void
  667. ch_message_sent (void *cls,
  668. struct CadetTunnel *t,
  669. struct CadetTunnelQueue *q,
  670. uint16_t type, size_t size)
  671. {
  672. struct CadetChannelQueue *chq = cls;
  673. struct CadetReliableMessage *copy = chq->copy;
  674. struct CadetChannelReliability *rel;
  675. LOG (GNUNET_ERROR_TYPE_DEBUG, "channel message sent callback %s\n",
  676. GC_m2s (chq->type));
  677. switch (chq->type)
  678. {
  679. case GNUNET_MESSAGE_TYPE_CADET_DATA:
  680. LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT DATA MID %u\n", copy->mid);
  681. GNUNET_assert (chq == copy->chq);
  682. copy->timestamp = GNUNET_TIME_absolute_get ();
  683. rel = copy->rel;
  684. if (NULL == rel->retry_task)
  685. {
  686. LOG (GNUNET_ERROR_TYPE_DEBUG, "!! scheduling retry in 4 * %s\n",
  687. GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
  688. GNUNET_YES));
  689. if (0 != rel->expected_delay.rel_value_us)
  690. {
  691. LOG (GNUNET_ERROR_TYPE_DEBUG, "!! delay != 0\n");
  692. rel->retry_timer =
  693. GNUNET_TIME_relative_multiply (rel->expected_delay,
  694. CADET_RETRANSMIT_MARGIN);
  695. }
  696. else
  697. {
  698. LOG (GNUNET_ERROR_TYPE_DEBUG, "!! delay reset\n");
  699. rel->retry_timer = CADET_RETRANSMIT_TIME;
  700. }
  701. LOG (GNUNET_ERROR_TYPE_DEBUG, "!! using delay %s\n",
  702. GNUNET_STRINGS_relative_time_to_string (rel->retry_timer,
  703. GNUNET_NO));
  704. rel->retry_task =
  705. GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
  706. &channel_retransmit_message, rel);
  707. }
  708. else
  709. {
  710. LOG (GNUNET_ERROR_TYPE_DEBUG, "!! retry task %u\n", rel->retry_task);
  711. }
  712. copy->chq = NULL;
  713. break;
  714. case GNUNET_MESSAGE_TYPE_CADET_DATA_ACK:
  715. case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
  716. case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK:
  717. LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT %s\n", GC_m2s (chq->type));
  718. rel = chq->rel;
  719. GNUNET_assert (rel->uniq == chq);
  720. rel->uniq = NULL;
  721. if (CADET_CHANNEL_READY != rel->ch->state
  722. && GNUNET_MESSAGE_TYPE_CADET_DATA_ACK != type
  723. && GNUNET_NO == rel->ch->destroy)
  724. {
  725. GNUNET_assert (NULL == rel->retry_task);
  726. LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! STD BACKOFF %s\n",
  727. GNUNET_STRINGS_relative_time_to_string (rel->retry_timer,
  728. GNUNET_NO));
  729. rel->retry_timer = GNUNET_TIME_STD_BACKOFF (rel->retry_timer);
  730. rel->retry_task = GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
  731. &channel_recreate, rel);
  732. }
  733. break;
  734. default:
  735. GNUNET_break (0);
  736. }
  737. GNUNET_free (chq);
  738. }
  739. /**
  740. * send a channel create message.
  741. *
  742. * @param ch Channel for which to send.
  743. */
  744. static void
  745. send_create (struct CadetChannel *ch)
  746. {
  747. struct GNUNET_CADET_ChannelCreate msgcc;
  748. msgcc.header.size = htons (sizeof (msgcc));
  749. msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE);
  750. msgcc.chid = htonl (ch->gid);
  751. msgcc.port = htonl (ch->port);
  752. msgcc.opt = htonl (channel_get_options (ch));
  753. GCCH_send_prebuilt_message (&msgcc.header, ch, GNUNET_YES, NULL);
  754. }
  755. /**
  756. * Confirm we got a channel create or FWD ack.
  757. *
  758. * @param ch The channel to confirm.
  759. * @param fwd Should we send a FWD ACK? (going dest->root)
  760. */
  761. static void
  762. send_ack (struct CadetChannel *ch, int fwd)
  763. {
  764. struct GNUNET_CADET_ChannelManage msg;
  765. msg.header.size = htons (sizeof (msg));
  766. msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK);
  767. LOG (GNUNET_ERROR_TYPE_DEBUG, " sending channel %s ack for channel %s\n",
  768. GC_f2s (fwd), GCCH_2s (ch));
  769. msg.chid = htonl (ch->gid);
  770. GCCH_send_prebuilt_message (&msg.header, ch, !fwd, NULL);
  771. }
  772. /**
  773. * Send a message and don't keep any info about it: we won't need to cancel it
  774. * or resend it.
  775. *
  776. * @param msg Header of the message to fire away.
  777. * @param ch Channel on which the message should go.
  778. * @param force Is this a forced (undroppable) message?
  779. */
  780. static void
  781. fire_and_forget (const struct GNUNET_MessageHeader *msg,
  782. struct CadetChannel *ch,
  783. int force)
  784. {
  785. GNUNET_break (NULL == GCT_send_prebuilt_message (msg, ch->t, NULL,
  786. force, NULL, NULL));
  787. }
  788. /**
  789. * Notify that a channel create didn't succeed.
  790. *
  791. * @param ch The channel to reject.
  792. */
  793. static void
  794. send_nack (struct CadetChannel *ch)
  795. {
  796. struct GNUNET_CADET_ChannelManage msg;
  797. msg.header.size = htons (sizeof (msg));
  798. msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK);
  799. LOG (GNUNET_ERROR_TYPE_DEBUG,
  800. " sending channel NACK for channel %s\n",
  801. GCCH_2s (ch));
  802. msg.chid = htonl (ch->gid);
  803. GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL);
  804. }
  805. /**
  806. * Destroy all reliable messages queued for a channel,
  807. * during a channel destruction.
  808. * Frees the reliability structure itself.
  809. *
  810. * @param rel Reliability data for a channel.
  811. */
  812. static void
  813. channel_rel_free_all (struct CadetChannelReliability *rel)
  814. {
  815. struct CadetReliableMessage *copy;
  816. struct CadetReliableMessage *next;
  817. if (NULL == rel)
  818. return;
  819. for (copy = rel->head_recv; NULL != copy; copy = next)
  820. {
  821. next = copy->next;
  822. GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
  823. LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE BATCH RECV %p\n", copy);
  824. GNUNET_break (NULL == copy->chq);
  825. GNUNET_free (copy);
  826. }
  827. for (copy = rel->head_sent; NULL != copy; copy = next)
  828. {
  829. next = copy->next;
  830. GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
  831. LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE BATCH %p\n", copy);
  832. if (NULL != copy->chq)
  833. {
  834. if (NULL != copy->chq->tq)
  835. {
  836. GCT_cancel (copy->chq->tq);
  837. /* ch_message_sent will free copy->q */
  838. }
  839. else
  840. {
  841. GNUNET_free (copy->chq);
  842. GNUNET_break (0);
  843. }
  844. }
  845. GNUNET_free (copy);
  846. }
  847. if (NULL != rel->uniq && NULL != rel->uniq->tq)
  848. {
  849. GCT_cancel (rel->uniq->tq);
  850. /* ch_message_sent is called freeing uniq */
  851. }
  852. if (NULL != rel->retry_task)
  853. {
  854. GNUNET_SCHEDULER_cancel (rel->retry_task);
  855. rel->retry_task = NULL;
  856. }
  857. GNUNET_free (rel);
  858. }
  859. /**
  860. * Mark future messages as ACK'd.
  861. *
  862. * @param rel Reliability data.
  863. * @param msg DataACK message with a bitfield of future ACK'd messages.
  864. */
  865. static void
  866. channel_rel_free_sent (struct CadetChannelReliability *rel,
  867. const struct GNUNET_CADET_DataACK *msg)
  868. {
  869. struct CadetReliableMessage *copy;
  870. struct CadetReliableMessage *next;
  871. uint64_t bitfield;
  872. uint64_t mask;
  873. uint32_t mid;
  874. uint32_t target;
  875. unsigned int i;
  876. bitfield = msg->futures;
  877. mid = ntohl (msg->mid);
  878. LOG (GNUNET_ERROR_TYPE_DEBUG, "free_sent_reliable %u %llX\n", mid, bitfield);
  879. LOG (GNUNET_ERROR_TYPE_DEBUG, " rel %p, head %p\n", rel, rel->head_sent);
  880. for (i = 0, copy = rel->head_sent;
  881. i < 64 && NULL != copy && 0 != bitfield;
  882. i++)
  883. {
  884. LOG (GNUNET_ERROR_TYPE_DEBUG, " trying bit %u (mid %u)\n", i, mid + i + 1);
  885. mask = 0x1LL << i;
  886. if (0 == (bitfield & mask))
  887. continue;
  888. LOG (GNUNET_ERROR_TYPE_DEBUG, " set!\n");
  889. /* Bit was set, clear the bit from the bitfield */
  890. bitfield &= ~mask;
  891. /* The i-th bit was set. Do we have that copy? */
  892. /* Skip copies with mid < target */
  893. target = mid + i + 1;
  894. LOG (GNUNET_ERROR_TYPE_DEBUG, " target %u\n", target);
  895. while (NULL != copy && GC_is_pid_bigger (target, copy->mid))
  896. copy = copy->next;
  897. /* Did we run out of copies? (previously freed, it's ok) */
  898. if (NULL == copy)
  899. {
  900. LOG (GNUNET_ERROR_TYPE_DEBUG, "run out of copies...\n");
  901. return;
  902. }
  903. /* Did we overshoot the target? (previously freed, it's ok) */
  904. if (GC_is_pid_bigger (copy->mid, target))
  905. {
  906. LOG (GNUNET_ERROR_TYPE_DEBUG, " next copy %u\n", copy->mid);
  907. continue;
  908. }
  909. /* Now copy->mid == target, free it */
  910. next = copy->next;
  911. GNUNET_break (GNUNET_YES != rel_message_free (copy, GNUNET_YES));
  912. copy = next;
  913. }
  914. LOG (GNUNET_ERROR_TYPE_DEBUG, "free_sent_reliable END\n");
  915. }
  916. /**
  917. * Destroy a reliable message after it has been acknowledged, either by
  918. * direct mid ACK or bitfield. Updates the appropriate data structures and
  919. * timers and frees all memory.
  920. *
  921. * @param copy Message that is no longer needed: remote peer got it.
  922. * @param update_time Is the timing information relevant?
  923. * If this message is ACK in a batch the timing information
  924. * is skewed by the retransmission, count only for the
  925. * retransmitted message.
  926. *
  927. * @return #GNUNET_YES if channel was destroyed as a result of the call,
  928. * #GNUNET_NO otherwise.
  929. */
  930. static int
  931. rel_message_free (struct CadetReliableMessage *copy, int update_time)
  932. {
  933. struct CadetChannelReliability *rel;
  934. struct GNUNET_TIME_Relative time;
  935. rel = copy->rel;
  936. LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Freeing %u\n", copy->mid);
  937. if (update_time)
  938. {
  939. time = GNUNET_TIME_absolute_get_duration (copy->timestamp);
  940. if (0 == rel->expected_delay.rel_value_us)
  941. rel->expected_delay = time;
  942. else
  943. {
  944. rel->expected_delay.rel_value_us *= 7;
  945. rel->expected_delay.rel_value_us += time.rel_value_us;
  946. rel->expected_delay.rel_value_us /= 8;
  947. }
  948. LOG (GNUNET_ERROR_TYPE_INFO, "!!! took %s, new delay %s\n",
  949. GNUNET_STRINGS_relative_time_to_string (time, GNUNET_NO),
  950. GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
  951. GNUNET_NO));
  952. rel->retry_timer = rel->expected_delay;
  953. }
  954. else
  955. {
  956. LOG (GNUNET_ERROR_TYPE_INFO, "!!! batch free, ignoring timing\n");
  957. }
  958. rel->ch->pending_messages--;
  959. if (NULL != copy->chq)
  960. {
  961. GCT_cancel (copy->chq->tq);
  962. /* copy->q is set to NULL by ch_message_sent */
  963. }
  964. GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
  965. LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE %p\n", copy);
  966. GNUNET_free (copy);
  967. if (GNUNET_NO != rel->ch->destroy && 0 == rel->ch->pending_messages)
  968. {
  969. GCCH_destroy (rel->ch);
  970. return GNUNET_YES;
  971. }
  972. return GNUNET_NO;
  973. }
  974. /**
  975. * Channel was ACK'd by remote peer, mark as ready and cancel retransmission.
  976. *
  977. * @param ch Channel to mark as ready.
  978. * @param fwd Was the ACK message a FWD ACK? (dest->root, SYNACK)
  979. */
  980. static void
  981. channel_confirm (struct CadetChannel *ch, int fwd)
  982. {
  983. struct CadetChannelReliability *rel;
  984. enum CadetChannelState oldstate;
  985. rel = fwd ? ch->root_rel : ch->dest_rel;
  986. if (NULL == rel)
  987. {
  988. GNUNET_break (GNUNET_NO != ch->destroy);
  989. return;
  990. }
  991. LOG (GNUNET_ERROR_TYPE_DEBUG, " channel confirm %s %s\n",
  992. GC_f2s (fwd), GCCH_2s (ch));
  993. oldstate = ch->state;
  994. ch->state = CADET_CHANNEL_READY;
  995. if (CADET_CHANNEL_READY != oldstate || GNUNET_YES == is_loopback (ch))
  996. {
  997. rel->client_ready = GNUNET_YES;
  998. rel->expected_delay = rel->retry_timer;
  999. LOG (GNUNET_ERROR_TYPE_DEBUG, " !! retry timer confirm %s\n",
  1000. GNUNET_STRINGS_relative_time_to_string (rel->retry_timer, GNUNET_NO));
  1001. if (GCT_get_connections_buffer (ch->t) > 0 || GCT_is_loopback (ch->t))
  1002. send_client_ack (ch, fwd);
  1003. if (NULL != rel->retry_task)
  1004. {
  1005. GNUNET_SCHEDULER_cancel (rel->retry_task);
  1006. rel->retry_task = NULL;
  1007. }
  1008. else if (NULL != rel->uniq)
  1009. {
  1010. GCT_cancel (rel->uniq->tq);
  1011. /* ch_message_sent will free and NULL uniq */
  1012. }
  1013. else if (GNUNET_NO == is_loopback (ch))
  1014. {
  1015. /* We SHOULD have been trying to retransmit this! */
  1016. GNUNET_break (0);
  1017. }
  1018. }
  1019. /* In case of a FWD ACK (SYNACK) send a BCK ACK (ACK). */
  1020. if (GNUNET_YES == fwd)
  1021. send_ack (ch, GNUNET_NO);
  1022. }
  1023. /**
  1024. * Save a copy to retransmit in case it gets lost.
  1025. *
  1026. * Initializes all needed callbacks and timers.
  1027. *
  1028. * @param ch Channel this message goes on.
  1029. * @param msg Message to copy.
  1030. * @param fwd Is this fwd traffic?
  1031. */
  1032. static struct CadetReliableMessage *
  1033. channel_save_copy (struct CadetChannel *ch,
  1034. const struct GNUNET_MessageHeader *msg,
  1035. int fwd)
  1036. {
  1037. struct CadetChannelReliability *rel;
  1038. struct CadetReliableMessage *copy;
  1039. uint32_t mid;
  1040. uint16_t type;
  1041. uint16_t size;
  1042. rel = fwd ? ch->root_rel : ch->dest_rel;
  1043. mid = rel->mid_send - 1;
  1044. type = ntohs (msg->type);
  1045. size = ntohs (msg->size);
  1046. LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SAVE %u %s\n", mid, GC_m2s (type));
  1047. copy = GNUNET_malloc (sizeof (struct CadetReliableMessage) + size);
  1048. LOG (GNUNET_ERROR_TYPE_DEBUG, " at %p\n", copy);
  1049. copy->mid = mid;
  1050. copy->rel = rel;
  1051. copy->type = type;
  1052. memcpy (&copy[1], msg, size);
  1053. GNUNET_CONTAINER_DLL_insert_tail (rel->head_sent, rel->tail_sent, copy);
  1054. ch->pending_messages++;
  1055. return copy;
  1056. }
  1057. /**
  1058. * Create a new channel.
  1059. *
  1060. * @param t Tunnel this channel is in.
  1061. * @param owner Client that owns the channel, NULL for foreign channels.
  1062. * @param lid_root Local ID for root client.
  1063. *
  1064. * @return A new initialized channel. NULL on error.
  1065. */
  1066. static struct CadetChannel *
  1067. channel_new (struct CadetTunnel *t,
  1068. struct CadetClient *owner,
  1069. CADET_ChannelNumber lid_root)
  1070. {
  1071. struct CadetChannel *ch;
  1072. ch = GNUNET_new (struct CadetChannel);
  1073. ch->root = owner;
  1074. ch->lid_root = lid_root;
  1075. ch->t = t;
  1076. GNUNET_STATISTICS_update (stats, "# channels", 1, GNUNET_NO);
  1077. if (NULL != owner)
  1078. {
  1079. ch->gid = GCT_get_next_chid (t);
  1080. GML_channel_add (owner, lid_root, ch);
  1081. }
  1082. GCT_add_channel (t, ch);
  1083. return ch;
  1084. }
  1085. /**
  1086. * Handle a loopback message: call the appropriate handler for the message type.
  1087. *
  1088. * @param ch Channel this message is on.
  1089. * @param msgh Message header.
  1090. * @param fwd Is this FWD traffic?
  1091. */
  1092. void
  1093. handle_loopback (struct CadetChannel *ch,
  1094. const struct GNUNET_MessageHeader *msgh,
  1095. int fwd)
  1096. {
  1097. uint16_t type;
  1098. type = ntohs (msgh->type);
  1099. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1100. "Loopback %s %s message!\n",
  1101. GC_f2s (fwd), GC_m2s (type));
  1102. switch (type)
  1103. {
  1104. case GNUNET_MESSAGE_TYPE_CADET_DATA:
  1105. /* Don't send hop ACK, wait for client to ACK */
  1106. LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SEND loopback %u (%u)\n",
  1107. ntohl (((struct GNUNET_CADET_Data *) msgh)->mid), ntohs (msgh->size));
  1108. GCCH_handle_data (ch, (struct GNUNET_CADET_Data *) msgh, fwd);
  1109. break;
  1110. case GNUNET_MESSAGE_TYPE_CADET_DATA_ACK:
  1111. GCCH_handle_data_ack (ch, (struct GNUNET_CADET_DataACK *) msgh, fwd);
  1112. break;
  1113. case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
  1114. GCCH_handle_create (ch->t,
  1115. (struct GNUNET_CADET_ChannelCreate *) msgh);
  1116. break;
  1117. case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK:
  1118. GCCH_handle_ack (ch,
  1119. (struct GNUNET_CADET_ChannelManage *) msgh,
  1120. fwd);
  1121. break;
  1122. case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK:
  1123. GCCH_handle_nack (ch);
  1124. break;
  1125. case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
  1126. GCCH_handle_destroy (ch,
  1127. (struct GNUNET_CADET_ChannelManage *) msgh,
  1128. fwd);
  1129. break;
  1130. default:
  1131. GNUNET_break_op (0);
  1132. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1133. "end-to-end message not known (%u)\n",
  1134. ntohs (msgh->type));
  1135. }
  1136. }
  1137. /******************************************************************************/
  1138. /******************************** API ***********************************/
  1139. /******************************************************************************/
  1140. /**
  1141. * Destroy a channel and free all resources.
  1142. *
  1143. * @param ch Channel to destroy.
  1144. */
  1145. void
  1146. GCCH_destroy (struct CadetChannel *ch)
  1147. {
  1148. struct CadetClient *c;
  1149. struct CadetTunnel *t;
  1150. if (NULL == ch)
  1151. return;
  1152. if (2 == ch->destroy)
  1153. return; /* recursive call */
  1154. ch->destroy = 2;
  1155. LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying channel %s:%u\n",
  1156. GCT_2s (ch->t), ch->gid);
  1157. GCCH_debug (ch);
  1158. c = ch->root;
  1159. if (NULL != c)
  1160. {
  1161. GML_channel_remove (c, ch->lid_root, ch);
  1162. }
  1163. c = ch->dest;
  1164. if (NULL != c)
  1165. {
  1166. GML_channel_remove (c, ch->lid_dest, ch);
  1167. }
  1168. channel_rel_free_all (ch->root_rel);
  1169. channel_rel_free_all (ch->dest_rel);
  1170. t = ch->t;
  1171. GCT_remove_channel (t, ch);
  1172. GNUNET_STATISTICS_update (stats, "# channels", -1, GNUNET_NO);
  1173. GNUNET_free (ch);
  1174. GCT_destroy_if_empty (t);
  1175. }
  1176. /**
  1177. * Get the channel's public ID.
  1178. *
  1179. * @param ch Channel.
  1180. *
  1181. * @return ID used to identify the channel with the remote peer.
  1182. */
  1183. CADET_ChannelNumber
  1184. GCCH_get_id (const struct CadetChannel *ch)
  1185. {
  1186. return ch->gid;
  1187. }
  1188. /**
  1189. * Get the channel tunnel.
  1190. *
  1191. * @param ch Channel to get the tunnel from.
  1192. *
  1193. * @return tunnel of the channel.
  1194. */
  1195. struct CadetTunnel *
  1196. GCCH_get_tunnel (const struct CadetChannel *ch)
  1197. {
  1198. return ch->t;
  1199. }
  1200. /**
  1201. * Get free buffer space towards the client on a specific channel.
  1202. *
  1203. * @param ch Channel.
  1204. * @param fwd Is query about FWD traffic?
  1205. *
  1206. * @return Free buffer space [0 - 64]
  1207. */
  1208. unsigned int
  1209. GCCH_get_buffer (struct CadetChannel *ch, int fwd)
  1210. {
  1211. struct CadetChannelReliability *rel;
  1212. rel = fwd ? ch->dest_rel : ch->root_rel;
  1213. LOG (GNUNET_ERROR_TYPE_DEBUG, " get buffer, channel %s\n", GCCH_2s (ch));
  1214. GCCH_debug (ch);
  1215. /* If rel is NULL it means that the end is not yet created,
  1216. * most probably is a loopback channel at the point of sending
  1217. * the ChannelCreate to itself.
  1218. */
  1219. if (NULL == rel)
  1220. {
  1221. LOG (GNUNET_ERROR_TYPE_DEBUG, " rel is NULL: max\n");
  1222. return 64;
  1223. }
  1224. return (64 - rel->n_recv);
  1225. }
  1226. /**
  1227. * Get flow control status of end point: is client allow to send?
  1228. *
  1229. * @param ch Channel.
  1230. * @param fwd Is query about FWD traffic? (Request root status).
  1231. *
  1232. * @return #GNUNET_YES if client is allowed to send us data.
  1233. */
  1234. int
  1235. GCCH_get_allowed (struct CadetChannel *ch, int fwd)
  1236. {
  1237. struct CadetChannelReliability *rel;
  1238. rel = fwd ? ch->root_rel : ch->dest_rel;
  1239. if (NULL == rel)
  1240. {
  1241. /* Probably shutting down: root/dest NULL'ed to mark disconnection */
  1242. GNUNET_break (GNUNET_NO != ch->destroy);
  1243. return 0;
  1244. }
  1245. return rel->client_allowed;
  1246. }
  1247. /**
  1248. * Is the root client for this channel on this peer?
  1249. *
  1250. * @param ch Channel.
  1251. * @param fwd Is this for fwd traffic?
  1252. *
  1253. * @return #GNUNET_YES in case it is.
  1254. */
  1255. int
  1256. GCCH_is_origin (struct CadetChannel *ch, int fwd)
  1257. {
  1258. struct CadetClient *c;
  1259. c = fwd ? ch->root : ch->dest;
  1260. return NULL != c;
  1261. }
  1262. /**
  1263. * Is the destination client for this channel on this peer?
  1264. *
  1265. * @param ch Channel.
  1266. * @param fwd Is this for fwd traffic?
  1267. *
  1268. * @return #GNUNET_YES in case it is.
  1269. */
  1270. int
  1271. GCCH_is_terminal (struct CadetChannel *ch, int fwd)
  1272. {
  1273. struct CadetClient *c;
  1274. c = fwd ? ch->dest : ch->root;
  1275. return NULL != c;
  1276. }
  1277. /**
  1278. * Send an end-to-end ACK message for the most recent in-sequence payload.
  1279. *
  1280. * If channel is not reliable, do nothing.
  1281. *
  1282. * @param ch Channel this is about.
  1283. * @param fwd Is for FWD traffic? (ACK dest->owner)
  1284. */
  1285. void
  1286. GCCH_send_data_ack (struct CadetChannel *ch, int fwd)
  1287. {
  1288. struct GNUNET_CADET_DataACK msg;
  1289. struct CadetChannelReliability *rel;
  1290. struct CadetReliableMessage *copy;
  1291. unsigned int delta;
  1292. uint64_t mask;
  1293. uint32_t ack;
  1294. if (GNUNET_NO == ch->reliable)
  1295. return;
  1296. rel = fwd ? ch->dest_rel : ch->root_rel;
  1297. ack = rel->mid_recv - 1;
  1298. msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_DATA_ACK);
  1299. msg.header.size = htons (sizeof (msg));
  1300. msg.chid = htonl (ch->gid);
  1301. msg.mid = htonl (ack);
  1302. msg.futures = 0LL;
  1303. for (copy = rel->head_recv; NULL != copy; copy = copy->next)
  1304. {
  1305. if (copy->type != GNUNET_MESSAGE_TYPE_CADET_DATA)
  1306. {
  1307. LOG (GNUNET_ERROR_TYPE_DEBUG, " Type %s, expected DATA\n",
  1308. GC_m2s (copy->type));
  1309. continue;
  1310. }
  1311. GNUNET_assert (GC_is_pid_bigger(copy->mid, ack));
  1312. delta = copy->mid - (ack + 1);
  1313. if (63 < delta)
  1314. break;
  1315. mask = 0x1LL << delta;
  1316. msg.futures |= mask;
  1317. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1318. " setting bit for %u (delta %u) (%llX) -> %llX\n",
  1319. copy->mid, delta, mask, msg.futures);
  1320. }
  1321. LOG (GNUNET_ERROR_TYPE_INFO, "===> DATA_ACK for %u + %llX\n",
  1322. ack, msg.futures);
  1323. GCCH_send_prebuilt_message (&msg.header, ch, !fwd, NULL);
  1324. LOG (GNUNET_ERROR_TYPE_DEBUG, "send_data_ack END\n");
  1325. }
  1326. /**
  1327. * Allow a client to send us more data, in case it was choked.
  1328. *
  1329. * @param ch Channel.
  1330. * @param fwd Is this about FWD traffic? (Root client).
  1331. */
  1332. void
  1333. GCCH_allow_client (struct CadetChannel *ch, int fwd)
  1334. {
  1335. struct CadetChannelReliability *rel;
  1336. unsigned int buffer;
  1337. LOG (GNUNET_ERROR_TYPE_DEBUG, "GMCH allow\n");
  1338. if (CADET_CHANNEL_READY != ch->state)
  1339. {
  1340. LOG (GNUNET_ERROR_TYPE_DEBUG, " channel not ready yet!\n");
  1341. return;
  1342. }
  1343. if (GNUNET_YES == ch->reliable)
  1344. {
  1345. rel = fwd ? ch->root_rel : ch->dest_rel;
  1346. if (NULL == rel)
  1347. {
  1348. GNUNET_break (GNUNET_NO != ch->destroy);
  1349. return;
  1350. }
  1351. if (NULL != rel->head_sent)
  1352. {
  1353. if (64 <= rel->mid_send - rel->head_sent->mid)
  1354. {
  1355. LOG (GNUNET_ERROR_TYPE_DEBUG, " too big MID gap! Wait for ACK.\n");
  1356. return;
  1357. }
  1358. else
  1359. {
  1360. LOG (GNUNET_ERROR_TYPE_DEBUG, " gap ok: %u - %u\n",
  1361. rel->head_sent->mid, rel->mid_send);
  1362. struct CadetReliableMessage *aux;
  1363. for (aux = rel->head_sent; NULL != aux; aux = aux->next)
  1364. {
  1365. LOG (GNUNET_ERROR_TYPE_DEBUG, " - sent MID %u\n", aux->mid);
  1366. }
  1367. }
  1368. }
  1369. else
  1370. {
  1371. LOG (GNUNET_ERROR_TYPE_DEBUG, " head sent is NULL\n");
  1372. }
  1373. }
  1374. if (is_loopback (ch))
  1375. buffer = GCCH_get_buffer (ch, fwd);
  1376. else
  1377. buffer = GCT_get_connections_buffer (ch->t);
  1378. if (0 == buffer)
  1379. {
  1380. LOG (GNUNET_ERROR_TYPE_DEBUG, " no buffer space.\n");
  1381. return;
  1382. }
  1383. LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer space %u, allowing\n", buffer);
  1384. send_client_ack (ch, fwd);
  1385. }
  1386. /**
  1387. * Log channel info.
  1388. *
  1389. * @param ch Channel.
  1390. */
  1391. void
  1392. GCCH_debug (struct CadetChannel *ch)
  1393. {
  1394. if (NULL == ch)
  1395. {
  1396. LOG (GNUNET_ERROR_TYPE_DEBUG, "*** DEBUG NULL CHANNEL ***\n");
  1397. return;
  1398. }
  1399. LOG (GNUNET_ERROR_TYPE_DEBUG, "Channel %s:%X (%p)\n",
  1400. GCT_2s (ch->t), ch->gid, ch);
  1401. LOG (GNUNET_ERROR_TYPE_DEBUG, " root %p/%p\n",
  1402. ch->root, ch->root_rel);
  1403. if (NULL != ch->root)
  1404. {
  1405. LOG (GNUNET_ERROR_TYPE_DEBUG, " cli %s\n", GML_2s (ch->root));
  1406. LOG (GNUNET_ERROR_TYPE_DEBUG, " ready %s\n",
  1407. ch->root_rel->client_ready ? "YES" : "NO");
  1408. LOG (GNUNET_ERROR_TYPE_DEBUG, " id %X\n", ch->lid_root);
  1409. }
  1410. LOG (GNUNET_ERROR_TYPE_DEBUG, " dest %p/%p\n",
  1411. ch->dest, ch->dest_rel);
  1412. if (NULL != ch->dest)
  1413. {
  1414. LOG (GNUNET_ERROR_TYPE_DEBUG, " cli %s\n", GML_2s (ch->dest));
  1415. LOG (GNUNET_ERROR_TYPE_DEBUG, " ready %s\n",
  1416. ch->dest_rel->client_ready ? "YES" : "NO");
  1417. LOG (GNUNET_ERROR_TYPE_DEBUG, " id %X\n", ch->lid_dest);
  1418. }
  1419. }
  1420. /**
  1421. * Handle an ACK given by a client.
  1422. *
  1423. * Mark client as ready and send him any buffered data we could have for him.
  1424. *
  1425. * @param ch Channel.
  1426. * @param fwd Is this a "FWD ACK"? (FWD ACKs are sent by dest and go BCK)
  1427. */
  1428. void
  1429. GCCH_handle_local_ack (struct CadetChannel *ch, int fwd)
  1430. {
  1431. struct CadetChannelReliability *rel;
  1432. struct CadetClient *c;
  1433. rel = fwd ? ch->dest_rel : ch->root_rel;
  1434. c = fwd ? ch->dest : ch->root;
  1435. rel->client_ready = GNUNET_YES;
  1436. send_client_buffered_data (ch, c, fwd);
  1437. if (GNUNET_YES == ch->destroy && 0 == rel->n_recv)
  1438. {
  1439. send_destroy (ch, GNUNET_YES);
  1440. GCCH_destroy (ch);
  1441. return;
  1442. }
  1443. /* if loopback is marked for destruction, no need to ACK to the other peer,
  1444. * it requested the destruction and is already gone, therefore, else if.
  1445. */
  1446. else if (is_loopback (ch))
  1447. {
  1448. unsigned int buffer;
  1449. buffer = GCCH_get_buffer (ch, fwd);
  1450. if (0 < buffer)
  1451. GCCH_allow_client (ch, fwd);
  1452. return;
  1453. }
  1454. GCT_send_connection_acks (ch->t);
  1455. }
  1456. /**
  1457. * Handle data given by a client.
  1458. *
  1459. * Check whether the client is allowed to send in this tunnel, save if channel
  1460. * is reliable and send an ACK to the client if there is still buffer space
  1461. * in the tunnel.
  1462. *
  1463. * @param ch Channel.
  1464. * @param c Client which sent the data.
  1465. * @param fwd Is this a FWD data?
  1466. * @param message Data message.
  1467. * @param size Size of data.
  1468. *
  1469. * @return GNUNET_OK if everything goes well, GNUNET_SYSERR in case of en error.
  1470. */
  1471. int
  1472. GCCH_handle_local_data (struct CadetChannel *ch,
  1473. struct CadetClient *c, int fwd,
  1474. const struct GNUNET_MessageHeader *message,
  1475. size_t size)
  1476. {
  1477. struct CadetChannelReliability *rel;
  1478. struct GNUNET_CADET_Data *payload;
  1479. uint16_t p2p_size = sizeof(struct GNUNET_CADET_Data) + size;
  1480. unsigned char cbuf[p2p_size];
  1481. /* Is the client in the channel? */
  1482. if ( !( (fwd &&
  1483. ch->root == c)
  1484. ||
  1485. (!fwd &&
  1486. ch->dest == c) ) )
  1487. {
  1488. GNUNET_break_op (0);
  1489. return GNUNET_SYSERR;
  1490. }
  1491. rel = fwd ? ch->root_rel : ch->dest_rel;
  1492. if (GNUNET_NO == rel->client_allowed)
  1493. {
  1494. GNUNET_break_op (0);
  1495. return GNUNET_SYSERR;
  1496. }
  1497. rel->client_allowed = GNUNET_NO;
  1498. /* Ok, everything is correct, send the message. */
  1499. payload = (struct GNUNET_CADET_Data *) cbuf;
  1500. payload->mid = htonl (rel->mid_send);
  1501. rel->mid_send++;
  1502. memcpy (&payload[1], message, size);
  1503. payload->header.size = htons (p2p_size);
  1504. payload->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_DATA);
  1505. payload->chid = htonl (ch->gid);
  1506. LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on channel...\n");
  1507. GCCH_send_prebuilt_message (&payload->header, ch, fwd, NULL);
  1508. if (is_loopback (ch))
  1509. {
  1510. if (GCCH_get_buffer (ch, fwd) > 0)
  1511. GCCH_allow_client (ch, fwd);
  1512. return GNUNET_OK;
  1513. }
  1514. if (GCT_get_connections_buffer (ch->t) > 0)
  1515. {
  1516. GCCH_allow_client (ch, fwd);
  1517. }
  1518. return GNUNET_OK;
  1519. }
  1520. /**
  1521. * Handle a channel destroy requested by a client.
  1522. *
  1523. * TODO: add "reason" field
  1524. *
  1525. * Destroy the channel and the tunnel in case this was the last channel.
  1526. *
  1527. * @param ch Channel.
  1528. * @param c Client that requested the destruction (to avoid notifying him).
  1529. * @param is_root Is the request coming from root?
  1530. */
  1531. void
  1532. GCCH_handle_local_destroy (struct CadetChannel *ch,
  1533. struct CadetClient *c,
  1534. int is_root)
  1535. {
  1536. ch->destroy = GNUNET_YES;
  1537. /* Cleanup after the tunnel */
  1538. if (GNUNET_NO == is_root && c == ch->dest)
  1539. {
  1540. LOG (GNUNET_ERROR_TYPE_DEBUG, " Client %s is destination.\n", GML_2s (c));
  1541. GML_client_delete_channel (c, ch, ch->lid_dest);
  1542. ch->dest = NULL;
  1543. }
  1544. if (GNUNET_YES == is_root && c == ch->root)
  1545. {
  1546. LOG (GNUNET_ERROR_TYPE_DEBUG, " Client %s is owner.\n", GML_2s (c));
  1547. GML_client_delete_channel (c, ch, ch->lid_root);
  1548. ch->root = NULL;
  1549. }
  1550. send_destroy (ch, GNUNET_NO);
  1551. if (0 == ch->pending_messages)
  1552. GCCH_destroy (ch);
  1553. }
  1554. /**
  1555. * Handle a channel create requested by a client.
  1556. *
  1557. * Create the channel and the tunnel in case this was the first0 channel.
  1558. *
  1559. * @param c Client that requested the creation (will be the root).
  1560. * @param msg Create Channel message.
  1561. *
  1562. * @return GNUNET_OK if everything went fine, GNUNET_SYSERR otherwise.
  1563. */
  1564. int
  1565. GCCH_handle_local_create (struct CadetClient *c,
  1566. struct GNUNET_CADET_ChannelMessage *msg)
  1567. {
  1568. struct CadetChannel *ch;
  1569. struct CadetTunnel *t;
  1570. struct CadetPeer *peer;
  1571. CADET_ChannelNumber chid;
  1572. LOG (GNUNET_ERROR_TYPE_DEBUG, " towards %s:%u\n",
  1573. GNUNET_i2s (&msg->peer), ntohl (msg->port));
  1574. chid = ntohl (msg->channel_id);
  1575. /* Sanity check for duplicate channel IDs */
  1576. if (NULL != GML_channel_get (c, chid))
  1577. {
  1578. GNUNET_break (0);
  1579. return GNUNET_SYSERR;
  1580. }
  1581. peer = GCP_get (&msg->peer);
  1582. GCP_add_tunnel (peer);
  1583. t = GCP_get_tunnel (peer);
  1584. if (GCP_get_short_id (peer) == myid)
  1585. {
  1586. GCT_change_cstate (t, CADET_TUNNEL_READY);
  1587. }
  1588. else
  1589. {
  1590. /* FIXME change to a tunnel API, eliminate ch <-> peer connection */
  1591. GCP_connect (peer);
  1592. }
  1593. /* Create channel */
  1594. ch = channel_new (t, c, chid);
  1595. if (NULL == ch)
  1596. {
  1597. GNUNET_break (0);
  1598. return GNUNET_SYSERR;
  1599. }
  1600. ch->port = ntohl (msg->port);
  1601. channel_set_options (ch, ntohl (msg->opt));
  1602. /* In unreliable channels, we'll use the DLL to buffer BCK data */
  1603. ch->root_rel = GNUNET_new (struct CadetChannelReliability);
  1604. ch->root_rel->ch = ch;
  1605. ch->root_rel->retry_timer = CADET_RETRANSMIT_TIME;
  1606. ch->root_rel->expected_delay.rel_value_us = 0;
  1607. LOG (GNUNET_ERROR_TYPE_DEBUG, "CREATED CHANNEL %s\n", GCCH_2s (ch));
  1608. send_create (ch);
  1609. return GNUNET_OK;
  1610. }
  1611. /**
  1612. * Handler for cadet network payload traffic.
  1613. *
  1614. * @param ch Channel for the message.
  1615. * @param msg Unencryted data message.
  1616. * @param fwd Is this message fwd? This only is meaningful in loopback channels.
  1617. * #GNUNET_YES if message is FWD on the respective channel (loopback)
  1618. * #GNUNET_NO if message is BCK on the respective channel (loopback)
  1619. * #GNUNET_SYSERR if message on a one-ended channel (remote)
  1620. */
  1621. void
  1622. GCCH_handle_data (struct CadetChannel *ch,
  1623. const struct GNUNET_CADET_Data *msg,
  1624. int fwd)
  1625. {
  1626. struct CadetChannelReliability *rel;
  1627. struct CadetClient *c;
  1628. uint32_t mid;
  1629. /* If this is a remote (non-loopback) channel, find 'fwd'. */
  1630. if (GNUNET_SYSERR == fwd)
  1631. {
  1632. if (is_loopback (ch))
  1633. {
  1634. /* It is a loopback channel after all... */
  1635. GNUNET_break (0);
  1636. return;
  1637. }
  1638. fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
  1639. }
  1640. /* Initialize FWD/BCK data */
  1641. c = fwd ? ch->dest : ch->root;
  1642. rel = fwd ? ch->dest_rel : ch->root_rel;
  1643. if (NULL == c)
  1644. {
  1645. GNUNET_break (GNUNET_NO != ch->destroy);
  1646. return;
  1647. }
  1648. if (CADET_CHANNEL_READY != ch->state)
  1649. {
  1650. if (GNUNET_NO == fwd)
  1651. {
  1652. /* If we are the root, this means the other peer has sent traffic before
  1653. * receiving our ACK. Even if the SYNACK goes missing, no traffic should
  1654. * be sent before the ACK.
  1655. */
  1656. GNUNET_break_op (0);
  1657. return;
  1658. }
  1659. /* If we are the dest, this means that the SYNACK got to the root but
  1660. * the ACK went missing. Treat this as an ACK.
  1661. */
  1662. channel_confirm (ch, GNUNET_NO);
  1663. }
  1664. GNUNET_STATISTICS_update (stats, "# data received", 1, GNUNET_NO);
  1665. mid = ntohl (msg->mid);
  1666. LOG (GNUNET_ERROR_TYPE_INFO, "<=== DATA %u %s on channel %s\n",
  1667. mid, GC_f2s (fwd), GCCH_2s (ch));
  1668. if (GNUNET_NO == ch->reliable ||
  1669. ( !GC_is_pid_bigger (rel->mid_recv, mid) &&
  1670. GC_is_pid_bigger (rel->mid_recv + 64, mid) ) )
  1671. {
  1672. LOG (GNUNET_ERROR_TYPE_DEBUG, "RECV %u (%u)\n",
  1673. mid, ntohs (msg->header.size));
  1674. if (GNUNET_YES == ch->reliable)
  1675. {
  1676. /* Is this the exact next expected messasge? */
  1677. if (mid == rel->mid_recv)
  1678. {
  1679. LOG (GNUNET_ERROR_TYPE_DEBUG, "as expected, sending to client\n");
  1680. send_client_data (ch, msg, fwd);
  1681. }
  1682. else
  1683. {
  1684. LOG (GNUNET_ERROR_TYPE_DEBUG, "save for later\n");
  1685. add_buffered_data (msg, rel);
  1686. }
  1687. }
  1688. else
  1689. {
  1690. /* Tunnel is unreliable: send to clients directly */
  1691. /* FIXME: accept Out Of Order traffic */
  1692. rel->mid_recv = mid + 1;
  1693. send_client_data (ch, msg, fwd);
  1694. }
  1695. }
  1696. else
  1697. {
  1698. if (GC_is_pid_bigger (rel->mid_recv, mid))
  1699. {
  1700. GNUNET_break_op (0);
  1701. LOG (GNUNET_ERROR_TYPE_WARNING,
  1702. "MID %u on channel %s not expected (window: %u - %u). Dropping!\n",
  1703. mid, GCCH_2s (ch), rel->mid_recv, rel->mid_recv + 63);
  1704. }
  1705. else
  1706. {
  1707. LOG (GNUNET_ERROR_TYPE_WARNING,
  1708. "Duplicate MID %u, channel %s (expecting MID %u). Re-sending ACK!\n",
  1709. mid, GCCH_2s (ch), rel->mid_recv);
  1710. if (NULL != rel->uniq)
  1711. {
  1712. LOG (GNUNET_ERROR_TYPE_WARNING,
  1713. "We are trying to send an ACK, but don't seem have the "
  1714. "bandwidth. Try to increase your ats QUOTA in you config file\n");
  1715. }
  1716. }
  1717. }
  1718. GCCH_send_data_ack (ch, fwd);
  1719. }
  1720. /**
  1721. * Handler for cadet network traffic end-to-end ACKs.
  1722. *
  1723. * @param ch Channel on which we got this message.
  1724. * @param msg Data message.
  1725. * @param fwd Is this message fwd? This only is meaningful in loopback channels.
  1726. * #GNUNET_YES if message is FWD on the respective channel (loopback)
  1727. * #GNUNET_NO if message is BCK on the respective channel (loopback)
  1728. * #GNUNET_SYSERR if message on a one-ended channel (remote)
  1729. */
  1730. void
  1731. GCCH_handle_data_ack (struct CadetChannel *ch,
  1732. const struct GNUNET_CADET_DataACK *msg,
  1733. int fwd)
  1734. {
  1735. struct CadetChannelReliability *rel;
  1736. struct CadetReliableMessage *copy;
  1737. struct CadetReliableMessage *next;
  1738. uint32_t ack;
  1739. int work;
  1740. /* If this is a remote (non-loopback) channel, find 'fwd'. */
  1741. if (GNUNET_SYSERR == fwd)
  1742. {
  1743. if (is_loopback (ch))
  1744. {
  1745. /* It is a loopback channel after all... */
  1746. GNUNET_break (0);
  1747. return;
  1748. }
  1749. /* Inverted: if message came 'FWD' is a 'BCK ACK'. */
  1750. fwd = (NULL != ch->dest) ? GNUNET_NO : GNUNET_YES;
  1751. }
  1752. ack = ntohl (msg->mid);
  1753. LOG (GNUNET_ERROR_TYPE_INFO, "<=== %s ACK %u + %llX\n",
  1754. GC_f2s (fwd), ack, msg->futures);
  1755. if (GNUNET_YES == fwd)
  1756. {
  1757. rel = ch->root_rel;
  1758. }
  1759. else
  1760. {
  1761. rel = ch->dest_rel;
  1762. }
  1763. if (NULL == rel)
  1764. {
  1765. GNUNET_break_op (GNUNET_NO != ch->destroy);
  1766. return;
  1767. }
  1768. /* Free ACK'd copies: no need to retransmit those anymore FIXME refactor */
  1769. for (work = GNUNET_NO, copy = rel->head_sent; copy != NULL; copy = next)
  1770. {
  1771. if (GC_is_pid_bigger (copy->mid, ack))
  1772. {
  1773. LOG (GNUNET_ERROR_TYPE_DEBUG, " head %u, out!\n", copy->mid);
  1774. channel_rel_free_sent (rel, msg);
  1775. break;
  1776. }
  1777. work = GNUNET_YES;
  1778. LOG (GNUNET_ERROR_TYPE_DEBUG, " id %u\n", copy->mid);
  1779. next = copy->next;
  1780. if (GNUNET_YES == rel_message_free (copy, GNUNET_YES))
  1781. return;
  1782. }
  1783. /* ACK client if needed and possible */
  1784. GCCH_allow_client (ch, fwd);
  1785. /* If some message was free'd, update the retransmission delay */
  1786. if (GNUNET_YES == work)
  1787. {
  1788. if (NULL != rel->retry_task)
  1789. {
  1790. GNUNET_SCHEDULER_cancel (rel->retry_task);
  1791. rel->retry_task = NULL;
  1792. if (NULL != rel->head_sent && NULL == rel->head_sent->chq)
  1793. {
  1794. struct GNUNET_TIME_Absolute new_target;
  1795. struct GNUNET_TIME_Relative delay;
  1796. delay = GNUNET_TIME_relative_multiply (rel->retry_timer,
  1797. CADET_RETRANSMIT_MARGIN);
  1798. new_target = GNUNET_TIME_absolute_add (rel->head_sent->timestamp,
  1799. delay);
  1800. delay = GNUNET_TIME_absolute_get_remaining (new_target);
  1801. rel->retry_task =
  1802. GNUNET_SCHEDULER_add_delayed (delay,
  1803. &channel_retransmit_message,
  1804. rel);
  1805. }
  1806. }
  1807. else
  1808. {
  1809. /* Work was done but no task was pending? Shouldn't happen! */
  1810. GNUNET_break (0);
  1811. }
  1812. }
  1813. }
  1814. /**
  1815. * Handler for channel create messages.
  1816. *
  1817. * Does not have fwd parameter because it's always 'FWD': channel is incoming.
  1818. *
  1819. * @param t Tunnel this channel will be in.
  1820. * @param msg Channel crate message.
  1821. */
  1822. struct CadetChannel *
  1823. GCCH_handle_create (struct CadetTunnel *t,
  1824. const struct GNUNET_CADET_ChannelCreate *msg)
  1825. {
  1826. CADET_ChannelNumber chid;
  1827. struct CadetChannel *ch;
  1828. struct CadetClient *c;
  1829. int new_channel;
  1830. chid = ntohl (msg->chid);
  1831. ch = GCT_get_channel (t, chid);
  1832. if (NULL == ch)
  1833. {
  1834. /* Create channel */
  1835. ch = channel_new (t, NULL, 0);
  1836. ch->gid = chid;
  1837. channel_set_options (ch, ntohl (msg->opt));
  1838. new_channel = GNUNET_YES;
  1839. }
  1840. else
  1841. {
  1842. new_channel = GNUNET_NO;
  1843. }
  1844. if (GNUNET_YES == new_channel || GCT_is_loopback (t))
  1845. {
  1846. /* Find a destination client */
  1847. ch->port = ntohl (msg->port);
  1848. LOG (GNUNET_ERROR_TYPE_DEBUG, " port %u\n", ch->port);
  1849. c = GML_client_get_by_port (ch->port);
  1850. if (NULL == c)
  1851. {
  1852. LOG (GNUNET_ERROR_TYPE_DEBUG, " no client has port registered\n");
  1853. if (is_loopback (ch))
  1854. {
  1855. LOG (GNUNET_ERROR_TYPE_DEBUG, " loopback: destroy on handler\n");
  1856. send_nack (ch);
  1857. }
  1858. else
  1859. {
  1860. LOG (GNUNET_ERROR_TYPE_DEBUG, " not loopback: destroy now\n");
  1861. send_nack (ch);
  1862. GCCH_destroy (ch);
  1863. }
  1864. return NULL;
  1865. }
  1866. else
  1867. {
  1868. LOG (GNUNET_ERROR_TYPE_DEBUG, " client %p has port registered\n", c);
  1869. }
  1870. add_destination (ch, c);
  1871. if (GNUNET_YES == ch->reliable)
  1872. LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Reliable\n");
  1873. else
  1874. LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Not Reliable\n");
  1875. send_client_create (ch);
  1876. ch->state = CADET_CHANNEL_SENT;
  1877. }
  1878. else
  1879. {
  1880. LOG (GNUNET_ERROR_TYPE_DEBUG, " duplicate create channel\n");
  1881. if (NULL != ch->dest_rel->retry_task)
  1882. {
  1883. LOG (GNUNET_ERROR_TYPE_DEBUG, " clearing retry task\n");
  1884. /* we were waiting to re-send our 'SYNACK', wait no more! */
  1885. GNUNET_SCHEDULER_cancel (ch->dest_rel->retry_task);
  1886. ch->dest_rel->retry_task = NULL;
  1887. }
  1888. else if (NULL != ch->dest_rel->uniq)
  1889. {
  1890. /* we are waiting to for our 'SYNACK' to leave the queue, all done! */
  1891. return ch;
  1892. }
  1893. }
  1894. send_ack (ch, GNUNET_YES);
  1895. return ch;
  1896. }
  1897. /**
  1898. * Handler for channel NACK messages.
  1899. *
  1900. * NACK messages always go dest -> root, no need for 'fwd' or 'msg' parameter.
  1901. *
  1902. * @param ch Channel.
  1903. */
  1904. void
  1905. GCCH_handle_nack (struct CadetChannel *ch)
  1906. {
  1907. send_client_nack (ch);
  1908. GCCH_destroy (ch);
  1909. }
  1910. /**
  1911. * Handler for channel ack messages.
  1912. *
  1913. * @param ch Channel.
  1914. * @param msg Message.
  1915. * @param fwd Is this message fwd? This only is meaningful in loopback channels.
  1916. * #GNUNET_YES if message is FWD on the respective channel (loopback)
  1917. * #GNUNET_NO if message is BCK on the respective channel (loopback)
  1918. * #GNUNET_SYSERR if message on a one-ended channel (remote)
  1919. */
  1920. void
  1921. GCCH_handle_ack (struct CadetChannel *ch,
  1922. const struct GNUNET_CADET_ChannelManage *msg,
  1923. int fwd)
  1924. {
  1925. /* If this is a remote (non-loopback) channel, find 'fwd'. */
  1926. if (GNUNET_SYSERR == fwd)
  1927. {
  1928. if (is_loopback (ch))
  1929. {
  1930. /* It is a loopback channel after all... */
  1931. GNUNET_break (0);
  1932. return;
  1933. }
  1934. fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
  1935. }
  1936. channel_confirm (ch, !fwd);
  1937. }
  1938. /**
  1939. * Handler for channel destroy messages.
  1940. *
  1941. * @param ch Channel to be destroyed of.
  1942. * @param msg Message.
  1943. * @param fwd Is this message fwd? This only is meaningful in loopback channels.
  1944. * #GNUNET_YES if message is FWD on the respective channel (loopback)
  1945. * #GNUNET_NO if message is BCK on the respective channel (loopback)
  1946. * #GNUNET_SYSERR if message on a one-ended channel (remote)
  1947. */
  1948. void
  1949. GCCH_handle_destroy (struct CadetChannel *ch,
  1950. const struct GNUNET_CADET_ChannelManage *msg,
  1951. int fwd)
  1952. {
  1953. struct CadetChannelReliability *rel;
  1954. /* If this is a remote (non-loopback) channel, find 'fwd'. */
  1955. if (GNUNET_SYSERR == fwd)
  1956. {
  1957. if (is_loopback (ch))
  1958. {
  1959. /* It is a loopback channel after all... */
  1960. GNUNET_break (0);
  1961. return;
  1962. }
  1963. fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
  1964. }
  1965. GCCH_debug (ch);
  1966. if ( (fwd && NULL == ch->dest) || (!fwd && NULL == ch->root) )
  1967. {
  1968. /* Not for us (don't destroy twice a half-open loopback channel) */
  1969. return;
  1970. }
  1971. rel = fwd ? ch->dest_rel : ch->root_rel;
  1972. if (0 == rel->n_recv)
  1973. {
  1974. send_destroy (ch, GNUNET_YES);
  1975. GCCH_destroy (ch);
  1976. }
  1977. else
  1978. {
  1979. ch->destroy = GNUNET_YES;
  1980. }
  1981. }
  1982. /**
  1983. * Sends an already built message on a channel.
  1984. *
  1985. * If the channel is on a loopback tunnel, notifies the appropriate destination
  1986. * client locally.
  1987. *
  1988. * On a normal channel passes the message to the tunnel for encryption and
  1989. * sending on a connection.
  1990. *
  1991. * This function DOES NOT save the message for retransmission.
  1992. *
  1993. * @param message Message to send. Function makes a copy of it.
  1994. * @param ch Channel on which this message is transmitted.
  1995. * @param fwd Is this a fwd message?
  1996. * @param existing_copy This is a retransmission, don't save a copy.
  1997. */
  1998. void
  1999. GCCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
  2000. struct CadetChannel *ch, int fwd,
  2001. void *existing_copy)
  2002. {
  2003. struct CadetChannelQueue *chq;
  2004. uint16_t type;
  2005. type = ntohs (message->type);
  2006. LOG (GNUNET_ERROR_TYPE_INFO, "===> %s %s on channel %s\n",
  2007. GC_m2s (type), GC_f2s (fwd), GCCH_2s (ch));
  2008. if (GCT_is_loopback (ch->t))
  2009. {
  2010. handle_loopback (ch, message, fwd);
  2011. return;
  2012. }
  2013. switch (type)
  2014. {
  2015. struct GNUNET_CADET_Data *payload;
  2016. case GNUNET_MESSAGE_TYPE_CADET_DATA:
  2017. payload = (struct GNUNET_CADET_Data *) message;
  2018. LOG (GNUNET_ERROR_TYPE_INFO, "===> %s %u\n",
  2019. GC_m2s (type), ntohl (payload->mid));
  2020. if (GNUNET_YES == ch->reliable)
  2021. {
  2022. chq = GNUNET_new (struct CadetChannelQueue);
  2023. chq->type = type;
  2024. if (NULL == existing_copy)
  2025. chq->copy = channel_save_copy (ch, message, fwd);
  2026. else
  2027. {
  2028. chq->copy = (struct CadetReliableMessage *) existing_copy;
  2029. if (NULL != chq->copy->chq)
  2030. {
  2031. /* Last retransmission was queued but not yet sent!
  2032. * This retransmission was scheduled by a ch_message_sent which
  2033. * followed a very fast RTT, so the tiny delay made the
  2034. * retransmission function to execute before the previous
  2035. * retransmitted message even had a chance to leave the peer.
  2036. * Cancel this message and wait until the pending
  2037. * retransmission leaves the peer and ch_message_sent starts
  2038. * the timer for the next one.
  2039. */
  2040. GNUNET_free (chq);
  2041. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2042. " exisitng copy not yet transmitted!\n");
  2043. return;
  2044. }
  2045. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2046. " using existing copy: %p {r:%p q:%p t:%u}\n",
  2047. existing_copy,
  2048. chq->copy->rel, chq->copy->chq, chq->copy->type);
  2049. }
  2050. LOG (GNUNET_ERROR_TYPE_DEBUG, " new chq: %p\n", chq);
  2051. chq->copy->chq = chq;
  2052. chq->tq = GCT_send_prebuilt_message (message, ch->t, NULL,
  2053. GNUNET_YES,
  2054. &ch_message_sent, chq);
  2055. /* q itself is stored in copy */
  2056. GNUNET_assert (NULL != chq->tq || GNUNET_NO != ch->destroy);
  2057. }
  2058. else
  2059. {
  2060. fire_and_forget (message, ch, GNUNET_NO);
  2061. }
  2062. break;
  2063. case GNUNET_MESSAGE_TYPE_CADET_DATA_ACK:
  2064. case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
  2065. case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK:
  2066. chq = GNUNET_new (struct CadetChannelQueue);
  2067. chq->type = type;
  2068. chq->rel = fwd ? ch->root_rel : ch->dest_rel;
  2069. if (NULL != chq->rel->uniq)
  2070. {
  2071. if (NULL != chq->rel->uniq->tq)
  2072. {
  2073. GCT_cancel (chq->rel->uniq->tq);
  2074. /* ch_message_sent is called, freeing and NULLing uniq */
  2075. GNUNET_break (NULL == chq->rel->uniq);
  2076. }
  2077. else
  2078. {
  2079. GNUNET_break (0);
  2080. GNUNET_free (chq->rel->uniq);
  2081. }
  2082. }
  2083. chq->tq = GCT_send_prebuilt_message (message, ch->t, NULL, GNUNET_YES,
  2084. &ch_message_sent, chq);
  2085. if (NULL == chq->tq)
  2086. {
  2087. GNUNET_break (0);
  2088. GCT_debug (ch->t, GNUNET_ERROR_TYPE_ERROR);
  2089. GNUNET_free (chq);
  2090. chq = NULL;
  2091. return;
  2092. }
  2093. chq->rel->uniq = chq;
  2094. break;
  2095. case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
  2096. case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK:
  2097. fire_and_forget (message, ch, GNUNET_YES);
  2098. break;
  2099. default:
  2100. GNUNET_break (0);
  2101. LOG (GNUNET_ERROR_TYPE_DEBUG, "type %s unknown!\n", GC_m2s (type));
  2102. fire_and_forget (message, ch, GNUNET_YES);
  2103. }
  2104. }
  2105. /**
  2106. * Get the static string for identification of the channel.
  2107. *
  2108. * @param ch Channel.
  2109. *
  2110. * @return Static string with the channel IDs.
  2111. */
  2112. const char *
  2113. GCCH_2s (const struct CadetChannel *ch)
  2114. {
  2115. static char buf[64];
  2116. if (NULL == ch)
  2117. return "(NULL Channel)";
  2118. SPRINTF (buf, "%s:%u gid:%X (%X / %X)",
  2119. GCT_2s (ch->t), ch->port, ch->gid, ch->lid_root, ch->lid_dest);
  2120. return buf;
  2121. }