gnunet-service-setu.c 102 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681
  1. /*
  2. This file is part of GNUnet
  3. Copyright (C) 2013-2017, 2020 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file setu/gnunet-service-setu.c
  18. * @brief set union operation
  19. * @author Florian Dold
  20. * @author Christian Grothoff
  21. */
  22. #include "platform.h"
  23. #include "gnunet_util_lib.h"
  24. #include "gnunet_statistics_service.h"
  25. #include "ibf.h"
  26. #include "gnunet_protocols.h"
  27. #include "gnunet_applications.h"
  28. #include "gnunet_cadet_service.h"
  29. #include "gnunet-service-setu_strata_estimator.h"
  30. #include "gnunet-service-setu_protocol.h"
  31. #include "gnunet_statistics_service.h"
  32. #include <gcrypt.h>
  33. #include "gnunet_setu_service.h"
  34. #include "setu.h"
  35. #define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
  36. /**
  37. * How long do we hold on to an incoming channel if there is
  38. * no local listener before giving up?
  39. */
  40. #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
  41. /**
  42. * Number of IBFs in a strata estimator.
  43. */
  44. #define SE_STRATA_COUNT 32
  45. /**
  46. * Size of the IBFs in the strata estimator.
  47. */
  48. #define SE_IBF_SIZE 80
  49. /**
  50. * The hash num parameter for the difference digests and strata estimators.
  51. */
  52. #define SE_IBF_HASH_NUM 4
  53. /**
  54. * Number of buckets that can be transmitted in one message.
  55. */
  56. #define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
  57. /**
  58. * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
  59. * Choose this value so that computing the IBF is still cheaper
  60. * than transmitting all values.
  61. */
  62. #define MAX_IBF_ORDER (20)
  63. /**
  64. * Number of buckets used in the ibf per estimated
  65. * difference.
  66. */
  67. #define IBF_ALPHA 4
  68. /**
  69. * Current phase we are in for a union operation.
  70. */
  71. enum UnionOperationPhase
  72. {
  73. /**
  74. * We sent the request message, and expect a strata estimator.
  75. */
  76. PHASE_EXPECT_SE,
  77. /**
  78. * We sent the strata estimator, and expect an IBF. This phase is entered once
  79. * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
  80. *
  81. * XXX: could use better wording.
  82. * XXX: repurposed to also expect a "request full set" message, should be renamed
  83. *
  84. * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
  85. */
  86. PHASE_EXPECT_IBF,
  87. /**
  88. * Continuation for multi part IBFs.
  89. */
  90. PHASE_EXPECT_IBF_CONT,
  91. /**
  92. * We are decoding an IBF.
  93. */
  94. PHASE_INVENTORY_ACTIVE,
  95. /**
  96. * The other peer is decoding the IBF we just sent.
  97. */
  98. PHASE_INVENTORY_PASSIVE,
  99. /**
  100. * The protocol is almost finished, but we still have to flush our message
  101. * queue and/or expect some elements.
  102. */
  103. PHASE_FINISH_CLOSING,
  104. /**
  105. * In the penultimate phase, we wait until all our demands are satisfied.
  106. * Then we send a done message, and wait for another done message.
  107. */
  108. PHASE_FINISH_WAITING,
  109. /**
  110. * In the ultimate phase, we wait until our demands are satisfied and then
  111. * quit (sending another DONE message).
  112. */
  113. PHASE_DONE,
  114. /**
  115. * After sending the full set, wait for responses with the elements
  116. * that the local peer is missing.
  117. */
  118. PHASE_FULL_SENDING,
  119. };
  120. /**
  121. * Information about an element element in the set. All elements are
  122. * stored in a hash-table from their hash-code to their `struct
  123. * Element`, so that the remove and add operations are reasonably
  124. * fast.
  125. */
  126. struct ElementEntry
  127. {
  128. /**
  129. * The actual element. The data for the element
  130. * should be allocated at the end of this struct.
  131. */
  132. struct GNUNET_SETU_Element element;
  133. /**
  134. * Hash of the element. For set union: Will be used to derive the
  135. * different IBF keys for different salts.
  136. */
  137. struct GNUNET_HashCode element_hash;
  138. /**
  139. * First generation that includes this element.
  140. */
  141. unsigned int generation;
  142. /**
  143. * #GNUNET_YES if the element is a remote element, and does not belong
  144. * to the operation's set.
  145. */
  146. int remote;
  147. };
  148. /**
  149. * A listener is inhabited by a client, and waits for evaluation
  150. * requests from remote peers.
  151. */
  152. struct Listener;
  153. /**
  154. * A set that supports a specific operation with other peers.
  155. */
  156. struct Set;
  157. /**
  158. * State we keep per client.
  159. */
  160. struct ClientState
  161. {
  162. /**
  163. * Set, if associated with the client, otherwise NULL.
  164. */
  165. struct Set *set;
  166. /**
  167. * Listener, if associated with the client, otherwise NULL.
  168. */
  169. struct Listener *listener;
  170. /**
  171. * Client handle.
  172. */
  173. struct GNUNET_SERVICE_Client *client;
  174. /**
  175. * Message queue.
  176. */
  177. struct GNUNET_MQ_Handle *mq;
  178. };
  179. /**
  180. * Operation context used to execute a set operation.
  181. */
  182. struct Operation
  183. {
  184. /**
  185. * The identity of the requesting peer. Needs to
  186. * be stored here as the op spec might not have been created yet.
  187. */
  188. struct GNUNET_PeerIdentity peer;
  189. /**
  190. * Initial size of our set, just before the operation started.
  191. */
  192. uint64_t initial_size;
  193. /**
  194. * Kept in a DLL of the listener, if @e listener is non-NULL.
  195. */
  196. struct Operation *next;
  197. /**
  198. * Kept in a DLL of the listener, if @e listener is non-NULL.
  199. */
  200. struct Operation *prev;
  201. /**
  202. * Channel to the peer.
  203. */
  204. struct GNUNET_CADET_Channel *channel;
  205. /**
  206. * Port this operation runs on.
  207. */
  208. struct Listener *listener;
  209. /**
  210. * Message queue for the channel.
  211. */
  212. struct GNUNET_MQ_Handle *mq;
  213. /**
  214. * Context message, may be NULL.
  215. */
  216. struct GNUNET_MessageHeader *context_msg;
  217. /**
  218. * Set associated with the operation, NULL until the spec has been
  219. * associated with a set.
  220. */
  221. struct Set *set;
  222. /**
  223. * Copy of the set's strata estimator at the time of
  224. * creation of this operation.
  225. */
  226. struct StrataEstimator *se;
  227. /**
  228. * The IBF we currently receive.
  229. */
  230. struct InvertibleBloomFilter *remote_ibf;
  231. /**
  232. * The IBF with the local set's element.
  233. */
  234. struct InvertibleBloomFilter *local_ibf;
  235. /**
  236. * Maps unsalted IBF-Keys to elements.
  237. * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
  238. * Colliding IBF-Keys are linked.
  239. */
  240. struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
  241. /**
  242. * Timeout task, if the incoming peer has not been accepted
  243. * after the timeout, it will be disconnected.
  244. */
  245. struct GNUNET_SCHEDULER_Task *timeout_task;
  246. /**
  247. * Hashes for elements that we have demanded from the other peer.
  248. */
  249. struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
  250. /**
  251. * Current state of the operation.
  252. */
  253. enum UnionOperationPhase phase;
  254. /**
  255. * Did we send the client that we are done?
  256. */
  257. int client_done_sent;
  258. /**
  259. * Number of ibf buckets already received into the @a remote_ibf.
  260. */
  261. unsigned int ibf_buckets_received;
  262. /**
  263. * Salt that we're using for sending IBFs
  264. */
  265. uint32_t salt_send;
  266. /**
  267. * Salt for the IBF we've received and that we're currently decoding.
  268. */
  269. uint32_t salt_receive;
  270. /**
  271. * Number of elements we received from the other peer
  272. * that were not in the local set yet.
  273. */
  274. uint32_t received_fresh;
  275. /**
  276. * Total number of elements received from the other peer.
  277. */
  278. uint32_t received_total;
  279. /**
  280. * Salt to use for the operation.
  281. */
  282. uint32_t salt;
  283. /**
  284. * Remote peers element count
  285. */
  286. uint32_t remote_element_count;
  287. /**
  288. * ID used to identify an operation between service and client
  289. */
  290. uint32_t client_request_id;
  291. /**
  292. * Always use delta operation instead of sending full sets,
  293. * even it it's less efficient.
  294. */
  295. int force_delta;
  296. /**
  297. * Always send full sets, even if delta operations would
  298. * be more efficient.
  299. */
  300. int force_full;
  301. /**
  302. * #GNUNET_YES to fail operations where Byzantine faults
  303. * are suspected
  304. */
  305. int byzantine;
  306. /**
  307. * #GNUNET_YES to also send back set elements we are sending to
  308. * the remote peer.
  309. */
  310. int symmetric;
  311. /**
  312. * Lower bound for the set size, used only when
  313. * byzantine mode is enabled.
  314. */
  315. int byzantine_lower_bound;
  316. /**
  317. * Unique request id for the request from a remote peer, sent to the
  318. * client, which will accept or reject the request. Set to '0' iff
  319. * the request has not been suggested yet.
  320. */
  321. uint32_t suggest_id;
  322. /**
  323. * Generation in which the operation handle
  324. * was created.
  325. */
  326. unsigned int generation_created;
  327. };
  328. /**
  329. * SetContent stores the actual set elements, which may be shared by
  330. * multiple generations derived from one set.
  331. */
  332. struct SetContent
  333. {
  334. /**
  335. * Maps `struct GNUNET_HashCode *` to `struct ElementEntry *`.
  336. */
  337. struct GNUNET_CONTAINER_MultiHashMap *elements;
  338. /**
  339. * Number of references to the content.
  340. */
  341. unsigned int refcount;
  342. /**
  343. * FIXME: document!
  344. */
  345. unsigned int latest_generation;
  346. /**
  347. * Number of concurrently active iterators.
  348. */
  349. int iterator_count;
  350. };
  351. /**
  352. * A set that supports a specific operation with other peers.
  353. */
  354. struct Set
  355. {
  356. /**
  357. * Sets are held in a doubly linked list (in `sets_head` and `sets_tail`).
  358. */
  359. struct Set *next;
  360. /**
  361. * Sets are held in a doubly linked list.
  362. */
  363. struct Set *prev;
  364. /**
  365. * Client that owns the set. Only one client may own a set,
  366. * and there can only be one set per client.
  367. */
  368. struct ClientState *cs;
  369. /**
  370. * Content, possibly shared by multiple sets,
  371. * and thus reference counted.
  372. */
  373. struct SetContent *content;
  374. /**
  375. * The strata estimator is only generated once for each set. The IBF keys
  376. * are derived from the element hashes with salt=0.
  377. */
  378. struct StrataEstimator *se;
  379. /**
  380. * Evaluate operations are held in a linked list.
  381. */
  382. struct Operation *ops_head;
  383. /**
  384. * Evaluate operations are held in a linked list.
  385. */
  386. struct Operation *ops_tail;
  387. /**
  388. * Current generation, that is, number of previously executed
  389. * operations and lazy copies on the underlying set content.
  390. */
  391. unsigned int current_generation;
  392. };
  393. /**
  394. * The key entry is used to associate an ibf key with an element.
  395. */
  396. struct KeyEntry
  397. {
  398. /**
  399. * IBF key for the entry, derived from the current salt.
  400. */
  401. struct IBF_Key ibf_key;
  402. /**
  403. * The actual element associated with the key.
  404. *
  405. * Only owned by the union operation if element->operation
  406. * is #GNUNET_YES.
  407. */
  408. struct ElementEntry *element;
  409. /**
  410. * Did we receive this element? Even if element->is_foreign is false, we
  411. * might have received the element, so this indicates that the other peer
  412. * has it.
  413. */
  414. int received;
  415. };
  416. /**
  417. * Used as a closure for sending elements
  418. * with a specific IBF key.
  419. */
  420. struct SendElementClosure
  421. {
  422. /**
  423. * The IBF key whose matching elements should be
  424. * sent.
  425. */
  426. struct IBF_Key ibf_key;
  427. /**
  428. * Operation for which the elements
  429. * should be sent.
  430. */
  431. struct Operation *op;
  432. };
  433. /**
  434. * A listener is inhabited by a client, and waits for evaluation
  435. * requests from remote peers.
  436. */
  437. struct Listener
  438. {
  439. /**
  440. * Listeners are held in a doubly linked list.
  441. */
  442. struct Listener *next;
  443. /**
  444. * Listeners are held in a doubly linked list.
  445. */
  446. struct Listener *prev;
  447. /**
  448. * Head of DLL of operations this listener is responsible for.
  449. * Once the client has accepted/declined the operation, the
  450. * operation is moved to the respective set's operation DLLS.
  451. */
  452. struct Operation *op_head;
  453. /**
  454. * Tail of DLL of operations this listener is responsible for.
  455. * Once the client has accepted/declined the operation, the
  456. * operation is moved to the respective set's operation DLLS.
  457. */
  458. struct Operation *op_tail;
  459. /**
  460. * Client that owns the listener.
  461. * Only one client may own a listener.
  462. */
  463. struct ClientState *cs;
  464. /**
  465. * The port we are listening on with CADET.
  466. */
  467. struct GNUNET_CADET_Port *open_port;
  468. /**
  469. * Application ID for the operation, used to distinguish
  470. * multiple operations of the same type with the same peer.
  471. */
  472. struct GNUNET_HashCode app_id;
  473. };
  474. /**
  475. * Handle to the cadet service, used to listen for and connect to
  476. * remote peers.
  477. */
  478. static struct GNUNET_CADET_Handle *cadet;
  479. /**
  480. * Statistics handle.
  481. */
  482. static struct GNUNET_STATISTICS_Handle *_GSS_statistics;
  483. /**
  484. * Listeners are held in a doubly linked list.
  485. */
  486. static struct Listener *listener_head;
  487. /**
  488. * Listeners are held in a doubly linked list.
  489. */
  490. static struct Listener *listener_tail;
  491. /**
  492. * Number of active clients.
  493. */
  494. static unsigned int num_clients;
  495. /**
  496. * Are we in shutdown? if #GNUNET_YES and the number of clients
  497. * drops to zero, disconnect from CADET.
  498. */
  499. static int in_shutdown;
  500. /**
  501. * Counter for allocating unique IDs for clients, used to identify incoming
  502. * operation requests from remote peers, that the client can choose to accept
  503. * or refuse. 0 must not be used (reserved for uninitialized).
  504. */
  505. static uint32_t suggest_id;
  506. /**
  507. * Iterator over hash map entries, called to
  508. * destroy the linked list of colliding ibf key entries.
  509. *
  510. * @param cls closure
  511. * @param key current key code
  512. * @param value value in the hash map
  513. * @return #GNUNET_YES if we should continue to iterate,
  514. * #GNUNET_NO if not.
  515. */
  516. static int
  517. destroy_key_to_element_iter (void *cls,
  518. uint32_t key,
  519. void *value)
  520. {
  521. struct KeyEntry *k = value;
  522. GNUNET_assert (NULL != k);
  523. if (GNUNET_YES == k->element->remote)
  524. {
  525. GNUNET_free (k->element);
  526. k->element = NULL;
  527. }
  528. GNUNET_free (k);
  529. return GNUNET_YES;
  530. }
  531. /**
  532. * Signal to the client that the operation has finished and
  533. * destroy the operation.
  534. *
  535. * @param cls operation to destroy
  536. */
  537. static void
  538. send_client_done (void *cls)
  539. {
  540. struct Operation *op = cls;
  541. struct GNUNET_MQ_Envelope *ev;
  542. struct GNUNET_SETU_ResultMessage *rm;
  543. if (GNUNET_YES == op->client_done_sent)
  544. return;
  545. if (PHASE_DONE != op->phase)
  546. {
  547. LOG (GNUNET_ERROR_TYPE_WARNING,
  548. "Union operation failed\n");
  549. GNUNET_STATISTICS_update (_GSS_statistics,
  550. "# Union operations failed",
  551. 1,
  552. GNUNET_NO);
  553. ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SETU_RESULT);
  554. rm->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
  555. rm->request_id = htonl (op->client_request_id);
  556. rm->element_type = htons (0);
  557. GNUNET_MQ_send (op->set->cs->mq,
  558. ev);
  559. return;
  560. }
  561. op->client_done_sent = GNUNET_YES;
  562. GNUNET_STATISTICS_update (_GSS_statistics,
  563. "# Union operations succeeded",
  564. 1,
  565. GNUNET_NO);
  566. LOG (GNUNET_ERROR_TYPE_INFO,
  567. "Signalling client that union operation is done\n");
  568. ev = GNUNET_MQ_msg (rm,
  569. GNUNET_MESSAGE_TYPE_SETU_RESULT);
  570. rm->request_id = htonl (op->client_request_id);
  571. rm->result_status = htons (GNUNET_SETU_STATUS_DONE);
  572. rm->element_type = htons (0);
  573. rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
  574. op->key_to_element));
  575. GNUNET_MQ_send (op->set->cs->mq,
  576. ev);
  577. }
  578. /* FIXME: the destroy logic is a mess and should be cleaned up! */
  579. /**
  580. * Destroy the given operation. Used for any operation where both
  581. * peers were known and that thus actually had a vt and channel. Must
  582. * not be used for operations where 'listener' is still set and we do
  583. * not know the other peer.
  584. *
  585. * Call the implementation-specific cancel function of the operation.
  586. * Disconnects from the remote peer. Does not disconnect the client,
  587. * as there may be multiple operations per set.
  588. *
  589. * @param op operation to destroy
  590. */
  591. static void
  592. _GSS_operation_destroy (struct Operation *op)
  593. {
  594. struct Set *set = op->set;
  595. struct GNUNET_CADET_Channel *channel;
  596. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  597. "Destroying union operation %p\n",
  598. op);
  599. GNUNET_assert (NULL == op->listener);
  600. /* check if the op was canceled twice */
  601. if (NULL != op->remote_ibf)
  602. {
  603. ibf_destroy (op->remote_ibf);
  604. op->remote_ibf = NULL;
  605. }
  606. if (NULL != op->demanded_hashes)
  607. {
  608. GNUNET_CONTAINER_multihashmap_destroy (op->demanded_hashes);
  609. op->demanded_hashes = NULL;
  610. }
  611. if (NULL != op->local_ibf)
  612. {
  613. ibf_destroy (op->local_ibf);
  614. op->local_ibf = NULL;
  615. }
  616. if (NULL != op->se)
  617. {
  618. strata_estimator_destroy (op->se);
  619. op->se = NULL;
  620. }
  621. if (NULL != op->key_to_element)
  622. {
  623. GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
  624. &destroy_key_to_element_iter,
  625. NULL);
  626. GNUNET_CONTAINER_multihashmap32_destroy (op->key_to_element);
  627. op->key_to_element = NULL;
  628. }
  629. if (NULL != set)
  630. {
  631. GNUNET_CONTAINER_DLL_remove (set->ops_head,
  632. set->ops_tail,
  633. op);
  634. op->set = NULL;
  635. }
  636. if (NULL != op->context_msg)
  637. {
  638. GNUNET_free (op->context_msg);
  639. op->context_msg = NULL;
  640. }
  641. if (NULL != (channel = op->channel))
  642. {
  643. /* This will free op; called conditionally as this helper function
  644. is also called from within the channel disconnect handler. */
  645. op->channel = NULL;
  646. GNUNET_CADET_channel_destroy (channel);
  647. }
  648. /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
  649. * there was a channel end handler that will free 'op' on the call stack. */
  650. }
  651. /**
  652. * This function probably should not exist
  653. * and be replaced by inlining more specific
  654. * logic in the various places where it is called.
  655. */
  656. static void
  657. _GSS_operation_destroy2 (struct Operation *op);
  658. /**
  659. * Destroy an incoming request from a remote peer
  660. *
  661. * @param op remote request to destroy
  662. */
  663. static void
  664. incoming_destroy (struct Operation *op)
  665. {
  666. struct Listener *listener;
  667. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  668. "Destroying incoming operation %p\n",
  669. op);
  670. if (NULL != (listener = op->listener))
  671. {
  672. GNUNET_CONTAINER_DLL_remove (listener->op_head,
  673. listener->op_tail,
  674. op);
  675. op->listener = NULL;
  676. }
  677. if (NULL != op->timeout_task)
  678. {
  679. GNUNET_SCHEDULER_cancel (op->timeout_task);
  680. op->timeout_task = NULL;
  681. }
  682. _GSS_operation_destroy2 (op);
  683. }
  684. /**
  685. * This function probably should not exist
  686. * and be replaced by inlining more specific
  687. * logic in the various places where it is called.
  688. */
  689. static void
  690. _GSS_operation_destroy2 (struct Operation *op)
  691. {
  692. struct GNUNET_CADET_Channel *channel;
  693. if (NULL != (channel = op->channel))
  694. {
  695. /* This will free op; called conditionally as this helper function
  696. is also called from within the channel disconnect handler. */
  697. op->channel = NULL;
  698. GNUNET_CADET_channel_destroy (channel);
  699. }
  700. if (NULL != op->listener)
  701. {
  702. incoming_destroy (op);
  703. return;
  704. }
  705. if (NULL != op->set)
  706. send_client_done (op);
  707. _GSS_operation_destroy (op);
  708. GNUNET_free (op);
  709. }
  710. /**
  711. * Inform the client that the union operation has failed,
  712. * and proceed to destroy the evaluate operation.
  713. *
  714. * @param op the union operation to fail
  715. */
  716. static void
  717. fail_union_operation (struct Operation *op)
  718. {
  719. struct GNUNET_MQ_Envelope *ev;
  720. struct GNUNET_SETU_ResultMessage *msg;
  721. LOG (GNUNET_ERROR_TYPE_WARNING,
  722. "union operation failed\n");
  723. ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SETU_RESULT);
  724. msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
  725. msg->request_id = htonl (op->client_request_id);
  726. msg->element_type = htons (0);
  727. GNUNET_MQ_send (op->set->cs->mq,
  728. ev);
  729. _GSS_operation_destroy (op);
  730. }
  731. /**
  732. * Derive the IBF key from a hash code and
  733. * a salt.
  734. *
  735. * @param src the hash code
  736. * @return the derived IBF key
  737. */
  738. static struct IBF_Key
  739. get_ibf_key (const struct GNUNET_HashCode *src)
  740. {
  741. struct IBF_Key key;
  742. uint16_t salt = 0;
  743. GNUNET_assert (GNUNET_OK ==
  744. GNUNET_CRYPTO_kdf (&key, sizeof(key),
  745. src, sizeof *src,
  746. &salt, sizeof(salt),
  747. NULL, 0));
  748. return key;
  749. }
  750. /**
  751. * Context for #op_get_element_iterator
  752. */
  753. struct GetElementContext
  754. {
  755. /**
  756. * FIXME.
  757. */
  758. struct GNUNET_HashCode hash;
  759. /**
  760. * FIXME.
  761. */
  762. struct KeyEntry *k;
  763. };
  764. /**
  765. * Iterator over the mapping from IBF keys to element entries. Checks if we
  766. * have an element with a given GNUNET_HashCode.
  767. *
  768. * @param cls closure
  769. * @param key current key code
  770. * @param value value in the hash map
  771. * @return #GNUNET_YES if we should search further,
  772. * #GNUNET_NO if we've found the element.
  773. */
  774. static int
  775. op_get_element_iterator (void *cls,
  776. uint32_t key,
  777. void *value)
  778. {
  779. struct GetElementContext *ctx = cls;
  780. struct KeyEntry *k = value;
  781. GNUNET_assert (NULL != k);
  782. if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
  783. &ctx->hash))
  784. {
  785. ctx->k = k;
  786. return GNUNET_NO;
  787. }
  788. return GNUNET_YES;
  789. }
  790. /**
  791. * Determine whether the given element is already in the operation's element
  792. * set.
  793. *
  794. * @param op operation that should be tested for 'element_hash'
  795. * @param element_hash hash of the element to look for
  796. * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
  797. */
  798. static struct KeyEntry *
  799. op_get_element (struct Operation *op,
  800. const struct GNUNET_HashCode *element_hash)
  801. {
  802. int ret;
  803. struct IBF_Key ibf_key;
  804. struct GetElementContext ctx = { { { 0 } }, 0 };
  805. ctx.hash = *element_hash;
  806. ibf_key = get_ibf_key (element_hash);
  807. ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->key_to_element,
  808. (uint32_t) ibf_key.key_val,
  809. &op_get_element_iterator,
  810. &ctx);
  811. /* was the iteration aborted because we found the element? */
  812. if (GNUNET_SYSERR == ret)
  813. {
  814. GNUNET_assert (NULL != ctx.k);
  815. return ctx.k;
  816. }
  817. return NULL;
  818. }
  819. /**
  820. * Insert an element into the union operation's
  821. * key-to-element mapping. Takes ownership of 'ee'.
  822. * Note that this does not insert the element in the set,
  823. * only in the operation's key-element mapping.
  824. * This is done to speed up re-tried operations, if some elements
  825. * were transmitted, and then the IBF fails to decode.
  826. *
  827. * XXX: clarify ownership, doesn't sound right.
  828. *
  829. * @param op the union operation
  830. * @param ee the element entry
  831. * @parem received was this element received from the remote peer?
  832. */
  833. static void
  834. op_register_element (struct Operation *op,
  835. struct ElementEntry *ee,
  836. int received)
  837. {
  838. struct IBF_Key ibf_key;
  839. struct KeyEntry *k;
  840. ibf_key = get_ibf_key (&ee->element_hash);
  841. k = GNUNET_new (struct KeyEntry);
  842. k->element = ee;
  843. k->ibf_key = ibf_key;
  844. k->received = received;
  845. GNUNET_assert (GNUNET_OK ==
  846. GNUNET_CONTAINER_multihashmap32_put (op->key_to_element,
  847. (uint32_t) ibf_key.key_val,
  848. k,
  849. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
  850. }
  851. /**
  852. * FIXME.
  853. */
  854. static void
  855. salt_key (const struct IBF_Key *k_in,
  856. uint32_t salt,
  857. struct IBF_Key *k_out)
  858. {
  859. int s = salt % 64;
  860. uint64_t x = k_in->key_val;
  861. /* rotate ibf key */
  862. x = (x >> s) | (x << (64 - s));
  863. k_out->key_val = x;
  864. }
  865. /**
  866. * FIXME.
  867. */
  868. static void
  869. unsalt_key (const struct IBF_Key *k_in,
  870. uint32_t salt,
  871. struct IBF_Key *k_out)
  872. {
  873. int s = salt % 64;
  874. uint64_t x = k_in->key_val;
  875. x = (x << s) | (x >> (64 - s));
  876. k_out->key_val = x;
  877. }
  878. /**
  879. * Insert a key into an ibf.
  880. *
  881. * @param cls the ibf
  882. * @param key unused
  883. * @param value the key entry to get the key from
  884. */
  885. static int
  886. prepare_ibf_iterator (void *cls,
  887. uint32_t key,
  888. void *value)
  889. {
  890. struct Operation *op = cls;
  891. struct KeyEntry *ke = value;
  892. struct IBF_Key salted_key;
  893. LOG (GNUNET_ERROR_TYPE_DEBUG,
  894. "[OP %p] inserting %lx (hash %s) into ibf\n",
  895. op,
  896. (unsigned long) ke->ibf_key.key_val,
  897. GNUNET_h2s (&ke->element->element_hash));
  898. salt_key (&ke->ibf_key,
  899. op->salt_send,
  900. &salted_key);
  901. ibf_insert (op->local_ibf, salted_key);
  902. return GNUNET_YES;
  903. }
  904. /**
  905. * Is element @a ee part of the set used by @a op?
  906. *
  907. * @param ee element to test
  908. * @param op operation the defines the set and its generation
  909. * @return #GNUNET_YES if the element is in the set, #GNUNET_NO if not
  910. */
  911. static int
  912. _GSS_is_element_of_operation (struct ElementEntry *ee,
  913. struct Operation *op)
  914. {
  915. return ee->generation >= op->generation_created;
  916. }
  917. /**
  918. * Iterator for initializing the
  919. * key-to-element mapping of a union operation
  920. *
  921. * @param cls the union operation `struct Operation *`
  922. * @param key unused
  923. * @param value the `struct ElementEntry *` to insert
  924. * into the key-to-element mapping
  925. * @return #GNUNET_YES (to continue iterating)
  926. */
  927. static int
  928. init_key_to_element_iterator (void *cls,
  929. const struct GNUNET_HashCode *key,
  930. void *value)
  931. {
  932. struct Operation *op = cls;
  933. struct ElementEntry *ee = value;
  934. /* make sure that the element belongs to the set at the time
  935. * of creating the operation */
  936. if (GNUNET_NO ==
  937. _GSS_is_element_of_operation (ee,
  938. op))
  939. return GNUNET_YES;
  940. GNUNET_assert (GNUNET_NO == ee->remote);
  941. op_register_element (op,
  942. ee,
  943. GNUNET_NO);
  944. return GNUNET_YES;
  945. }
  946. /**
  947. * Initialize the IBF key to element mapping local to this set operation.
  948. *
  949. * @param op the set union operation
  950. */
  951. static void
  952. initialize_key_to_element (struct Operation *op)
  953. {
  954. unsigned int len;
  955. GNUNET_assert (NULL == op->key_to_element);
  956. len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
  957. op->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
  958. GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
  959. &init_key_to_element_iterator,
  960. op);
  961. }
  962. /**
  963. * Create an ibf with the operation's elements
  964. * of the specified size
  965. *
  966. * @param op the union operation
  967. * @param size size of the ibf to create
  968. * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  969. */
  970. static int
  971. prepare_ibf (struct Operation *op,
  972. uint32_t size)
  973. {
  974. GNUNET_assert (NULL != op->key_to_element);
  975. if (NULL != op->local_ibf)
  976. ibf_destroy (op->local_ibf);
  977. op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
  978. if (NULL == op->local_ibf)
  979. {
  980. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  981. "Failed to allocate local IBF\n");
  982. return GNUNET_SYSERR;
  983. }
  984. GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
  985. &prepare_ibf_iterator,
  986. op);
  987. return GNUNET_OK;
  988. }
  989. /**
  990. * Send an ibf of appropriate size.
  991. *
  992. * Fragments the IBF into multiple messages if necessary.
  993. *
  994. * @param op the union operation
  995. * @param ibf_order order of the ibf to send, size=2^order
  996. * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  997. */
  998. static int
  999. send_ibf (struct Operation *op,
  1000. uint16_t ibf_order)
  1001. {
  1002. unsigned int buckets_sent = 0;
  1003. struct InvertibleBloomFilter *ibf;
  1004. if (GNUNET_OK !=
  1005. prepare_ibf (op, 1 << ibf_order))
  1006. {
  1007. /* allocation failed */
  1008. return GNUNET_SYSERR;
  1009. }
  1010. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1011. "sending ibf of size %u\n",
  1012. 1 << ibf_order);
  1013. {
  1014. char name[64] = { 0 };
  1015. snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
  1016. GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
  1017. }
  1018. ibf = op->local_ibf;
  1019. while (buckets_sent < (1 << ibf_order))
  1020. {
  1021. unsigned int buckets_in_message;
  1022. struct GNUNET_MQ_Envelope *ev;
  1023. struct IBFMessage *msg;
  1024. buckets_in_message = (1 << ibf_order) - buckets_sent;
  1025. /* limit to maximum */
  1026. if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
  1027. buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
  1028. ev = GNUNET_MQ_msg_extra (msg,
  1029. buckets_in_message * IBF_BUCKET_SIZE,
  1030. GNUNET_MESSAGE_TYPE_SETU_P2P_IBF);
  1031. msg->reserved1 = 0;
  1032. msg->reserved2 = 0;
  1033. msg->order = ibf_order;
  1034. msg->offset = htonl (buckets_sent);
  1035. msg->salt = htonl (op->salt_send);
  1036. ibf_write_slice (ibf, buckets_sent,
  1037. buckets_in_message, &msg[1]);
  1038. buckets_sent += buckets_in_message;
  1039. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1040. "ibf chunk size %u, %u/%u sent\n",
  1041. buckets_in_message,
  1042. buckets_sent,
  1043. 1 << ibf_order);
  1044. GNUNET_MQ_send (op->mq, ev);
  1045. }
  1046. /* The other peer must decode the IBF, so
  1047. * we're passive. */
  1048. op->phase = PHASE_INVENTORY_PASSIVE;
  1049. return GNUNET_OK;
  1050. }
  1051. /**
  1052. * Compute the necessary order of an ibf
  1053. * from the size of the symmetric set difference.
  1054. *
  1055. * @param diff the difference
  1056. * @return the required size of the ibf
  1057. */
  1058. static unsigned int
  1059. get_order_from_difference (unsigned int diff)
  1060. {
  1061. unsigned int ibf_order;
  1062. ibf_order = 2;
  1063. while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
  1064. ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
  1065. (ibf_order < MAX_IBF_ORDER))
  1066. ibf_order++;
  1067. // add one for correction
  1068. return ibf_order + 1;
  1069. }
  1070. /**
  1071. * Send a set element.
  1072. *
  1073. * @param cls the union operation `struct Operation *`
  1074. * @param key unused
  1075. * @param value the `struct ElementEntry *` to insert
  1076. * into the key-to-element mapping
  1077. * @return #GNUNET_YES (to continue iterating)
  1078. */
  1079. static int
  1080. send_full_element_iterator (void *cls,
  1081. const struct GNUNET_HashCode *key,
  1082. void *value)
  1083. {
  1084. struct Operation *op = cls;
  1085. struct GNUNET_SETU_ElementMessage *emsg;
  1086. struct ElementEntry *ee = value;
  1087. struct GNUNET_SETU_Element *el = &ee->element;
  1088. struct GNUNET_MQ_Envelope *ev;
  1089. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1090. "Sending element %s\n",
  1091. GNUNET_h2s (key));
  1092. ev = GNUNET_MQ_msg_extra (emsg,
  1093. el->size,
  1094. GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
  1095. emsg->element_type = htons (el->element_type);
  1096. GNUNET_memcpy (&emsg[1],
  1097. el->data,
  1098. el->size);
  1099. GNUNET_MQ_send (op->mq,
  1100. ev);
  1101. return GNUNET_YES;
  1102. }
  1103. /**
  1104. * Switch to full set transmission for @a op.
  1105. *
  1106. * @param op operation to switch to full set transmission.
  1107. */
  1108. static void
  1109. send_full_set (struct Operation *op)
  1110. {
  1111. struct GNUNET_MQ_Envelope *ev;
  1112. op->phase = PHASE_FULL_SENDING;
  1113. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1114. "Dedicing to transmit the full set\n");
  1115. /* FIXME: use a more memory-friendly way of doing this with an
  1116. iterator, just as we do in the non-full case! */
  1117. (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
  1118. &send_full_element_iterator,
  1119. op);
  1120. ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
  1121. GNUNET_MQ_send (op->mq,
  1122. ev);
  1123. }
  1124. /**
  1125. * Handle a strata estimator from a remote peer
  1126. *
  1127. * @param cls the union operation
  1128. * @param msg the message
  1129. */
  1130. static int
  1131. check_union_p2p_strata_estimator (void *cls,
  1132. const struct StrataEstimatorMessage *msg)
  1133. {
  1134. struct Operation *op = cls;
  1135. int is_compressed;
  1136. size_t len;
  1137. if (op->phase != PHASE_EXPECT_SE)
  1138. {
  1139. GNUNET_break (0);
  1140. return GNUNET_SYSERR;
  1141. }
  1142. is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
  1143. msg->header.type));
  1144. len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
  1145. if ((GNUNET_NO == is_compressed) &&
  1146. (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE))
  1147. {
  1148. GNUNET_break (0);
  1149. return GNUNET_SYSERR;
  1150. }
  1151. return GNUNET_OK;
  1152. }
  1153. /**
  1154. * Handle a strata estimator from a remote peer
  1155. *
  1156. * @param cls the union operation
  1157. * @param msg the message
  1158. */
  1159. static void
  1160. handle_union_p2p_strata_estimator (void *cls,
  1161. const struct StrataEstimatorMessage *msg)
  1162. {
  1163. struct Operation *op = cls;
  1164. struct StrataEstimator *remote_se;
  1165. unsigned int diff;
  1166. uint64_t other_size;
  1167. size_t len;
  1168. int is_compressed;
  1169. is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
  1170. msg->header.type));
  1171. GNUNET_STATISTICS_update (_GSS_statistics,
  1172. "# bytes of SE received",
  1173. ntohs (msg->header.size),
  1174. GNUNET_NO);
  1175. len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
  1176. other_size = GNUNET_ntohll (msg->set_size);
  1177. remote_se = strata_estimator_create (SE_STRATA_COUNT,
  1178. SE_IBF_SIZE,
  1179. SE_IBF_HASH_NUM);
  1180. if (NULL == remote_se)
  1181. {
  1182. /* insufficient resources, fail */
  1183. fail_union_operation (op);
  1184. return;
  1185. }
  1186. if (GNUNET_OK !=
  1187. strata_estimator_read (&msg[1],
  1188. len,
  1189. is_compressed,
  1190. remote_se))
  1191. {
  1192. /* decompression failed */
  1193. strata_estimator_destroy (remote_se);
  1194. fail_union_operation (op);
  1195. return;
  1196. }
  1197. GNUNET_assert (NULL != op->se);
  1198. diff = strata_estimator_difference (remote_se,
  1199. op->se);
  1200. if (diff > 200)
  1201. diff = diff * 3 / 2;
  1202. strata_estimator_destroy (remote_se);
  1203. strata_estimator_destroy (op->se);
  1204. op->se = NULL;
  1205. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1206. "got se diff=%d, using ibf size %d\n",
  1207. diff,
  1208. 1U << get_order_from_difference (diff));
  1209. {
  1210. char *set_debug;
  1211. set_debug = getenv ("GNUNET_SETU_BENCHMARK");
  1212. if ((NULL != set_debug) &&
  1213. (0 == strcmp (set_debug, "1")))
  1214. {
  1215. FILE *f = fopen ("set.log", "a");
  1216. fprintf (f, "%llu\n", (unsigned long long) diff);
  1217. fclose (f);
  1218. }
  1219. }
  1220. if ((GNUNET_YES == op->byzantine) &&
  1221. (other_size < op->byzantine_lower_bound))
  1222. {
  1223. GNUNET_break (0);
  1224. fail_union_operation (op);
  1225. return;
  1226. }
  1227. if ((GNUNET_YES == op->force_full) ||
  1228. (diff > op->initial_size / 4) ||
  1229. (0 == other_size))
  1230. {
  1231. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1232. "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
  1233. diff,
  1234. (unsigned long long) op->initial_size);
  1235. GNUNET_STATISTICS_update (_GSS_statistics,
  1236. "# of full sends",
  1237. 1,
  1238. GNUNET_NO);
  1239. if ((op->initial_size <= other_size) ||
  1240. (0 == other_size))
  1241. {
  1242. send_full_set (op);
  1243. }
  1244. else
  1245. {
  1246. struct GNUNET_MQ_Envelope *ev;
  1247. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1248. "Telling other peer that we expect its full set\n");
  1249. op->phase = PHASE_EXPECT_IBF;
  1250. ev = GNUNET_MQ_msg_header (
  1251. GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL);
  1252. GNUNET_MQ_send (op->mq,
  1253. ev);
  1254. }
  1255. }
  1256. else
  1257. {
  1258. GNUNET_STATISTICS_update (_GSS_statistics,
  1259. "# of ibf sends",
  1260. 1,
  1261. GNUNET_NO);
  1262. if (GNUNET_OK !=
  1263. send_ibf (op,
  1264. get_order_from_difference (diff)))
  1265. {
  1266. /* Internal error, best we can do is shut the connection */
  1267. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1268. "Failed to send IBF, closing connection\n");
  1269. fail_union_operation (op);
  1270. return;
  1271. }
  1272. }
  1273. GNUNET_CADET_receive_done (op->channel);
  1274. }
  1275. /**
  1276. * Iterator to send elements to a remote peer
  1277. *
  1278. * @param cls closure with the element key and the union operation
  1279. * @param key ignored
  1280. * @param value the key entry
  1281. */
  1282. static int
  1283. send_offers_iterator (void *cls,
  1284. uint32_t key,
  1285. void *value)
  1286. {
  1287. struct SendElementClosure *sec = cls;
  1288. struct Operation *op = sec->op;
  1289. struct KeyEntry *ke = value;
  1290. struct GNUNET_MQ_Envelope *ev;
  1291. struct GNUNET_MessageHeader *mh;
  1292. /* Detect 32-bit key collision for the 64-bit IBF keys. */
  1293. if (ke->ibf_key.key_val != sec->ibf_key.key_val)
  1294. return GNUNET_YES;
  1295. ev = GNUNET_MQ_msg_header_extra (mh,
  1296. sizeof(struct GNUNET_HashCode),
  1297. GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER);
  1298. GNUNET_assert (NULL != ev);
  1299. *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
  1300. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1301. "[OP %p] sending element offer (%s) to peer\n",
  1302. op,
  1303. GNUNET_h2s (&ke->element->element_hash));
  1304. GNUNET_MQ_send (op->mq, ev);
  1305. return GNUNET_YES;
  1306. }
  1307. /**
  1308. * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
  1309. *
  1310. * @param op union operation
  1311. * @param ibf_key IBF key of interest
  1312. */
  1313. static void
  1314. send_offers_for_key (struct Operation *op,
  1315. struct IBF_Key ibf_key)
  1316. {
  1317. struct SendElementClosure send_cls;
  1318. send_cls.ibf_key = ibf_key;
  1319. send_cls.op = op;
  1320. (void) GNUNET_CONTAINER_multihashmap32_get_multiple (
  1321. op->key_to_element,
  1322. (uint32_t) ibf_key.
  1323. key_val,
  1324. &send_offers_iterator,
  1325. &send_cls);
  1326. }
  1327. /**
  1328. * Decode which elements are missing on each side, and
  1329. * send the appropriate offers and inquiries.
  1330. *
  1331. * @param op union operation
  1332. * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  1333. */
  1334. static int
  1335. decode_and_send (struct Operation *op)
  1336. {
  1337. struct IBF_Key key;
  1338. struct IBF_Key last_key;
  1339. int side;
  1340. unsigned int num_decoded;
  1341. struct InvertibleBloomFilter *diff_ibf;
  1342. GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->phase);
  1343. if (GNUNET_OK !=
  1344. prepare_ibf (op,
  1345. op->remote_ibf->size))
  1346. {
  1347. GNUNET_break (0);
  1348. /* allocation failed */
  1349. return GNUNET_SYSERR;
  1350. }
  1351. diff_ibf = ibf_dup (op->local_ibf);
  1352. ibf_subtract (diff_ibf,
  1353. op->remote_ibf);
  1354. ibf_destroy (op->remote_ibf);
  1355. op->remote_ibf = NULL;
  1356. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1357. "decoding IBF (size=%u)\n",
  1358. diff_ibf->size);
  1359. num_decoded = 0;
  1360. key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
  1361. while (1)
  1362. {
  1363. int res;
  1364. int cycle_detected = GNUNET_NO;
  1365. last_key = key;
  1366. res = ibf_decode (diff_ibf,
  1367. &side,
  1368. &key);
  1369. if (res == GNUNET_OK)
  1370. {
  1371. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1372. "decoded ibf key %lx\n",
  1373. (unsigned long) key.key_val);
  1374. num_decoded += 1;
  1375. if ((num_decoded > diff_ibf->size) ||
  1376. ((num_decoded > 1) &&
  1377. (last_key.key_val == key.key_val)))
  1378. {
  1379. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1380. "detected cyclic ibf (decoded %u/%u)\n",
  1381. num_decoded,
  1382. diff_ibf->size);
  1383. cycle_detected = GNUNET_YES;
  1384. }
  1385. }
  1386. if ((GNUNET_SYSERR == res) ||
  1387. (GNUNET_YES == cycle_detected))
  1388. {
  1389. int next_order;
  1390. next_order = 0;
  1391. while (1 << next_order < diff_ibf->size)
  1392. next_order++;
  1393. next_order++;
  1394. if (next_order <= MAX_IBF_ORDER)
  1395. {
  1396. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1397. "decoding failed, sending larger ibf (size %u)\n",
  1398. 1 << next_order);
  1399. GNUNET_STATISTICS_update (_GSS_statistics,
  1400. "# of IBF retries",
  1401. 1,
  1402. GNUNET_NO);
  1403. op->salt_send++;
  1404. if (GNUNET_OK !=
  1405. send_ibf (op, next_order))
  1406. {
  1407. /* Internal error, best we can do is shut the connection */
  1408. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1409. "Failed to send IBF, closing connection\n");
  1410. fail_union_operation (op);
  1411. ibf_destroy (diff_ibf);
  1412. return GNUNET_SYSERR;
  1413. }
  1414. }
  1415. else
  1416. {
  1417. GNUNET_STATISTICS_update (_GSS_statistics,
  1418. "# of failed union operations (too large)",
  1419. 1,
  1420. GNUNET_NO);
  1421. // XXX: Send the whole set, element-by-element
  1422. LOG (GNUNET_ERROR_TYPE_ERROR,
  1423. "set union failed: reached ibf limit\n");
  1424. fail_union_operation (op);
  1425. ibf_destroy (diff_ibf);
  1426. return GNUNET_SYSERR;
  1427. }
  1428. break;
  1429. }
  1430. if (GNUNET_NO == res)
  1431. {
  1432. struct GNUNET_MQ_Envelope *ev;
  1433. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1434. "transmitted all values, sending DONE\n");
  1435. ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
  1436. GNUNET_MQ_send (op->mq, ev);
  1437. /* We now wait until we get a DONE message back
  1438. * and then wait for our MQ to be flushed and all our
  1439. * demands be delivered. */
  1440. break;
  1441. }
  1442. if (1 == side)
  1443. {
  1444. struct IBF_Key unsalted_key;
  1445. unsalt_key (&key,
  1446. op->salt_receive,
  1447. &unsalted_key);
  1448. send_offers_for_key (op,
  1449. unsalted_key);
  1450. }
  1451. else if (-1 == side)
  1452. {
  1453. struct GNUNET_MQ_Envelope *ev;
  1454. struct InquiryMessage *msg;
  1455. /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
  1456. * the effort additional complexity. */
  1457. ev = GNUNET_MQ_msg_extra (msg,
  1458. sizeof(struct IBF_Key),
  1459. GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY);
  1460. msg->salt = htonl (op->salt_receive);
  1461. GNUNET_memcpy (&msg[1],
  1462. &key,
  1463. sizeof(struct IBF_Key));
  1464. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1465. "sending element inquiry for IBF key %lx\n",
  1466. (unsigned long) key.key_val);
  1467. GNUNET_MQ_send (op->mq, ev);
  1468. }
  1469. else
  1470. {
  1471. GNUNET_assert (0);
  1472. }
  1473. }
  1474. ibf_destroy (diff_ibf);
  1475. return GNUNET_OK;
  1476. }
  1477. /**
  1478. * Check an IBF message from a remote peer.
  1479. *
  1480. * Reassemble the IBF from multiple pieces, and
  1481. * process the whole IBF once possible.
  1482. *
  1483. * @param cls the union operation
  1484. * @param msg the header of the message
  1485. * @return #GNUNET_OK if @a msg is well-formed
  1486. */
  1487. static int
  1488. check_union_p2p_ibf (void *cls,
  1489. const struct IBFMessage *msg)
  1490. {
  1491. struct Operation *op = cls;
  1492. unsigned int buckets_in_message;
  1493. buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
  1494. / IBF_BUCKET_SIZE;
  1495. if (0 == buckets_in_message)
  1496. {
  1497. GNUNET_break_op (0);
  1498. return GNUNET_SYSERR;
  1499. }
  1500. if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
  1501. * IBF_BUCKET_SIZE)
  1502. {
  1503. GNUNET_break_op (0);
  1504. return GNUNET_SYSERR;
  1505. }
  1506. if (op->phase == PHASE_EXPECT_IBF_CONT)
  1507. {
  1508. if (ntohl (msg->offset) != op->ibf_buckets_received)
  1509. {
  1510. GNUNET_break_op (0);
  1511. return GNUNET_SYSERR;
  1512. }
  1513. if (1 << msg->order != op->remote_ibf->size)
  1514. {
  1515. GNUNET_break_op (0);
  1516. return GNUNET_SYSERR;
  1517. }
  1518. if (ntohl (msg->salt) != op->salt_receive)
  1519. {
  1520. GNUNET_break_op (0);
  1521. return GNUNET_SYSERR;
  1522. }
  1523. }
  1524. else if ((op->phase != PHASE_INVENTORY_PASSIVE) &&
  1525. (op->phase != PHASE_EXPECT_IBF))
  1526. {
  1527. GNUNET_break_op (0);
  1528. return GNUNET_SYSERR;
  1529. }
  1530. return GNUNET_OK;
  1531. }
  1532. /**
  1533. * Handle an IBF message from a remote peer.
  1534. *
  1535. * Reassemble the IBF from multiple pieces, and
  1536. * process the whole IBF once possible.
  1537. *
  1538. * @param cls the union operation
  1539. * @param msg the header of the message
  1540. */
  1541. static void
  1542. handle_union_p2p_ibf (void *cls,
  1543. const struct IBFMessage *msg)
  1544. {
  1545. struct Operation *op = cls;
  1546. unsigned int buckets_in_message;
  1547. buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
  1548. / IBF_BUCKET_SIZE;
  1549. if ((op->phase == PHASE_INVENTORY_PASSIVE) ||
  1550. (op->phase == PHASE_EXPECT_IBF))
  1551. {
  1552. op->phase = PHASE_EXPECT_IBF_CONT;
  1553. GNUNET_assert (NULL == op->remote_ibf);
  1554. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1555. "Creating new ibf of size %u\n",
  1556. 1 << msg->order);
  1557. op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
  1558. op->salt_receive = ntohl (msg->salt);
  1559. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1560. "Receiving new IBF with salt %u\n",
  1561. op->salt_receive);
  1562. if (NULL == op->remote_ibf)
  1563. {
  1564. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1565. "Failed to parse remote IBF, closing connection\n");
  1566. fail_union_operation (op);
  1567. return;
  1568. }
  1569. op->ibf_buckets_received = 0;
  1570. if (0 != ntohl (msg->offset))
  1571. {
  1572. GNUNET_break_op (0);
  1573. fail_union_operation (op);
  1574. return;
  1575. }
  1576. }
  1577. else
  1578. {
  1579. GNUNET_assert (op->phase == PHASE_EXPECT_IBF_CONT);
  1580. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1581. "Received more of IBF\n");
  1582. }
  1583. GNUNET_assert (NULL != op->remote_ibf);
  1584. ibf_read_slice (&msg[1],
  1585. op->ibf_buckets_received,
  1586. buckets_in_message,
  1587. op->remote_ibf);
  1588. op->ibf_buckets_received += buckets_in_message;
  1589. if (op->ibf_buckets_received == op->remote_ibf->size)
  1590. {
  1591. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1592. "received full ibf\n");
  1593. op->phase = PHASE_INVENTORY_ACTIVE;
  1594. if (GNUNET_OK !=
  1595. decode_and_send (op))
  1596. {
  1597. /* Internal error, best we can do is shut down */
  1598. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1599. "Failed to decode IBF, closing connection\n");
  1600. fail_union_operation (op);
  1601. return;
  1602. }
  1603. }
  1604. GNUNET_CADET_receive_done (op->channel);
  1605. }
  1606. /**
  1607. * Send a result message to the client indicating
  1608. * that there is a new element.
  1609. *
  1610. * @param op union operation
  1611. * @param element element to send
  1612. * @param status status to send with the new element
  1613. */
  1614. static void
  1615. send_client_element (struct Operation *op,
  1616. const struct GNUNET_SETU_Element *element,
  1617. enum GNUNET_SETU_Status status)
  1618. {
  1619. struct GNUNET_MQ_Envelope *ev;
  1620. struct GNUNET_SETU_ResultMessage *rm;
  1621. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1622. "sending element (size %u) to client\n",
  1623. element->size);
  1624. GNUNET_assert (0 != op->client_request_id);
  1625. ev = GNUNET_MQ_msg_extra (rm,
  1626. element->size,
  1627. GNUNET_MESSAGE_TYPE_SETU_RESULT);
  1628. if (NULL == ev)
  1629. {
  1630. GNUNET_MQ_discard (ev);
  1631. GNUNET_break (0);
  1632. return;
  1633. }
  1634. rm->result_status = htons (status);
  1635. rm->request_id = htonl (op->client_request_id);
  1636. rm->element_type = htons (element->element_type);
  1637. rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (
  1638. op->key_to_element));
  1639. GNUNET_memcpy (&rm[1],
  1640. element->data,
  1641. element->size);
  1642. GNUNET_MQ_send (op->set->cs->mq,
  1643. ev);
  1644. }
  1645. /**
  1646. * Tests if the operation is finished, and if so notify.
  1647. *
  1648. * @param op operation to check
  1649. */
  1650. static void
  1651. maybe_finish (struct Operation *op)
  1652. {
  1653. unsigned int num_demanded;
  1654. num_demanded = GNUNET_CONTAINER_multihashmap_size (
  1655. op->demanded_hashes);
  1656. if (PHASE_FINISH_WAITING == op->phase)
  1657. {
  1658. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1659. "In PHASE_FINISH_WAITING, pending %u demands\n",
  1660. num_demanded);
  1661. if (0 == num_demanded)
  1662. {
  1663. struct GNUNET_MQ_Envelope *ev;
  1664. op->phase = PHASE_DONE;
  1665. ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_DONE);
  1666. GNUNET_MQ_send (op->mq,
  1667. ev);
  1668. /* We now wait until the other peer sends P2P_OVER
  1669. * after it got all elements from us. */
  1670. }
  1671. }
  1672. if (PHASE_FINISH_CLOSING == op->phase)
  1673. {
  1674. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1675. "In PHASE_FINISH_CLOSING, pending %u demands\n",
  1676. num_demanded);
  1677. if (0 == num_demanded)
  1678. {
  1679. op->phase = PHASE_DONE;
  1680. send_client_done (op);
  1681. _GSS_operation_destroy2 (op);
  1682. }
  1683. }
  1684. }
  1685. /**
  1686. * Check an element message from a remote peer.
  1687. *
  1688. * @param cls the union operation
  1689. * @param emsg the message
  1690. */
  1691. static int
  1692. check_union_p2p_elements (void *cls,
  1693. const struct GNUNET_SETU_ElementMessage *emsg)
  1694. {
  1695. struct Operation *op = cls;
  1696. if (0 == GNUNET_CONTAINER_multihashmap_size (op->demanded_hashes))
  1697. {
  1698. GNUNET_break_op (0);
  1699. return GNUNET_SYSERR;
  1700. }
  1701. return GNUNET_OK;
  1702. }
  1703. /**
  1704. * Handle an element message from a remote peer.
  1705. * Sent by the other peer either because we decoded an IBF and placed a demand,
  1706. * or because the other peer switched to full set transmission.
  1707. *
  1708. * @param cls the union operation
  1709. * @param emsg the message
  1710. */
  1711. static void
  1712. handle_union_p2p_elements (void *cls,
  1713. const struct GNUNET_SETU_ElementMessage *emsg)
  1714. {
  1715. struct Operation *op = cls;
  1716. struct ElementEntry *ee;
  1717. struct KeyEntry *ke;
  1718. uint16_t element_size;
  1719. element_size = ntohs (emsg->header.size) - sizeof(struct
  1720. GNUNET_SETU_ElementMessage);
  1721. ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
  1722. GNUNET_memcpy (&ee[1],
  1723. &emsg[1],
  1724. element_size);
  1725. ee->element.size = element_size;
  1726. ee->element.data = &ee[1];
  1727. ee->element.element_type = ntohs (emsg->element_type);
  1728. ee->remote = GNUNET_YES;
  1729. GNUNET_SETU_element_hash (&ee->element,
  1730. &ee->element_hash);
  1731. if (GNUNET_NO ==
  1732. GNUNET_CONTAINER_multihashmap_remove (op->demanded_hashes,
  1733. &ee->element_hash,
  1734. NULL))
  1735. {
  1736. /* We got something we didn't demand, since it's not in our map. */
  1737. GNUNET_break_op (0);
  1738. fail_union_operation (op);
  1739. return;
  1740. }
  1741. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1742. "Got element (size %u, hash %s) from peer\n",
  1743. (unsigned int) element_size,
  1744. GNUNET_h2s (&ee->element_hash));
  1745. GNUNET_STATISTICS_update (_GSS_statistics,
  1746. "# received elements",
  1747. 1,
  1748. GNUNET_NO);
  1749. GNUNET_STATISTICS_update (_GSS_statistics,
  1750. "# exchanged elements",
  1751. 1,
  1752. GNUNET_NO);
  1753. op->received_total++;
  1754. ke = op_get_element (op,
  1755. &ee->element_hash);
  1756. if (NULL != ke)
  1757. {
  1758. /* Got repeated element. Should not happen since
  1759. * we track demands. */
  1760. GNUNET_STATISTICS_update (_GSS_statistics,
  1761. "# repeated elements",
  1762. 1,
  1763. GNUNET_NO);
  1764. ke->received = GNUNET_YES;
  1765. GNUNET_free (ee);
  1766. }
  1767. else
  1768. {
  1769. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1770. "Registering new element from remote peer\n");
  1771. op->received_fresh++;
  1772. op_register_element (op, ee, GNUNET_YES);
  1773. /* only send results immediately if the client wants it */
  1774. send_client_element (op,
  1775. &ee->element,
  1776. GNUNET_SETU_STATUS_ADD_LOCAL);
  1777. }
  1778. if ((op->received_total > 8) &&
  1779. (op->received_fresh < op->received_total / 3))
  1780. {
  1781. /* The other peer gave us lots of old elements, there's something wrong. */
  1782. GNUNET_break_op (0);
  1783. fail_union_operation (op);
  1784. return;
  1785. }
  1786. GNUNET_CADET_receive_done (op->channel);
  1787. maybe_finish (op);
  1788. }
  1789. /**
  1790. * Check a full element message from a remote peer.
  1791. *
  1792. * @param cls the union operation
  1793. * @param emsg the message
  1794. */
  1795. static int
  1796. check_union_p2p_full_element (void *cls,
  1797. const struct GNUNET_SETU_ElementMessage *emsg)
  1798. {
  1799. struct Operation *op = cls;
  1800. (void) op;
  1801. // FIXME: check that we expect full elements here?
  1802. return GNUNET_OK;
  1803. }
  1804. /**
  1805. * Handle an element message from a remote peer.
  1806. *
  1807. * @param cls the union operation
  1808. * @param emsg the message
  1809. */
  1810. static void
  1811. handle_union_p2p_full_element (void *cls,
  1812. const struct GNUNET_SETU_ElementMessage *emsg)
  1813. {
  1814. struct Operation *op = cls;
  1815. struct ElementEntry *ee;
  1816. struct KeyEntry *ke;
  1817. uint16_t element_size;
  1818. element_size = ntohs (emsg->header.size)
  1819. - sizeof(struct GNUNET_SETU_ElementMessage);
  1820. ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
  1821. GNUNET_memcpy (&ee[1], &emsg[1], element_size);
  1822. ee->element.size = element_size;
  1823. ee->element.data = &ee[1];
  1824. ee->element.element_type = ntohs (emsg->element_type);
  1825. ee->remote = GNUNET_YES;
  1826. GNUNET_SETU_element_hash (&ee->element,
  1827. &ee->element_hash);
  1828. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1829. "Got element (full diff, size %u, hash %s) from peer\n",
  1830. (unsigned int) element_size,
  1831. GNUNET_h2s (&ee->element_hash));
  1832. GNUNET_STATISTICS_update (_GSS_statistics,
  1833. "# received elements",
  1834. 1,
  1835. GNUNET_NO);
  1836. GNUNET_STATISTICS_update (_GSS_statistics,
  1837. "# exchanged elements",
  1838. 1,
  1839. GNUNET_NO);
  1840. op->received_total++;
  1841. ke = op_get_element (op,
  1842. &ee->element_hash);
  1843. if (NULL != ke)
  1844. {
  1845. /* Got repeated element. Should not happen since
  1846. * we track demands. */
  1847. GNUNET_STATISTICS_update (_GSS_statistics,
  1848. "# repeated elements",
  1849. 1,
  1850. GNUNET_NO);
  1851. ke->received = GNUNET_YES;
  1852. GNUNET_free (ee);
  1853. }
  1854. else
  1855. {
  1856. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1857. "Registering new element from remote peer\n");
  1858. op->received_fresh++;
  1859. op_register_element (op, ee, GNUNET_YES);
  1860. /* only send results immediately if the client wants it */
  1861. send_client_element (op,
  1862. &ee->element,
  1863. GNUNET_SETU_STATUS_ADD_LOCAL);
  1864. }
  1865. if ((GNUNET_YES == op->byzantine) &&
  1866. (op->received_total > 384 + op->received_fresh * 4) &&
  1867. (op->received_fresh < op->received_total / 6))
  1868. {
  1869. /* The other peer gave us lots of old elements, there's something wrong. */
  1870. LOG (GNUNET_ERROR_TYPE_ERROR,
  1871. "Other peer sent only %llu/%llu fresh elements, failing operation\n",
  1872. (unsigned long long) op->received_fresh,
  1873. (unsigned long long) op->received_total);
  1874. GNUNET_break_op (0);
  1875. fail_union_operation (op);
  1876. return;
  1877. }
  1878. GNUNET_CADET_receive_done (op->channel);
  1879. }
  1880. /**
  1881. * Send offers (for GNUNET_Hash-es) in response
  1882. * to inquiries (for IBF_Key-s).
  1883. *
  1884. * @param cls the union operation
  1885. * @param msg the message
  1886. */
  1887. static int
  1888. check_union_p2p_inquiry (void *cls,
  1889. const struct InquiryMessage *msg)
  1890. {
  1891. struct Operation *op = cls;
  1892. unsigned int num_keys;
  1893. if (op->phase != PHASE_INVENTORY_PASSIVE)
  1894. {
  1895. GNUNET_break_op (0);
  1896. return GNUNET_SYSERR;
  1897. }
  1898. num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
  1899. / sizeof(struct IBF_Key);
  1900. if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
  1901. != num_keys * sizeof(struct IBF_Key))
  1902. {
  1903. GNUNET_break_op (0);
  1904. return GNUNET_SYSERR;
  1905. }
  1906. return GNUNET_OK;
  1907. }
  1908. /**
  1909. * Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
  1910. *
  1911. * @param cls the union operation
  1912. * @param msg the message
  1913. */
  1914. static void
  1915. handle_union_p2p_inquiry (void *cls,
  1916. const struct InquiryMessage *msg)
  1917. {
  1918. struct Operation *op = cls;
  1919. const struct IBF_Key *ibf_key;
  1920. unsigned int num_keys;
  1921. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1922. "Received union inquiry\n");
  1923. num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
  1924. / sizeof(struct IBF_Key);
  1925. ibf_key = (const struct IBF_Key *) &msg[1];
  1926. while (0 != num_keys--)
  1927. {
  1928. struct IBF_Key unsalted_key;
  1929. unsalt_key (ibf_key,
  1930. ntohl (msg->salt),
  1931. &unsalted_key);
  1932. send_offers_for_key (op,
  1933. unsalted_key);
  1934. ibf_key++;
  1935. }
  1936. GNUNET_CADET_receive_done (op->channel);
  1937. }
  1938. /**
  1939. * Iterator over hash map entries, called to destroy the linked list of
  1940. * colliding ibf key entries.
  1941. *
  1942. * @param cls closure
  1943. * @param key current key code
  1944. * @param value value in the hash map
  1945. * @return #GNUNET_YES if we should continue to iterate,
  1946. * #GNUNET_NO if not.
  1947. */
  1948. static int
  1949. send_missing_full_elements_iter (void *cls,
  1950. uint32_t key,
  1951. void *value)
  1952. {
  1953. struct Operation *op = cls;
  1954. struct KeyEntry *ke = value;
  1955. struct GNUNET_MQ_Envelope *ev;
  1956. struct GNUNET_SETU_ElementMessage *emsg;
  1957. struct ElementEntry *ee = ke->element;
  1958. if (GNUNET_YES == ke->received)
  1959. return GNUNET_YES;
  1960. ev = GNUNET_MQ_msg_extra (emsg,
  1961. ee->element.size,
  1962. GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT);
  1963. GNUNET_memcpy (&emsg[1],
  1964. ee->element.data,
  1965. ee->element.size);
  1966. emsg->element_type = htons (ee->element.element_type);
  1967. GNUNET_MQ_send (op->mq,
  1968. ev);
  1969. return GNUNET_YES;
  1970. }
  1971. /**
  1972. * Handle a request for full set transmission.
  1973. *
  1974. * @parem cls closure, a set union operation
  1975. * @param mh the demand message
  1976. */
  1977. static void
  1978. handle_union_p2p_request_full (void *cls,
  1979. const struct GNUNET_MessageHeader *mh)
  1980. {
  1981. struct Operation *op = cls;
  1982. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1983. "Received request for full set transmission\n");
  1984. if (PHASE_EXPECT_IBF != op->phase)
  1985. {
  1986. GNUNET_break_op (0);
  1987. fail_union_operation (op);
  1988. return;
  1989. }
  1990. // FIXME: we need to check that our set is larger than the
  1991. // byzantine_lower_bound by some threshold
  1992. send_full_set (op);
  1993. GNUNET_CADET_receive_done (op->channel);
  1994. }
  1995. /**
  1996. * Handle a "full done" message.
  1997. *
  1998. * @parem cls closure, a set union operation
  1999. * @param mh the demand message
  2000. */
  2001. static void
  2002. handle_union_p2p_full_done (void *cls,
  2003. const struct GNUNET_MessageHeader *mh)
  2004. {
  2005. struct Operation *op = cls;
  2006. switch (op->phase)
  2007. {
  2008. case PHASE_EXPECT_IBF:
  2009. {
  2010. struct GNUNET_MQ_Envelope *ev;
  2011. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2012. "got FULL DONE, sending elements that other peer is missing\n");
  2013. /* send all the elements that did not come from the remote peer */
  2014. GNUNET_CONTAINER_multihashmap32_iterate (op->key_to_element,
  2015. &send_missing_full_elements_iter,
  2016. op);
  2017. ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE);
  2018. GNUNET_MQ_send (op->mq,
  2019. ev);
  2020. op->phase = PHASE_DONE;
  2021. /* we now wait until the other peer sends us the OVER message*/
  2022. }
  2023. break;
  2024. case PHASE_FULL_SENDING:
  2025. {
  2026. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2027. "got FULL DONE, finishing\n");
  2028. /* We sent the full set, and got the response for that. We're done. */
  2029. op->phase = PHASE_DONE;
  2030. GNUNET_CADET_receive_done (op->channel);
  2031. send_client_done (op);
  2032. _GSS_operation_destroy2 (op);
  2033. return;
  2034. }
  2035. break;
  2036. default:
  2037. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2038. "Handle full done phase is %u\n",
  2039. (unsigned) op->phase);
  2040. GNUNET_break_op (0);
  2041. fail_union_operation (op);
  2042. return;
  2043. }
  2044. GNUNET_CADET_receive_done (op->channel);
  2045. }
  2046. /**
  2047. * Check a demand by the other peer for elements based on a list
  2048. * of `struct GNUNET_HashCode`s.
  2049. *
  2050. * @parem cls closure, a set union operation
  2051. * @param mh the demand message
  2052. * @return #GNUNET_OK if @a mh is well-formed
  2053. */
  2054. static int
  2055. check_union_p2p_demand (void *cls,
  2056. const struct GNUNET_MessageHeader *mh)
  2057. {
  2058. struct Operation *op = cls;
  2059. unsigned int num_hashes;
  2060. (void) op;
  2061. num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
  2062. / sizeof(struct GNUNET_HashCode);
  2063. if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
  2064. != num_hashes * sizeof(struct GNUNET_HashCode))
  2065. {
  2066. GNUNET_break_op (0);
  2067. return GNUNET_SYSERR;
  2068. }
  2069. return GNUNET_OK;
  2070. }
  2071. /**
  2072. * Handle a demand by the other peer for elements based on a list
  2073. * of `struct GNUNET_HashCode`s.
  2074. *
  2075. * @parem cls closure, a set union operation
  2076. * @param mh the demand message
  2077. */
  2078. static void
  2079. handle_union_p2p_demand (void *cls,
  2080. const struct GNUNET_MessageHeader *mh)
  2081. {
  2082. struct Operation *op = cls;
  2083. struct ElementEntry *ee;
  2084. struct GNUNET_SETU_ElementMessage *emsg;
  2085. const struct GNUNET_HashCode *hash;
  2086. unsigned int num_hashes;
  2087. struct GNUNET_MQ_Envelope *ev;
  2088. num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
  2089. / sizeof(struct GNUNET_HashCode);
  2090. for (hash = (const struct GNUNET_HashCode *) &mh[1];
  2091. num_hashes > 0;
  2092. hash++, num_hashes--)
  2093. {
  2094. ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
  2095. hash);
  2096. if (NULL == ee)
  2097. {
  2098. /* Demand for non-existing element. */
  2099. GNUNET_break_op (0);
  2100. fail_union_operation (op);
  2101. return;
  2102. }
  2103. if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
  2104. {
  2105. /* Probably confused lazily copied sets. */
  2106. GNUNET_break_op (0);
  2107. fail_union_operation (op);
  2108. return;
  2109. }
  2110. ev = GNUNET_MQ_msg_extra (emsg,
  2111. ee->element.size,
  2112. GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS);
  2113. GNUNET_memcpy (&emsg[1],
  2114. ee->element.data,
  2115. ee->element.size);
  2116. emsg->reserved = htons (0);
  2117. emsg->element_type = htons (ee->element.element_type);
  2118. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2119. "[OP %p] Sending demanded element (size %u, hash %s) to peer\n",
  2120. op,
  2121. (unsigned int) ee->element.size,
  2122. GNUNET_h2s (&ee->element_hash));
  2123. GNUNET_MQ_send (op->mq, ev);
  2124. GNUNET_STATISTICS_update (_GSS_statistics,
  2125. "# exchanged elements",
  2126. 1,
  2127. GNUNET_NO);
  2128. if (op->symmetric)
  2129. send_client_element (op,
  2130. &ee->element,
  2131. GNUNET_SET_STATUS_ADD_REMOTE);
  2132. }
  2133. GNUNET_CADET_receive_done (op->channel);
  2134. }
  2135. /**
  2136. * Check offer (of `struct GNUNET_HashCode`s).
  2137. *
  2138. * @param cls the union operation
  2139. * @param mh the message
  2140. * @return #GNUNET_OK if @a mh is well-formed
  2141. */
  2142. static int
  2143. check_union_p2p_offer (void *cls,
  2144. const struct GNUNET_MessageHeader *mh)
  2145. {
  2146. struct Operation *op = cls;
  2147. unsigned int num_hashes;
  2148. /* look up elements and send them */
  2149. if ((op->phase != PHASE_INVENTORY_PASSIVE) &&
  2150. (op->phase != PHASE_INVENTORY_ACTIVE))
  2151. {
  2152. GNUNET_break_op (0);
  2153. return GNUNET_SYSERR;
  2154. }
  2155. num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
  2156. / sizeof(struct GNUNET_HashCode);
  2157. if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
  2158. num_hashes * sizeof(struct GNUNET_HashCode))
  2159. {
  2160. GNUNET_break_op (0);
  2161. return GNUNET_SYSERR;
  2162. }
  2163. return GNUNET_OK;
  2164. }
  2165. /**
  2166. * Handle offers (of `struct GNUNET_HashCode`s) and
  2167. * respond with demands (of `struct GNUNET_HashCode`s).
  2168. *
  2169. * @param cls the union operation
  2170. * @param mh the message
  2171. */
  2172. static void
  2173. handle_union_p2p_offer (void *cls,
  2174. const struct GNUNET_MessageHeader *mh)
  2175. {
  2176. struct Operation *op = cls;
  2177. const struct GNUNET_HashCode *hash;
  2178. unsigned int num_hashes;
  2179. num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
  2180. / sizeof(struct GNUNET_HashCode);
  2181. for (hash = (const struct GNUNET_HashCode *) &mh[1];
  2182. num_hashes > 0;
  2183. hash++, num_hashes--)
  2184. {
  2185. struct ElementEntry *ee;
  2186. struct GNUNET_MessageHeader *demands;
  2187. struct GNUNET_MQ_Envelope *ev;
  2188. ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
  2189. hash);
  2190. if (NULL != ee)
  2191. if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
  2192. continue;
  2193. if (GNUNET_YES ==
  2194. GNUNET_CONTAINER_multihashmap_contains (op->demanded_hashes,
  2195. hash))
  2196. {
  2197. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2198. "Skipped sending duplicate demand\n");
  2199. continue;
  2200. }
  2201. GNUNET_assert (GNUNET_OK ==
  2202. GNUNET_CONTAINER_multihashmap_put (
  2203. op->demanded_hashes,
  2204. hash,
  2205. NULL,
  2206. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
  2207. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2208. "[OP %p] Requesting element (hash %s)\n",
  2209. op, GNUNET_h2s (hash));
  2210. ev = GNUNET_MQ_msg_header_extra (demands,
  2211. sizeof(struct GNUNET_HashCode),
  2212. GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND);
  2213. GNUNET_memcpy (&demands[1],
  2214. hash,
  2215. sizeof(struct GNUNET_HashCode));
  2216. GNUNET_MQ_send (op->mq, ev);
  2217. }
  2218. GNUNET_CADET_receive_done (op->channel);
  2219. }
  2220. /**
  2221. * Handle a done message from a remote peer
  2222. *
  2223. * @param cls the union operation
  2224. * @param mh the message
  2225. */
  2226. static void
  2227. handle_union_p2p_done (void *cls,
  2228. const struct GNUNET_MessageHeader *mh)
  2229. {
  2230. struct Operation *op = cls;
  2231. switch (op->phase)
  2232. {
  2233. case PHASE_INVENTORY_PASSIVE:
  2234. /* We got all requests, but still have to send our elements in response. */
  2235. op->phase = PHASE_FINISH_WAITING;
  2236. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2237. "got DONE (as passive partner), waiting for our demands to be satisfied\n");
  2238. /* The active peer is done sending offers
  2239. * and inquiries. This means that all
  2240. * our responses to that (demands and offers)
  2241. * must be in flight (queued or in mesh).
  2242. *
  2243. * We should notify the active peer once
  2244. * all our demands are satisfied, so that the active
  2245. * peer can quit if we gave it everything.
  2246. */GNUNET_CADET_receive_done (op->channel);
  2247. maybe_finish (op);
  2248. return;
  2249. case PHASE_INVENTORY_ACTIVE:
  2250. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2251. "got DONE (as active partner), waiting to finish\n");
  2252. /* All demands of the other peer are satisfied,
  2253. * and we processed all offers, thus we know
  2254. * exactly what our demands must be.
  2255. *
  2256. * We'll close the channel
  2257. * to the other peer once our demands are met.
  2258. */op->phase = PHASE_FINISH_CLOSING;
  2259. GNUNET_CADET_receive_done (op->channel);
  2260. maybe_finish (op);
  2261. return;
  2262. default:
  2263. GNUNET_break_op (0);
  2264. fail_union_operation (op);
  2265. return;
  2266. }
  2267. }
  2268. /**
  2269. * Handle a over message from a remote peer
  2270. *
  2271. * @param cls the union operation
  2272. * @param mh the message
  2273. */
  2274. static void
  2275. handle_union_p2p_over (void *cls,
  2276. const struct GNUNET_MessageHeader *mh)
  2277. {
  2278. send_client_done (cls);
  2279. }
  2280. /**
  2281. * Get the incoming socket associated with the given id.
  2282. *
  2283. * @param listener the listener to look in
  2284. * @param id id to look for
  2285. * @return the incoming socket associated with the id,
  2286. * or NULL if there is none
  2287. */
  2288. static struct Operation *
  2289. get_incoming (uint32_t id)
  2290. {
  2291. for (struct Listener *listener = listener_head;
  2292. NULL != listener;
  2293. listener = listener->next)
  2294. {
  2295. for (struct Operation *op = listener->op_head;
  2296. NULL != op;
  2297. op = op->next)
  2298. if (op->suggest_id == id)
  2299. return op;
  2300. }
  2301. return NULL;
  2302. }
  2303. /**
  2304. * Callback called when a client connects to the service.
  2305. *
  2306. * @param cls closure for the service
  2307. * @param c the new client that connected to the service
  2308. * @param mq the message queue used to send messages to the client
  2309. * @return @a `struct ClientState`
  2310. */
  2311. static void *
  2312. client_connect_cb (void *cls,
  2313. struct GNUNET_SERVICE_Client *c,
  2314. struct GNUNET_MQ_Handle *mq)
  2315. {
  2316. struct ClientState *cs;
  2317. num_clients++;
  2318. cs = GNUNET_new (struct ClientState);
  2319. cs->client = c;
  2320. cs->mq = mq;
  2321. return cs;
  2322. }
  2323. /**
  2324. * Iterator over hash map entries to free element entries.
  2325. *
  2326. * @param cls closure
  2327. * @param key current key code
  2328. * @param value a `struct ElementEntry *` to be free'd
  2329. * @return #GNUNET_YES (continue to iterate)
  2330. */
  2331. static int
  2332. destroy_elements_iterator (void *cls,
  2333. const struct GNUNET_HashCode *key,
  2334. void *value)
  2335. {
  2336. struct ElementEntry *ee = value;
  2337. GNUNET_free (ee);
  2338. return GNUNET_YES;
  2339. }
  2340. /**
  2341. * Clean up after a client has disconnected
  2342. *
  2343. * @param cls closure, unused
  2344. * @param client the client to clean up after
  2345. * @param internal_cls the `struct ClientState`
  2346. */
  2347. static void
  2348. client_disconnect_cb (void *cls,
  2349. struct GNUNET_SERVICE_Client *client,
  2350. void *internal_cls)
  2351. {
  2352. struct ClientState *cs = internal_cls;
  2353. struct Operation *op;
  2354. struct Listener *listener;
  2355. struct Set *set;
  2356. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2357. "Client disconnected, cleaning up\n");
  2358. if (NULL != (set = cs->set))
  2359. {
  2360. struct SetContent *content = set->content;
  2361. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2362. "Destroying client's set\n");
  2363. /* Destroy pending set operations */
  2364. while (NULL != set->ops_head)
  2365. _GSS_operation_destroy (set->ops_head);
  2366. /* Destroy operation-specific state */
  2367. if (NULL != set->se)
  2368. {
  2369. strata_estimator_destroy (set->se);
  2370. set->se = NULL;
  2371. }
  2372. /* free set content (or at least decrement RC) */
  2373. set->content = NULL;
  2374. GNUNET_assert (0 != content->refcount);
  2375. content->refcount--;
  2376. if (0 == content->refcount)
  2377. {
  2378. GNUNET_assert (NULL != content->elements);
  2379. GNUNET_CONTAINER_multihashmap_iterate (content->elements,
  2380. &destroy_elements_iterator,
  2381. NULL);
  2382. GNUNET_CONTAINER_multihashmap_destroy (content->elements);
  2383. content->elements = NULL;
  2384. GNUNET_free (content);
  2385. }
  2386. GNUNET_free (set);
  2387. }
  2388. if (NULL != (listener = cs->listener))
  2389. {
  2390. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2391. "Destroying client's listener\n");
  2392. GNUNET_CADET_close_port (listener->open_port);
  2393. listener->open_port = NULL;
  2394. while (NULL != (op = listener->op_head))
  2395. {
  2396. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  2397. "Destroying incoming operation `%u' from peer `%s'\n",
  2398. (unsigned int) op->client_request_id,
  2399. GNUNET_i2s (&op->peer));
  2400. incoming_destroy (op);
  2401. }
  2402. GNUNET_CONTAINER_DLL_remove (listener_head,
  2403. listener_tail,
  2404. listener);
  2405. GNUNET_free (listener);
  2406. }
  2407. GNUNET_free (cs);
  2408. num_clients--;
  2409. if ( (GNUNET_YES == in_shutdown) &&
  2410. (0 == num_clients) )
  2411. {
  2412. if (NULL != cadet)
  2413. {
  2414. GNUNET_CADET_disconnect (cadet);
  2415. cadet = NULL;
  2416. }
  2417. }
  2418. }
  2419. /**
  2420. * Check a request for a set operation from another peer.
  2421. *
  2422. * @param cls the operation state
  2423. * @param msg the received message
  2424. * @return #GNUNET_OK if the channel should be kept alive,
  2425. * #GNUNET_SYSERR to destroy the channel
  2426. */
  2427. static int
  2428. check_incoming_msg (void *cls,
  2429. const struct OperationRequestMessage *msg)
  2430. {
  2431. struct Operation *op = cls;
  2432. struct Listener *listener = op->listener;
  2433. const struct GNUNET_MessageHeader *nested_context;
  2434. /* double operation request */
  2435. if (0 != op->suggest_id)
  2436. {
  2437. GNUNET_break_op (0);
  2438. return GNUNET_SYSERR;
  2439. }
  2440. /* This should be equivalent to the previous condition, but can't hurt to check twice */
  2441. if (NULL == listener)
  2442. {
  2443. GNUNET_break (0);
  2444. return GNUNET_SYSERR;
  2445. }
  2446. nested_context = GNUNET_MQ_extract_nested_mh (msg);
  2447. if ((NULL != nested_context) &&
  2448. (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
  2449. {
  2450. GNUNET_break_op (0);
  2451. return GNUNET_SYSERR;
  2452. }
  2453. return GNUNET_OK;
  2454. }
  2455. /**
  2456. * Handle a request for a set operation from another peer. Checks if we
  2457. * have a listener waiting for such a request (and in that case initiates
  2458. * asking the listener about accepting the connection). If no listener
  2459. * is waiting, we queue the operation request in hope that a listener
  2460. * shows up soon (before timeout).
  2461. *
  2462. * This msg is expected as the first and only msg handled through the
  2463. * non-operation bound virtual table, acceptance of this operation replaces
  2464. * our virtual table and subsequent msgs would be routed differently (as
  2465. * we then know what type of operation this is).
  2466. *
  2467. * @param cls the operation state
  2468. * @param msg the received message
  2469. */
  2470. static void
  2471. handle_incoming_msg (void *cls,
  2472. const struct OperationRequestMessage *msg)
  2473. {
  2474. struct Operation *op = cls;
  2475. struct Listener *listener = op->listener;
  2476. const struct GNUNET_MessageHeader *nested_context;
  2477. struct GNUNET_MQ_Envelope *env;
  2478. struct GNUNET_SETU_RequestMessage *cmsg;
  2479. nested_context = GNUNET_MQ_extract_nested_mh (msg);
  2480. /* Make a copy of the nested_context (application-specific context
  2481. information that is opaque to set) so we can pass it to the
  2482. listener later on */
  2483. if (NULL != nested_context)
  2484. op->context_msg = GNUNET_copy_message (nested_context);
  2485. op->remote_element_count = ntohl (msg->element_count);
  2486. GNUNET_log (
  2487. GNUNET_ERROR_TYPE_DEBUG,
  2488. "Received P2P operation request (port %s) for active listener\n",
  2489. GNUNET_h2s (&op->listener->app_id));
  2490. GNUNET_assert (0 == op->suggest_id);
  2491. if (0 == suggest_id)
  2492. suggest_id++;
  2493. op->suggest_id = suggest_id++;
  2494. GNUNET_assert (NULL != op->timeout_task);
  2495. GNUNET_SCHEDULER_cancel (op->timeout_task);
  2496. op->timeout_task = NULL;
  2497. env = GNUNET_MQ_msg_nested_mh (cmsg,
  2498. GNUNET_MESSAGE_TYPE_SETU_REQUEST,
  2499. op->context_msg);
  2500. GNUNET_log (
  2501. GNUNET_ERROR_TYPE_DEBUG,
  2502. "Suggesting incoming request with accept id %u to listener %p of client %p\n",
  2503. op->suggest_id,
  2504. listener,
  2505. listener->cs);
  2506. cmsg->accept_id = htonl (op->suggest_id);
  2507. cmsg->peer_id = op->peer;
  2508. GNUNET_MQ_send (listener->cs->mq,
  2509. env);
  2510. /* NOTE: GNUNET_CADET_receive_done() will be called in
  2511. #handle_client_accept() */
  2512. }
  2513. /**
  2514. * Called when a client wants to create a new set. This is typically
  2515. * the first request from a client, and includes the type of set
  2516. * operation to be performed.
  2517. *
  2518. * @param cls client that sent the message
  2519. * @param m message sent by the client
  2520. */
  2521. static void
  2522. handle_client_create_set (void *cls,
  2523. const struct GNUNET_SETU_CreateMessage *msg)
  2524. {
  2525. struct ClientState *cs = cls;
  2526. struct Set *set;
  2527. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2528. "Client created new set for union operation\n");
  2529. if (NULL != cs->set)
  2530. {
  2531. /* There can only be one set per client */
  2532. GNUNET_break (0);
  2533. GNUNET_SERVICE_client_drop (cs->client);
  2534. return;
  2535. }
  2536. set = GNUNET_new (struct Set);
  2537. {
  2538. struct StrataEstimator *se;
  2539. se = strata_estimator_create (SE_STRATA_COUNT,
  2540. SE_IBF_SIZE,
  2541. SE_IBF_HASH_NUM);
  2542. if (NULL == se)
  2543. {
  2544. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  2545. "Failed to allocate strata estimator\n");
  2546. GNUNET_free (set);
  2547. GNUNET_SERVICE_client_drop (cs->client);
  2548. return;
  2549. }
  2550. set->se = se;
  2551. }
  2552. set->content = GNUNET_new (struct SetContent);
  2553. set->content->refcount = 1;
  2554. set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
  2555. GNUNET_YES);
  2556. set->cs = cs;
  2557. cs->set = set;
  2558. GNUNET_SERVICE_client_continue (cs->client);
  2559. }
  2560. /**
  2561. * Timeout happens iff:
  2562. * - we suggested an operation to our listener,
  2563. * but did not receive a response in time
  2564. * - we got the channel from a peer but no #GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST
  2565. *
  2566. * @param cls channel context
  2567. * @param tc context information (why was this task triggered now)
  2568. */
  2569. static void
  2570. incoming_timeout_cb (void *cls)
  2571. {
  2572. struct Operation *op = cls;
  2573. op->timeout_task = NULL;
  2574. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2575. "Remote peer's incoming request timed out\n");
  2576. incoming_destroy (op);
  2577. }
  2578. /**
  2579. * Method called whenever another peer has added us to a channel the
  2580. * other peer initiated. Only called (once) upon reception of data
  2581. * from a channel we listen on.
  2582. *
  2583. * The channel context represents the operation itself and gets added
  2584. * to a DLL, from where it gets looked up when our local listener
  2585. * client responds to a proposed/suggested operation or connects and
  2586. * associates with this operation.
  2587. *
  2588. * @param cls closure
  2589. * @param channel new handle to the channel
  2590. * @param source peer that started the channel
  2591. * @return initial channel context for the channel
  2592. * returns NULL on error
  2593. */
  2594. static void *
  2595. channel_new_cb (void *cls,
  2596. struct GNUNET_CADET_Channel *channel,
  2597. const struct GNUNET_PeerIdentity *source)
  2598. {
  2599. struct Listener *listener = cls;
  2600. struct Operation *op;
  2601. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2602. "New incoming channel\n");
  2603. op = GNUNET_new (struct Operation);
  2604. op->listener = listener;
  2605. op->peer = *source;
  2606. op->channel = channel;
  2607. op->mq = GNUNET_CADET_get_mq (op->channel);
  2608. op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
  2609. UINT32_MAX);
  2610. op->timeout_task = GNUNET_SCHEDULER_add_delayed (INCOMING_CHANNEL_TIMEOUT,
  2611. &incoming_timeout_cb,
  2612. op);
  2613. GNUNET_CONTAINER_DLL_insert (listener->op_head,
  2614. listener->op_tail,
  2615. op);
  2616. return op;
  2617. }
  2618. /**
  2619. * Function called whenever a channel is destroyed. Should clean up
  2620. * any associated state. It must NOT call
  2621. * GNUNET_CADET_channel_destroy() on the channel.
  2622. *
  2623. * The peer_disconnect function is part of a a virtual table set initially either
  2624. * when a peer creates a new channel with us, or once we create
  2625. * a new channel ourselves (evaluate).
  2626. *
  2627. * Once we know the exact type of operation (union/intersection), the vt is
  2628. * replaced with an operation specific instance (_GSS_[op]_vt).
  2629. *
  2630. * @param channel_ctx place where local state associated
  2631. * with the channel is stored
  2632. * @param channel connection to the other end (henceforth invalid)
  2633. */
  2634. static void
  2635. channel_end_cb (void *channel_ctx,
  2636. const struct GNUNET_CADET_Channel *channel)
  2637. {
  2638. struct Operation *op = channel_ctx;
  2639. op->channel = NULL;
  2640. _GSS_operation_destroy2 (op);
  2641. }
  2642. /**
  2643. * Function called whenever an MQ-channel's transmission window size changes.
  2644. *
  2645. * The first callback in an outgoing channel will be with a non-zero value
  2646. * and will mean the channel is connected to the destination.
  2647. *
  2648. * For an incoming channel it will be called immediately after the
  2649. * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
  2650. *
  2651. * @param cls Channel closure.
  2652. * @param channel Connection to the other end (henceforth invalid).
  2653. * @param window_size New window size. If the is more messages than buffer size
  2654. * this value will be negative..
  2655. */
  2656. static void
  2657. channel_window_cb (void *cls,
  2658. const struct GNUNET_CADET_Channel *channel,
  2659. int window_size)
  2660. {
  2661. /* FIXME: not implemented, we could do flow control here... */
  2662. }
  2663. /**
  2664. * Called when a client wants to create a new listener.
  2665. *
  2666. * @param cls client that sent the message
  2667. * @param msg message sent by the client
  2668. */
  2669. static void
  2670. handle_client_listen (void *cls,
  2671. const struct GNUNET_SETU_ListenMessage *msg)
  2672. {
  2673. struct ClientState *cs = cls;
  2674. struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
  2675. GNUNET_MQ_hd_var_size (incoming_msg,
  2676. GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
  2677. struct OperationRequestMessage,
  2678. NULL),
  2679. GNUNET_MQ_hd_var_size (union_p2p_ibf,
  2680. GNUNET_MESSAGE_TYPE_SETU_P2P_IBF,
  2681. struct IBFMessage,
  2682. NULL),
  2683. GNUNET_MQ_hd_var_size (union_p2p_elements,
  2684. GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS,
  2685. struct GNUNET_SETU_ElementMessage,
  2686. NULL),
  2687. GNUNET_MQ_hd_var_size (union_p2p_offer,
  2688. GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER,
  2689. struct GNUNET_MessageHeader,
  2690. NULL),
  2691. GNUNET_MQ_hd_var_size (union_p2p_inquiry,
  2692. GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY,
  2693. struct InquiryMessage,
  2694. NULL),
  2695. GNUNET_MQ_hd_var_size (union_p2p_demand,
  2696. GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND,
  2697. struct GNUNET_MessageHeader,
  2698. NULL),
  2699. GNUNET_MQ_hd_fixed_size (union_p2p_done,
  2700. GNUNET_MESSAGE_TYPE_SETU_P2P_DONE,
  2701. struct GNUNET_MessageHeader,
  2702. NULL),
  2703. GNUNET_MQ_hd_fixed_size (union_p2p_over,
  2704. GNUNET_MESSAGE_TYPE_SETU_P2P_OVER,
  2705. struct GNUNET_MessageHeader,
  2706. NULL),
  2707. GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
  2708. GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
  2709. struct GNUNET_MessageHeader,
  2710. NULL),
  2711. GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
  2712. GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
  2713. struct GNUNET_MessageHeader,
  2714. NULL),
  2715. GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
  2716. GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
  2717. struct StrataEstimatorMessage,
  2718. NULL),
  2719. GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
  2720. GNUNET_MESSAGE_TYPE_SETU_P2P_SEC,
  2721. struct StrataEstimatorMessage,
  2722. NULL),
  2723. GNUNET_MQ_hd_var_size (union_p2p_full_element,
  2724. GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
  2725. struct GNUNET_SETU_ElementMessage,
  2726. NULL),
  2727. GNUNET_MQ_handler_end ()
  2728. };
  2729. struct Listener *listener;
  2730. if (NULL != cs->listener)
  2731. {
  2732. /* max. one active listener per client! */
  2733. GNUNET_break (0);
  2734. GNUNET_SERVICE_client_drop (cs->client);
  2735. return;
  2736. }
  2737. listener = GNUNET_new (struct Listener);
  2738. listener->cs = cs;
  2739. cs->listener = listener;
  2740. listener->app_id = msg->app_id;
  2741. GNUNET_CONTAINER_DLL_insert (listener_head,
  2742. listener_tail,
  2743. listener);
  2744. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2745. "New listener created (port %s)\n",
  2746. GNUNET_h2s (&listener->app_id));
  2747. listener->open_port = GNUNET_CADET_open_port (cadet,
  2748. &msg->app_id,
  2749. &channel_new_cb,
  2750. listener,
  2751. &channel_window_cb,
  2752. &channel_end_cb,
  2753. cadet_handlers);
  2754. GNUNET_SERVICE_client_continue (cs->client);
  2755. }
  2756. /**
  2757. * Called when the listening client rejects an operation
  2758. * request by another peer.
  2759. *
  2760. * @param cls client that sent the message
  2761. * @param msg message sent by the client
  2762. */
  2763. static void
  2764. handle_client_reject (void *cls,
  2765. const struct GNUNET_SETU_RejectMessage *msg)
  2766. {
  2767. struct ClientState *cs = cls;
  2768. struct Operation *op;
  2769. op = get_incoming (ntohl (msg->accept_reject_id));
  2770. if (NULL == op)
  2771. {
  2772. /* no matching incoming operation for this reject;
  2773. could be that the other peer already disconnected... */
  2774. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  2775. "Client rejected unknown operation %u\n",
  2776. (unsigned int) ntohl (msg->accept_reject_id));
  2777. GNUNET_SERVICE_client_continue (cs->client);
  2778. return;
  2779. }
  2780. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2781. "Peer request (app %s) rejected by client\n",
  2782. GNUNET_h2s (&cs->listener->app_id));
  2783. _GSS_operation_destroy2 (op);
  2784. GNUNET_SERVICE_client_continue (cs->client);
  2785. }
  2786. /**
  2787. * Called when a client wants to add or remove an element to a set it inhabits.
  2788. *
  2789. * @param cls client that sent the message
  2790. * @param msg message sent by the client
  2791. */
  2792. static int
  2793. check_client_set_add (void *cls,
  2794. const struct GNUNET_SETU_ElementMessage *msg)
  2795. {
  2796. /* NOTE: Technically, we should probably check with the
  2797. block library whether the element we are given is well-formed */
  2798. return GNUNET_OK;
  2799. }
  2800. /**
  2801. * Called when a client wants to add or remove an element to a set it inhabits.
  2802. *
  2803. * @param cls client that sent the message
  2804. * @param msg message sent by the client
  2805. */
  2806. static void
  2807. handle_client_set_add (void *cls,
  2808. const struct GNUNET_SETU_ElementMessage *msg)
  2809. {
  2810. struct ClientState *cs = cls;
  2811. struct Set *set;
  2812. struct GNUNET_SETU_Element el;
  2813. struct ElementEntry *ee;
  2814. struct GNUNET_HashCode hash;
  2815. if (NULL == (set = cs->set))
  2816. {
  2817. /* client without a set requested an operation */
  2818. GNUNET_break (0);
  2819. GNUNET_SERVICE_client_drop (cs->client);
  2820. return;
  2821. }
  2822. GNUNET_SERVICE_client_continue (cs->client);
  2823. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
  2824. el.size = ntohs (msg->header.size) - sizeof(*msg);
  2825. el.data = &msg[1];
  2826. el.element_type = ntohs (msg->element_type);
  2827. GNUNET_SETU_element_hash (&el,
  2828. &hash);
  2829. ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
  2830. &hash);
  2831. if (NULL == ee)
  2832. {
  2833. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2834. "Client inserts element %s of size %u\n",
  2835. GNUNET_h2s (&hash),
  2836. el.size);
  2837. ee = GNUNET_malloc (el.size + sizeof(*ee));
  2838. ee->element.size = el.size;
  2839. GNUNET_memcpy (&ee[1], el.data, el.size);
  2840. ee->element.data = &ee[1];
  2841. ee->element.element_type = el.element_type;
  2842. ee->remote = GNUNET_NO;
  2843. ee->generation = set->current_generation;
  2844. ee->element_hash = hash;
  2845. GNUNET_break (GNUNET_YES ==
  2846. GNUNET_CONTAINER_multihashmap_put (
  2847. set->content->elements,
  2848. &ee->element_hash,
  2849. ee,
  2850. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  2851. }
  2852. else
  2853. {
  2854. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2855. "Client inserted element %s of size %u twice (ignored)\n",
  2856. GNUNET_h2s (&hash),
  2857. el.size);
  2858. /* same element inserted twice */
  2859. return;
  2860. }
  2861. strata_estimator_insert (set->se,
  2862. get_ibf_key (&ee->element_hash));
  2863. }
  2864. /**
  2865. * Advance the current generation of a set,
  2866. * adding exclusion ranges if necessary.
  2867. *
  2868. * @param set the set where we want to advance the generation
  2869. */
  2870. static void
  2871. advance_generation (struct Set *set)
  2872. {
  2873. set->content->latest_generation++;
  2874. set->current_generation++;
  2875. }
  2876. /**
  2877. * Called when a client wants to initiate a set operation with another
  2878. * peer. Initiates the CADET connection to the listener and sends the
  2879. * request.
  2880. *
  2881. * @param cls client that sent the message
  2882. * @param msg message sent by the client
  2883. * @return #GNUNET_OK if the message is well-formed
  2884. */
  2885. static int
  2886. check_client_evaluate (void *cls,
  2887. const struct GNUNET_SETU_EvaluateMessage *msg)
  2888. {
  2889. /* FIXME: suboptimal, even if the context below could be NULL,
  2890. there are malformed messages this does not check for... */
  2891. return GNUNET_OK;
  2892. }
  2893. /**
  2894. * Called when a client wants to initiate a set operation with another
  2895. * peer. Initiates the CADET connection to the listener and sends the
  2896. * request.
  2897. *
  2898. * @param cls client that sent the message
  2899. * @param msg message sent by the client
  2900. */
  2901. static void
  2902. handle_client_evaluate (void *cls,
  2903. const struct GNUNET_SETU_EvaluateMessage *msg)
  2904. {
  2905. struct ClientState *cs = cls;
  2906. struct Operation *op = GNUNET_new (struct Operation);
  2907. const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
  2908. GNUNET_MQ_hd_var_size (incoming_msg,
  2909. GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
  2910. struct OperationRequestMessage,
  2911. op),
  2912. GNUNET_MQ_hd_var_size (union_p2p_ibf,
  2913. GNUNET_MESSAGE_TYPE_SETU_P2P_IBF,
  2914. struct IBFMessage,
  2915. op),
  2916. GNUNET_MQ_hd_var_size (union_p2p_elements,
  2917. GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS,
  2918. struct GNUNET_SETU_ElementMessage,
  2919. op),
  2920. GNUNET_MQ_hd_var_size (union_p2p_offer,
  2921. GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER,
  2922. struct GNUNET_MessageHeader,
  2923. op),
  2924. GNUNET_MQ_hd_var_size (union_p2p_inquiry,
  2925. GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY,
  2926. struct InquiryMessage,
  2927. op),
  2928. GNUNET_MQ_hd_var_size (union_p2p_demand,
  2929. GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND,
  2930. struct GNUNET_MessageHeader,
  2931. op),
  2932. GNUNET_MQ_hd_fixed_size (union_p2p_done,
  2933. GNUNET_MESSAGE_TYPE_SETU_P2P_DONE,
  2934. struct GNUNET_MessageHeader,
  2935. op),
  2936. GNUNET_MQ_hd_fixed_size (union_p2p_over,
  2937. GNUNET_MESSAGE_TYPE_SETU_P2P_OVER,
  2938. struct GNUNET_MessageHeader,
  2939. op),
  2940. GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
  2941. GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE,
  2942. struct GNUNET_MessageHeader,
  2943. op),
  2944. GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
  2945. GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL,
  2946. struct GNUNET_MessageHeader,
  2947. op),
  2948. GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
  2949. GNUNET_MESSAGE_TYPE_SETU_P2P_SE,
  2950. struct StrataEstimatorMessage,
  2951. op),
  2952. GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
  2953. GNUNET_MESSAGE_TYPE_SETU_P2P_SEC,
  2954. struct StrataEstimatorMessage,
  2955. op),
  2956. GNUNET_MQ_hd_var_size (union_p2p_full_element,
  2957. GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT,
  2958. struct GNUNET_SETU_ElementMessage,
  2959. op),
  2960. GNUNET_MQ_handler_end ()
  2961. };
  2962. struct Set *set;
  2963. const struct GNUNET_MessageHeader *context;
  2964. if (NULL == (set = cs->set))
  2965. {
  2966. GNUNET_break (0);
  2967. GNUNET_free (op);
  2968. GNUNET_SERVICE_client_drop (cs->client);
  2969. return;
  2970. }
  2971. op->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
  2972. UINT32_MAX);
  2973. op->peer = msg->target_peer;
  2974. op->client_request_id = ntohl (msg->request_id);
  2975. op->byzantine = msg->byzantine;
  2976. op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
  2977. op->force_full = msg->force_full;
  2978. op->force_delta = msg->force_delta;
  2979. op->symmetric = msg->symmetric;
  2980. context = GNUNET_MQ_extract_nested_mh (msg);
  2981. /* Advance generation values, so that
  2982. mutations won't interfer with the running operation. */
  2983. op->set = set;
  2984. op->generation_created = set->current_generation;
  2985. advance_generation (set);
  2986. GNUNET_CONTAINER_DLL_insert (set->ops_head,
  2987. set->ops_tail,
  2988. op);
  2989. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2990. "Creating new CADET channel to port %s for set union\n",
  2991. GNUNET_h2s (&msg->app_id));
  2992. op->channel = GNUNET_CADET_channel_create (cadet,
  2993. op,
  2994. &msg->target_peer,
  2995. &msg->app_id,
  2996. &channel_window_cb,
  2997. &channel_end_cb,
  2998. cadet_handlers);
  2999. op->mq = GNUNET_CADET_get_mq (op->channel);
  3000. {
  3001. struct GNUNET_MQ_Envelope *ev;
  3002. struct OperationRequestMessage *msg;
  3003. ev = GNUNET_MQ_msg_nested_mh (msg,
  3004. GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST,
  3005. context);
  3006. if (NULL == ev)
  3007. {
  3008. /* the context message is too large */
  3009. GNUNET_break (0);
  3010. GNUNET_SERVICE_client_drop (cs->client);
  3011. return;
  3012. }
  3013. op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
  3014. GNUNET_NO);
  3015. /* copy the current generation's strata estimator for this operation */
  3016. op->se = strata_estimator_dup (op->set->se);
  3017. /* we started the operation, thus we have to send the operation request */
  3018. op->phase = PHASE_EXPECT_SE;
  3019. op->salt_receive = op->salt_send = 42; // FIXME?????
  3020. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3021. "Initiating union operation evaluation\n");
  3022. GNUNET_STATISTICS_update (_GSS_statistics,
  3023. "# of total union operations",
  3024. 1,
  3025. GNUNET_NO);
  3026. GNUNET_STATISTICS_update (_GSS_statistics,
  3027. "# of initiated union operations",
  3028. 1,
  3029. GNUNET_NO);
  3030. GNUNET_MQ_send (op->mq,
  3031. ev);
  3032. if (NULL != context)
  3033. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3034. "sent op request with context message\n");
  3035. else
  3036. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3037. "sent op request without context message\n");
  3038. initialize_key_to_element (op);
  3039. op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
  3040. op->key_to_element);
  3041. }
  3042. GNUNET_SERVICE_client_continue (cs->client);
  3043. }
  3044. /**
  3045. * Handle a request from the client to cancel a running set operation.
  3046. *
  3047. * @param cls the client
  3048. * @param msg the message
  3049. */
  3050. static void
  3051. handle_client_cancel (void *cls,
  3052. const struct GNUNET_SETU_CancelMessage *msg)
  3053. {
  3054. struct ClientState *cs = cls;
  3055. struct Set *set;
  3056. struct Operation *op;
  3057. int found;
  3058. if (NULL == (set = cs->set))
  3059. {
  3060. /* client without a set requested an operation */
  3061. GNUNET_break (0);
  3062. GNUNET_SERVICE_client_drop (cs->client);
  3063. return;
  3064. }
  3065. found = GNUNET_NO;
  3066. for (op = set->ops_head; NULL != op; op = op->next)
  3067. {
  3068. if (op->client_request_id == ntohl (msg->request_id))
  3069. {
  3070. found = GNUNET_YES;
  3071. break;
  3072. }
  3073. }
  3074. if (GNUNET_NO == found)
  3075. {
  3076. /* It may happen that the operation was already destroyed due to
  3077. * the other peer disconnecting. The client may not know about this
  3078. * yet and try to cancel the (just barely non-existent) operation.
  3079. * So this is not a hard error.
  3080. *///
  3081. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  3082. "Client canceled non-existent op %u\n",
  3083. (uint32_t) ntohl (msg->request_id));
  3084. }
  3085. else
  3086. {
  3087. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  3088. "Client requested cancel for op %u\n",
  3089. (uint32_t) ntohl (msg->request_id));
  3090. _GSS_operation_destroy (op);
  3091. }
  3092. GNUNET_SERVICE_client_continue (cs->client);
  3093. }
  3094. /**
  3095. * Handle a request from the client to accept a set operation that
  3096. * came from a remote peer. We forward the accept to the associated
  3097. * operation for handling
  3098. *
  3099. * @param cls the client
  3100. * @param msg the message
  3101. */
  3102. static void
  3103. handle_client_accept (void *cls,
  3104. const struct GNUNET_SETU_AcceptMessage *msg)
  3105. {
  3106. struct ClientState *cs = cls;
  3107. struct Set *set;
  3108. struct Operation *op;
  3109. struct GNUNET_SETU_ResultMessage *result_message;
  3110. struct GNUNET_MQ_Envelope *ev;
  3111. struct Listener *listener;
  3112. if (NULL == (set = cs->set))
  3113. {
  3114. /* client without a set requested to accept */
  3115. GNUNET_break (0);
  3116. GNUNET_SERVICE_client_drop (cs->client);
  3117. return;
  3118. }
  3119. op = get_incoming (ntohl (msg->accept_reject_id));
  3120. if (NULL == op)
  3121. {
  3122. /* It is not an error if the set op does not exist -- it may
  3123. * have been destroyed when the partner peer disconnected. */
  3124. GNUNET_log (
  3125. GNUNET_ERROR_TYPE_INFO,
  3126. "Client %p accepted request %u of listener %p that is no longer active\n",
  3127. cs,
  3128. ntohl (msg->accept_reject_id),
  3129. cs->listener);
  3130. ev = GNUNET_MQ_msg (result_message,
  3131. GNUNET_MESSAGE_TYPE_SETU_RESULT);
  3132. result_message->request_id = msg->request_id;
  3133. result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
  3134. GNUNET_MQ_send (set->cs->mq, ev);
  3135. GNUNET_SERVICE_client_continue (cs->client);
  3136. return;
  3137. }
  3138. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  3139. "Client accepting request %u\n",
  3140. (uint32_t) ntohl (msg->accept_reject_id));
  3141. listener = op->listener;
  3142. op->listener = NULL;
  3143. GNUNET_CONTAINER_DLL_remove (listener->op_head,
  3144. listener->op_tail,
  3145. op);
  3146. op->set = set;
  3147. GNUNET_CONTAINER_DLL_insert (set->ops_head,
  3148. set->ops_tail,
  3149. op);
  3150. op->client_request_id = ntohl (msg->request_id);
  3151. op->byzantine = msg->byzantine;
  3152. op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
  3153. op->force_full = msg->force_full;
  3154. op->force_delta = msg->force_delta;
  3155. op->symmetric = msg->symmetric;
  3156. /* Advance generation values, so that future mutations do not
  3157. interfer with the running operation. */
  3158. op->generation_created = set->current_generation;
  3159. advance_generation (set);
  3160. GNUNET_assert (NULL == op->se);
  3161. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3162. "accepting set union operation\n");
  3163. GNUNET_STATISTICS_update (_GSS_statistics,
  3164. "# of accepted union operations",
  3165. 1,
  3166. GNUNET_NO);
  3167. GNUNET_STATISTICS_update (_GSS_statistics,
  3168. "# of total union operations",
  3169. 1,
  3170. GNUNET_NO);
  3171. {
  3172. const struct StrataEstimator *se;
  3173. struct GNUNET_MQ_Envelope *ev;
  3174. struct StrataEstimatorMessage *strata_msg;
  3175. char *buf;
  3176. size_t len;
  3177. uint16_t type;
  3178. op->se = strata_estimator_dup (op->set->se);
  3179. op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
  3180. GNUNET_NO);
  3181. op->salt_receive = op->salt_send = 42; // FIXME?????
  3182. initialize_key_to_element (op);
  3183. op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
  3184. op->key_to_element);
  3185. /* kick off the operation */
  3186. se = op->se;
  3187. buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
  3188. len = strata_estimator_write (se,
  3189. buf);
  3190. if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
  3191. type = GNUNET_MESSAGE_TYPE_SETU_P2P_SEC;
  3192. else
  3193. type = GNUNET_MESSAGE_TYPE_SETU_P2P_SE;
  3194. ev = GNUNET_MQ_msg_extra (strata_msg,
  3195. len,
  3196. type);
  3197. GNUNET_memcpy (&strata_msg[1],
  3198. buf,
  3199. len);
  3200. GNUNET_free (buf);
  3201. strata_msg->set_size
  3202. = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (
  3203. op->set->content->elements));
  3204. GNUNET_MQ_send (op->mq,
  3205. ev);
  3206. op->phase = PHASE_EXPECT_IBF;
  3207. }
  3208. /* Now allow CADET to continue, as we did not do this in
  3209. #handle_incoming_msg (as we wanted to first see if the
  3210. local client would accept the request). */
  3211. GNUNET_CADET_receive_done (op->channel);
  3212. GNUNET_SERVICE_client_continue (cs->client);
  3213. }
  3214. /**
  3215. * Called to clean up, after a shutdown has been requested.
  3216. *
  3217. * @param cls closure, NULL
  3218. */
  3219. static void
  3220. shutdown_task (void *cls)
  3221. {
  3222. /* Delay actual shutdown to allow service to disconnect clients */
  3223. in_shutdown = GNUNET_YES;
  3224. if (0 == num_clients)
  3225. {
  3226. if (NULL != cadet)
  3227. {
  3228. GNUNET_CADET_disconnect (cadet);
  3229. cadet = NULL;
  3230. }
  3231. }
  3232. GNUNET_STATISTICS_destroy (_GSS_statistics,
  3233. GNUNET_YES);
  3234. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  3235. "handled shutdown request\n");
  3236. }
  3237. /**
  3238. * Function called by the service's run
  3239. * method to run service-specific setup code.
  3240. *
  3241. * @param cls closure
  3242. * @param cfg configuration to use
  3243. * @param service the initialized service
  3244. */
  3245. static void
  3246. run (void *cls,
  3247. const struct GNUNET_CONFIGURATION_Handle *cfg,
  3248. struct GNUNET_SERVICE_Handle *service)
  3249. {
  3250. /* FIXME: need to modify SERVICE (!) API to allow
  3251. us to run a shutdown task *after* clients were
  3252. forcefully disconnected! */
  3253. GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
  3254. NULL);
  3255. _GSS_statistics = GNUNET_STATISTICS_create ("setu",
  3256. cfg);
  3257. cadet = GNUNET_CADET_connect (cfg);
  3258. if (NULL == cadet)
  3259. {
  3260. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  3261. _ ("Could not connect to CADET service\n"));
  3262. GNUNET_SCHEDULER_shutdown ();
  3263. return;
  3264. }
  3265. }
  3266. /**
  3267. * Define "main" method using service macro.
  3268. */
  3269. GNUNET_SERVICE_MAIN (
  3270. "set",
  3271. GNUNET_SERVICE_OPTION_NONE,
  3272. &run,
  3273. &client_connect_cb,
  3274. &client_disconnect_cb,
  3275. NULL,
  3276. GNUNET_MQ_hd_fixed_size (client_accept,
  3277. GNUNET_MESSAGE_TYPE_SETU_ACCEPT,
  3278. struct GNUNET_SETU_AcceptMessage,
  3279. NULL),
  3280. GNUNET_MQ_hd_var_size (client_set_add,
  3281. GNUNET_MESSAGE_TYPE_SETU_ADD,
  3282. struct GNUNET_SETU_ElementMessage,
  3283. NULL),
  3284. GNUNET_MQ_hd_fixed_size (client_create_set,
  3285. GNUNET_MESSAGE_TYPE_SETU_CREATE,
  3286. struct GNUNET_SETU_CreateMessage,
  3287. NULL),
  3288. GNUNET_MQ_hd_var_size (client_evaluate,
  3289. GNUNET_MESSAGE_TYPE_SETU_EVALUATE,
  3290. struct GNUNET_SETU_EvaluateMessage,
  3291. NULL),
  3292. GNUNET_MQ_hd_fixed_size (client_listen,
  3293. GNUNET_MESSAGE_TYPE_SETU_LISTEN,
  3294. struct GNUNET_SETU_ListenMessage,
  3295. NULL),
  3296. GNUNET_MQ_hd_fixed_size (client_reject,
  3297. GNUNET_MESSAGE_TYPE_SETU_REJECT,
  3298. struct GNUNET_SETU_RejectMessage,
  3299. NULL),
  3300. GNUNET_MQ_hd_fixed_size (client_cancel,
  3301. GNUNET_MESSAGE_TYPE_SETU_CANCEL,
  3302. struct GNUNET_SETU_CancelMessage,
  3303. NULL),
  3304. GNUNET_MQ_handler_end ());
  3305. /* end of gnunet-service-setu.c */