/* gnunet-service-rps.c */
  /*
     This file is part of GNUnet.
     Copyright (C) 2013-2015 GNUnet e.V.

     GNUnet is free software: you can redistribute it and/or modify it
     under the terms of the GNU Affero General Public License as published
     by the Free Software Foundation, either version 3 of the License,
     or (at your option) any later version.

     GNUnet is distributed in the hope that it will be useful, but
     WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     Affero General Public License for more details.

     You should have received a copy of the GNU Affero General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.

     SPDX-License-Identifier: AGPL-3.0-or-later
   */
  /**
   * @file rps/gnunet-service-rps.c
   * @brief rps service implementation
   * @author Julius Bünger
   */
  21. #include "platform.h"
  22. #include "gnunet_applications.h"
  23. #include "gnunet_util_lib.h"
  24. #include "gnunet_cadet_service.h"
  25. #include "gnunet_core_service.h"
  26. #include "gnunet_peerinfo_service.h"
  27. #include "gnunet_nse_service.h"
  28. #include "gnunet_statistics_service.h"
  29. #include "rps.h"
  30. #include "rps-test_util.h"
  31. #include "gnunet-service-rps_sampler.h"
  32. #include "gnunet-service-rps_custommap.h"
  33. #include "gnunet-service-rps_view.h"
  34. #include <math.h>
  35. #include <inttypes.h>
  36. #include <string.h>
  /**
   * Logging shorthand for this file: passes the log level @a kind and all
   * following arguments on to GNUNET_log() unchanged.
   */
  #define LOG(kind, ...) GNUNET_log (kind, __VA_ARGS__)
  // TODO check for overflows
  // TODO align message structs
  // TODO connect to friends
  // TODO blacklist? (-> mal peer detection on top of brahms)
  // hist_size_init, hist_size_max
  /***********************************************************************
   * Old gnunet-service-rps_peers.c
   ***********************************************************************/
  /**
   * Set a peer flag of given peer context.
   *
   * @param peer_ctx pointer expression to a context with a `peer_flags` member;
   *                 parenthesised so that expressions such as `ctx + 1` expand
   *                 correctly
   * @param mask bit(s) to set in `peer_flags`
   */
  #define SET_PEER_FLAG(peer_ctx, mask) (((peer_ctx)->peer_flags) |= (mask))
  /**
   * Get peer flag of given peer context.
   *
   * @param peer_ctx pointer expression to a context with a `peer_flags` member
   * @param mask bit(s) to test
   * @return #GNUNET_YES if any bit of @a mask is set in `peer_flags`,
   *         #GNUNET_NO otherwise
   */
  #define check_peer_flag_set(peer_ctx, mask) \
    ((((peer_ctx)->peer_flags) & (mask)) ? GNUNET_YES : GNUNET_NO)
  /**
   * Unset flag of given peer context.
   *
   * @param peer_ctx pointer expression to a context with a `peer_flags` member
   * @param mask bit(s) to clear in `peer_flags`
   */
  #define UNSET_PEER_FLAG(peer_ctx, mask) (((peer_ctx)->peer_flags) &= ~(mask))
  /**
   * Get channel flag of given channel context.
   *
   * @param channel_flags pointer expression to the flags word; parenthesised
   *                      before dereferencing so `flags + 1` reads the intended
   *                      element
   * @param mask bit(s) to test
   * @return #GNUNET_YES if any bit of @a mask is set, #GNUNET_NO otherwise
   */
  #define check_channel_flag_set(channel_flags, mask) \
    (((*(channel_flags)) & (mask)) ? GNUNET_YES : GNUNET_NO)
  /**
   * Unset flag of given channel context.
   *
   * @param channel_flags pointer expression to the flags word (parenthesised
   *                      before dereferencing)
   * @param mask bit(s) to clear
   */
  #define unset_channel_flag(channel_flags, mask) ((*(channel_flags)) &= ~(mask))
  68. /**
  69. * Pending operation on peer consisting of callback and closure
  70. *
  71. * When an operation cannot be executed right now this struct is used to store
  72. * the callback and closure for later execution.
  73. */
  74. struct PeerPendingOp
  75. {
  76. /**
  77. * Callback
  78. */
  79. PeerOp op;
  80. /**
  81. * Closure
  82. */
  83. void *op_cls;
  84. };
  /**
   * Element of the list of all messages that are yet to be sent.
   *
   * Keeps track of messages that have not been sent yet, so that the pending
   * messages can be removed properly when a peer is removed.
   */
  struct PendingMessage
  {
    /**
     * DLL: next element.
     */
    struct PendingMessage *next;

    /**
     * DLL: previous element.
     */
    struct PendingMessage *prev;

    /**
     * The envelope of the corresponding message.
     */
    struct GNUNET_MQ_Envelope *ev;

    /**
     * The peer context this message belongs to.
     */
    struct PeerContext *peer_ctx;

    /**
     * The message type (as a string).
     */
    const char *type;
  };
  /**
   * @brief Context for a channel (forward declaration; the members are
   * defined further down in this file).
   */
  struct ChannelCtx;
  115. /**
  116. * Struct used to keep track of other peer's status
  117. *
  118. * This is stored in a multipeermap.
  119. * It contains information such as cadet channels, a message queue for sending,
  120. * status about the channels, the pending operations on this peer and some flags
  121. * about the status of the peer itself. (online, valid, ...)
  122. */
  123. struct PeerContext
  124. {
  125. /**
  126. * The Sub this context belongs to.
  127. */
  128. struct Sub *sub;
  129. /**
  130. * Message queue open to client
  131. */
  132. struct GNUNET_MQ_Handle *mq;
  133. /**
  134. * Channel open to client.
  135. */
  136. struct ChannelCtx *send_channel_ctx;
  137. /**
  138. * Channel open from client.
  139. */
  140. struct ChannelCtx *recv_channel_ctx;
  141. /**
  142. * Array of pending operations on this peer.
  143. */
  144. struct PeerPendingOp *pending_ops;
  145. /**
  146. * Handle to the callback given to cadet_ntfy_tmt_rdy()
  147. *
  148. * To be canceled on shutdown.
  149. */
  150. struct PendingMessage *online_check_pending;
  151. /**
  152. * Number of pending operations.
  153. */
  154. unsigned int num_pending_ops;
  155. /**
  156. * Identity of the peer
  157. */
  158. struct GNUNET_PeerIdentity peer_id;
  159. /**
  160. * Flags indicating status of peer
  161. */
  162. uint32_t peer_flags;
  163. /**
  164. * Last time we received something from that peer.
  165. */
  166. struct GNUNET_TIME_Absolute last_message_recv;
  167. /**
  168. * Last time we received a keepalive message.
  169. */
  170. struct GNUNET_TIME_Absolute last_keepalive;
  171. /**
  172. * DLL with all messages that are yet to be sent
  173. */
  174. struct PendingMessage *pending_messages_head;
  175. struct PendingMessage *pending_messages_tail;
  176. /**
  177. * This is pobably followed by 'statistical' data (when we first saw
  178. * it, how did we get its ID, how many pushes (in a timeinterval),
  179. * ...)
  180. */
  181. uint32_t round_pull_req;
  182. };
  183. /**
  184. * @brief Closure to #valid_peer_iterator
  185. */
  186. struct PeersIteratorCls
  187. {
  188. /**
  189. * Iterator function
  190. */
  191. PeersIterator iterator;
  192. /**
  193. * Closure to iterator
  194. */
  195. void *cls;
  196. };
  /**
   * @brief Context for a channel
   */
  struct ChannelCtx
  {
    /**
     * @brief The channel itself
     */
    struct GNUNET_CADET_Channel *channel;

    /**
     * @brief The peer context associated with the channel
     */
    struct PeerContext *peer_ctx;

    /**
     * @brief Task for a delayed channel destruction.
     *
     * Destruction has to be delayed when it is requested from within the
     * cadet routine of another channel destruction; this then refers to the
     * respective _SCHEDULER_Task.
     */
    struct GNUNET_SCHEDULER_Task *destruction_task;
  };
  #if ENABLE_MALICIOUS
  /**
   * If type is 2, this struct is used to store the attacked peers in a DLL.
   */
  struct AttackedPeer
  {
    /**
     * DLL: next element.
     */
    struct AttackedPeer *next;

    /**
     * DLL: previous element.
     */
    struct AttackedPeer *prev;

    /**
     * PeerID
     */
    struct GNUNET_PeerIdentity peer_id;
  };
  #endif /* ENABLE_MALICIOUS */
  /**
   * @brief This number determines the number of slots for files that represent
   * histograms
   */
  #define HISTOGRAM_FILE_SLOTS 32

  /**
   * @brief The size (in bytes) a file needs to store the histogram
   *
   * Per slot: 1 newline, up to 4 chars,
   * Additionally: 1 null termination
   *
   * The whole expansion is parenthesised so the macro can be used inside a
   * larger expression (e.g. `2 * SIZE_DUMP_FILE`) without changing its value.
   */
  #define SIZE_DUMP_FILE ((HISTOGRAM_FILE_SLOTS * 5) + 1)
  246. /**
  247. * @brief One Sub.
  248. *
  249. * Essentially one instance of brahms that only connects to other instances
  250. * with the same (secret) value.
  251. */
  252. struct Sub
  253. {
  254. /**
  255. * @brief Hash of the shared value that defines Subs.
  256. */
  257. struct GNUNET_HashCode hash;
  258. /**
  259. * @brief Port to communicate to other peers.
  260. */
  261. struct GNUNET_CADET_Port *cadet_port;
  262. /**
  263. * @brief Hashmap of valid peers.
  264. */
  265. struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
  266. /**
  267. * @brief Filename of the file that stores the valid peers persistently.
  268. */
  269. char *filename_valid_peers;
  270. /**
  271. * Set of all peers to keep track of them.
  272. */
  273. struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
  274. /**
  275. * @brief This is the minimum estimate used as sampler size.
  276. *
  277. * It is configured by the user.
  278. */
  279. unsigned int sampler_size_est_min;
  280. /**
  281. * The size of sampler we need to be able to satisfy the Brahms protocol's
  282. * need of random peers.
  283. *
  284. * This is one minimum size the sampler grows to.
  285. */
  286. unsigned int sampler_size_est_need;
  287. /**
  288. * Time interval the do_round task runs in.
  289. */
  290. struct GNUNET_TIME_Relative round_interval;
  291. /**
  292. * Sampler used for the Brahms protocol itself.
  293. */
  294. struct RPS_Sampler *sampler;
  295. #ifdef TO_FILE_FULL
  296. /**
  297. * Name to log view to
  298. */
  299. char *file_name_view_log;
  300. #endif /* TO_FILE_FULL */
  301. #ifdef TO_FILE
  302. #ifdef TO_FILE_FULL
  303. /**
  304. * Name to log number of observed peers to
  305. */
  306. char *file_name_observed_log;
  307. #endif /* TO_FILE_FULL */
  308. /**
  309. * @brief Count the observed peers
  310. */
  311. uint32_t num_observed_peers;
  312. /**
  313. * @brief Multipeermap (ab-) used to count unique peer_ids
  314. */
  315. struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
  316. #endif /* TO_FILE */
  317. /**
  318. * List to store peers received through pushes temporary.
  319. */
  320. struct CustomPeerMap *push_map;
  321. /**
  322. * List to store peers received through pulls temporary.
  323. */
  324. struct CustomPeerMap *pull_map;
  325. /**
  326. * @brief This is the estimate used as view size.
  327. *
  328. * It is initialised with the minimum
  329. */
  330. unsigned int view_size_est_need;
  331. /**
  332. * @brief This is the minimum estimate used as view size.
  333. *
  334. * It is configured by the user.
  335. */
  336. unsigned int view_size_est_min;
  337. /**
  338. * @brief The view.
  339. */
  340. struct View *view;
  341. /**
  342. * Identifier for the main task that runs periodically.
  343. */
  344. struct GNUNET_SCHEDULER_Task *do_round_task;
  345. /* === stats === */
  346. /**
  347. * @brief Counts the executed rounds.
  348. */
  349. uint32_t num_rounds;
  350. /**
  351. * @brief This array accumulates the number of received pushes per round.
  352. *
  353. * Number at index i represents the number of rounds with i observed pushes.
  354. */
  355. uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
  356. /**
  357. * @brief Histogram of deltas between the expected and actual number of
  358. * received pushes.
  359. *
  360. * As half of the entries are expected to be negative, this is shifted by
  361. * #HISTOGRAM_FILE_SLOTS/2.
  362. */
  363. uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
  364. /**
  365. * @brief Number of pull replies with this delay measured in rounds.
  366. *
  367. * Number at index i represents the number of pull replies with a delay of i
  368. * rounds.
  369. */
  370. uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
  371. };
  372. /***********************************************************************
  373. * Globals
  374. ***********************************************************************/
  375. /**
  376. * Our configuration.
  377. */
  378. static const struct GNUNET_CONFIGURATION_Handle *cfg;
  379. /**
  380. * Handle to the statistics service.
  381. */
  382. struct GNUNET_STATISTICS_Handle *stats;
  383. /**
  384. * Handler to CADET.
  385. */
  386. struct GNUNET_CADET_Handle *cadet_handle;
  387. /**
  388. * Handle to CORE
  389. */
  390. struct GNUNET_CORE_Handle *core_handle;
  391. /**
  392. * @brief PeerMap to keep track of connected peers.
  393. */
  394. struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
  395. /**
  396. * Our own identity.
  397. */
  398. static struct GNUNET_PeerIdentity own_identity;
  399. /**
  400. * Percentage of total peer number in the view
  401. * to send random PUSHes to
  402. */
  403. static float alpha;
  404. /**
  405. * Percentage of total peer number in the view
  406. * to send random PULLs to
  407. */
  408. static float beta;
  409. /**
  410. * Handler to NSE.
  411. */
  412. static struct GNUNET_NSE_Handle *nse;
  413. /**
  414. * Handler to PEERINFO.
  415. */
  416. static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
  417. /**
  418. * Handle for cancellation of iteration over peers.
  419. */
  420. static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
  #if ENABLE_MALICIOUS
  /**
   * Type of malicious peer
   *
   * 0 Don't act malicious at all - Default
   * 1 Try to maximise representation
   * 2 Try to partition the network
   * 3 Combined attack
   */
  static uint32_t mal_type;

  /**
   * Other malicious peers
   */
  static struct GNUNET_PeerIdentity *mal_peers;

  /**
   * Hashmap of malicious peers used as set.
   * Used to more efficiently check whether we know that peer.
   */
  static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;

  /**
   * Number of other malicious peers
   */
  static uint32_t num_mal_peers;

  /**
   * If type is 2 this is the DLL of attacked peers: head.
   */
  static struct AttackedPeer *att_peers_head;

  /**
   * If type is 2 this is the DLL of attacked peers: tail.
   */
  static struct AttackedPeer *att_peers_tail;

  /**
   * This index is used to point to an attacked peer to
   * implement the round-robin-ish way to select attacked peers.
   */
  static struct AttackedPeer *att_peer_index;

  /**
   * Hashmap of attacked peers used as set.
   * Used to more efficiently check whether we know that peer.
   */
  static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;

  /**
   * Number of attacked peers
   */
  static uint32_t num_attacked_peers;

  /**
   * If type is 1 this is the attacked peer
   */
  static struct GNUNET_PeerIdentity attacked_peer;

  /**
   * The limit of PUSHes we can send in one round.
   * This is an assumption of the Brahms protocol and either implemented
   * via proof of work
   * or
   * assumed to be the bandwidth limitation.
   */
  static uint32_t push_limit = 10000;
  #endif /* ENABLE_MALICIOUS */
  476. /**
  477. * @brief Main Sub.
  478. *
  479. * This is run in any case by all peers and connects to all peers without
  480. * specifying a shared value.
  481. */
  482. static struct Sub *msub;
  483. /**
  484. * @brief Maximum number of valid peers to keep.
  485. * TODO read from config
  486. */
  487. static const uint32_t num_valid_peers_max = UINT32_MAX;
  488. /***********************************************************************
  489. * /Globals
  490. ***********************************************************************/
/* Forward declaration of the round function (file-local). */
static void
do_round (void *cls);

#if ENABLE_MALICIOUS
/* Forward declaration of the round function used when built with
 * ENABLE_MALICIOUS (file-local). */
static void
do_mal_round (void *cls);

#endif /* ENABLE_MALICIOUS */
  497. /**
  498. * @brief Get the #PeerContext associated with a peer
  499. *
  500. * @param peer_map The peer map containing the context
  501. * @param peer the peer id
  502. *
  503. * @return the #PeerContext
  504. */
  505. static struct PeerContext *
  506. get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
  507. const struct GNUNET_PeerIdentity *peer)
  508. {
  509. struct PeerContext *ctx;
  510. int ret;
  511. ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
  512. GNUNET_assert (GNUNET_YES == ret);
  513. ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
  514. GNUNET_assert (NULL != ctx);
  515. return ctx;
  516. }
  517. /**
  518. * @brief Check whether we have information about the given peer.
  519. *
  520. * FIXME probably deprecated. Make this the new _online.
  521. *
  522. * @param peer_map The peer map to check for the existence of @a peer
  523. * @param peer peer in question
  524. *
  525. * @return #GNUNET_YES if peer is known
  526. * #GNUNET_NO if peer is not known
  527. */
  528. static int
  529. check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
  530. const struct GNUNET_PeerIdentity *peer)
  531. {
  532. if (NULL != peer_map)
  533. {
  534. return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
  535. }
  536. else
  537. {
  538. return GNUNET_NO;
  539. }
  540. }
  541. /**
  542. * @brief Create a new #PeerContext and insert it into the peer map
  543. *
  544. * @param sub The Sub this context belongs to.
  545. * @param peer the peer to create the #PeerContext for
  546. *
  547. * @return the #PeerContext
  548. */
  549. static struct PeerContext *
  550. create_peer_ctx (struct Sub *sub,
  551. const struct GNUNET_PeerIdentity *peer)
  552. {
  553. struct PeerContext *ctx;
  554. int ret;
  555. GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
  556. ctx = GNUNET_new (struct PeerContext);
  557. ctx->peer_id = *peer;
  558. ctx->sub = sub;
  559. ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
  560. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
  561. GNUNET_assert (GNUNET_OK == ret);
  562. if (sub == msub)
  563. {
  564. GNUNET_STATISTICS_set (stats,
  565. "# known peers",
  566. GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
  567. GNUNET_NO);
  568. }
  569. return ctx;
  570. }
  571. /**
  572. * @brief Create or get a #PeerContext
  573. *
  574. * @param sub The Sub to which the created context belongs to
  575. * @param peer the peer to get the associated context to
  576. *
  577. * @return the context
  578. */
  579. static struct PeerContext *
  580. create_or_get_peer_ctx (struct Sub *sub,
  581. const struct GNUNET_PeerIdentity *peer)
  582. {
  583. if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
  584. {
  585. return create_peer_ctx (sub, peer);
  586. }
  587. return get_peer_ctx (sub->peer_map, peer);
  588. }
  589. /**
  590. * @brief Check whether we have a connection to this @a peer
  591. *
  592. * Also sets the #Peers_ONLINE flag accordingly
  593. *
  594. * @param peer_ctx Context of the peer of which connectivity is to be checked
  595. *
  596. * @return #GNUNET_YES if we are connected
  597. * #GNUNET_NO otherwise
  598. */
  599. static int
  600. check_connected (struct PeerContext *peer_ctx)
  601. {
  602. /* If we don't know about this peer we don't know whether it's online */
  603. if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
  604. &peer_ctx->peer_id))
  605. {
  606. return GNUNET_NO;
  607. }
  608. /* Get the context */
  609. peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
  610. /* If we have no channel to this peer we don't know whether it's online */
  611. if ((NULL == peer_ctx->send_channel_ctx) &&
  612. (NULL == peer_ctx->recv_channel_ctx))
  613. {
  614. UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
  615. return GNUNET_NO;
  616. }
  617. /* Otherwise (if we have a channel, we know that it's online */
  618. SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
  619. return GNUNET_YES;
  620. }
/**
 * @brief The closure to #get_rand_peer_iterator.
 */
struct GetRandPeerIteratorCls
{
  /**
   * @brief The index of the peer to return.
   *
   * Decreased by one for every visited entry until it reaches 0;
   * the entry visited while it is 0 is the one that gets returned.
   */
  uint32_t index;

  /**
   * @brief Pointer to peer to return.
   *
   * Set by #get_rand_peer_iterator to the key of the selected entry.
   */
  const struct GNUNET_PeerIdentity *peer;
};
  637. /**
  638. * @brief Iterator function for #get_random_peer_from_peermap.
  639. *
  640. * Implements #GNUNET_CONTAINER_PeerMapIterator.
  641. * Decreases the index until the index is null.
  642. * Then returns the current peer.
  643. *
  644. * @param cls the #GetRandPeerIteratorCls containing index and peer
  645. * @param peer current peer
  646. * @param value unused
  647. *
  648. * @return #GNUNET_YES if we should continue to
  649. * iterate,
  650. * #GNUNET_NO if not.
  651. */
  652. static int
  653. get_rand_peer_iterator (void *cls,
  654. const struct GNUNET_PeerIdentity *peer,
  655. void *value)
  656. {
  657. struct GetRandPeerIteratorCls *iterator_cls = cls;
  658. (void) value;
  659. if (0 >= iterator_cls->index)
  660. {
  661. iterator_cls->peer = peer;
  662. return GNUNET_NO;
  663. }
  664. iterator_cls->index--;
  665. return GNUNET_YES;
  666. }
  667. /**
  668. * @brief Get a random peer from @a peer_map
  669. *
  670. * @param valid_peers Peer map containing valid peers from which to select a
  671. * random one
  672. *
  673. * @return a random peer
  674. */
  675. static const struct GNUNET_PeerIdentity *
  676. get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
  677. {
  678. struct GetRandPeerIteratorCls *iterator_cls;
  679. const struct GNUNET_PeerIdentity *ret;
  680. iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
  681. iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
  682. GNUNET_CONTAINER_multipeermap_size (
  683. valid_peers));
  684. (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
  685. get_rand_peer_iterator,
  686. iterator_cls);
  687. ret = iterator_cls->peer;
  688. GNUNET_free (iterator_cls);
  689. return ret;
  690. }
  691. /**
  692. * @brief Add a given @a peer to valid peers.
  693. *
  694. * If valid peers are already #num_valid_peers_max, delete a peer previously.
  695. *
  696. * @param peer The peer that is added to the valid peers.
  697. * @param valid_peers Peer map of valid peers to which to add the @a peer
  698. *
  699. * @return #GNUNET_YES if no other peer had to be removed
  700. * #GNUNET_NO otherwise
  701. */
  702. static int
  703. add_valid_peer (const struct GNUNET_PeerIdentity *peer,
  704. struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
  705. {
  706. const struct GNUNET_PeerIdentity *rand_peer;
  707. int ret;
  708. ret = GNUNET_YES;
  709. /* Remove random peers until there is space for a new one */
  710. while (num_valid_peers_max <=
  711. GNUNET_CONTAINER_multipeermap_size (valid_peers))
  712. {
  713. rand_peer = get_random_peer_from_peermap (valid_peers);
  714. GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
  715. ret = GNUNET_NO;
  716. }
  717. (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
  718. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
  719. if (valid_peers == msub->valid_peers)
  720. {
  721. GNUNET_STATISTICS_set (stats,
  722. "# valid peers",
  723. GNUNET_CONTAINER_multipeermap_size (valid_peers),
  724. GNUNET_NO);
  725. }
  726. return ret;
  727. }
  728. static void
  729. remove_pending_message (struct PendingMessage *pending_msg, int cancel);
  730. /**
  731. * @brief Set the peer flag to living and
  732. * call the pending operations on this peer.
  733. *
  734. * Also adds peer to #valid_peers.
  735. *
  736. * @param peer_ctx the #PeerContext of the peer to set online
  737. */
  738. static void
  739. set_peer_online (struct PeerContext *peer_ctx)
  740. {
  741. struct GNUNET_PeerIdentity *peer;
  742. unsigned int i;
  743. peer = &peer_ctx->peer_id;
  744. LOG (GNUNET_ERROR_TYPE_DEBUG,
  745. "Peer %s is online and valid, calling %i pending operations on it\n",
  746. GNUNET_i2s (peer),
  747. peer_ctx->num_pending_ops);
  748. if (NULL != peer_ctx->online_check_pending)
  749. {
  750. LOG (GNUNET_ERROR_TYPE_DEBUG,
  751. "Removing pending online check for peer %s\n",
  752. GNUNET_i2s (&peer_ctx->peer_id));
  753. // TODO wait until cadet sets mq->cancel_impl
  754. // GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
  755. remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
  756. peer_ctx->online_check_pending = NULL;
  757. }
  758. SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
  759. /* Call pending operations */
  760. for (i = 0; i < peer_ctx->num_pending_ops; i++)
  761. {
  762. peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
  763. }
  764. GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
  765. }
/* Forward declaration of the channel disconnect handler; it is handed to
 * GNUNET_CADET_channel_create() in get_channel(). */
static void
cleanup_destroyed_channel (void *cls,
                           const struct GNUNET_CADET_Channel *channel);

/* Declaration of handlers */
/* These are referenced by the GNUNET_MQ_hd_* entries of the handler array
 * in get_channel(). */
static void
handle_peer_check (void *cls,
                   const struct GNUNET_MessageHeader *msg);

static void
handle_peer_push (void *cls,
                  const struct GNUNET_MessageHeader *msg);

static void
handle_peer_pull_request (void *cls,
                          const struct GNUNET_MessageHeader *msg);

static int
check_peer_pull_reply (void *cls,
                       const struct GNUNET_RPS_P2P_PullReplyMessage *msg);

static void
handle_peer_pull_reply (void *cls,
                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);

/* End declaration of handlers */
  786. /**
  787. * @brief Allocate memory for a new channel context and insert it into DLL
  788. *
  789. * @param peer_ctx context of the according peer
  790. *
  791. * @return The channel context
  792. */
  793. static struct ChannelCtx *
  794. add_channel_ctx (struct PeerContext *peer_ctx)
  795. {
  796. struct ChannelCtx *channel_ctx;
  797. channel_ctx = GNUNET_new (struct ChannelCtx);
  798. channel_ctx->peer_ctx = peer_ctx;
  799. return channel_ctx;
  800. }
  801. /**
  802. * @brief Free memory and NULL pointers.
  803. *
  804. * @param channel_ctx The channel context.
  805. */
  806. static void
  807. remove_channel_ctx (struct ChannelCtx *channel_ctx)
  808. {
  809. struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
  810. if (NULL != channel_ctx->destruction_task)
  811. {
  812. GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
  813. channel_ctx->destruction_task = NULL;
  814. }
  815. if (NULL == peer_ctx)
  816. return;
  817. if (channel_ctx == peer_ctx->send_channel_ctx)
  818. {
  819. peer_ctx->send_channel_ctx = NULL;
  820. peer_ctx->mq = NULL;
  821. }
  822. else if (channel_ctx == peer_ctx->recv_channel_ctx)
  823. {
  824. peer_ctx->recv_channel_ctx = NULL;
  825. }
  826. GNUNET_free (channel_ctx);
  827. }
  828. /**
  829. * @brief Get the channel of a peer. If not existing, create.
  830. *
  831. * @param peer_ctx Context of the peer of which to get the channel
  832. * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
  833. */
  834. struct GNUNET_CADET_Channel *
  835. get_channel (struct PeerContext *peer_ctx)
  836. {
  837. /* There exists a copy-paste-clone in run() */
  838. struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
  839. GNUNET_MQ_hd_fixed_size (peer_check,
  840. GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
  841. struct GNUNET_MessageHeader,
  842. NULL),
  843. GNUNET_MQ_hd_fixed_size (peer_push,
  844. GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
  845. struct GNUNET_MessageHeader,
  846. NULL),
  847. GNUNET_MQ_hd_fixed_size (peer_pull_request,
  848. GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
  849. struct GNUNET_MessageHeader,
  850. NULL),
  851. GNUNET_MQ_hd_var_size (peer_pull_reply,
  852. GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
  853. struct GNUNET_RPS_P2P_PullReplyMessage,
  854. NULL),
  855. GNUNET_MQ_handler_end ()
  856. };
  857. if (NULL == peer_ctx->send_channel_ctx)
  858. {
  859. LOG (GNUNET_ERROR_TYPE_DEBUG,
  860. "Trying to establish channel to peer %s\n",
  861. GNUNET_i2s (&peer_ctx->peer_id));
  862. peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
  863. peer_ctx->send_channel_ctx->channel =
  864. GNUNET_CADET_channel_create (cadet_handle,
  865. peer_ctx->send_channel_ctx, /* context */
  866. &peer_ctx->peer_id,
  867. &peer_ctx->sub->hash,
  868. NULL, /* WindowSize handler */
  869. &cleanup_destroyed_channel, /* Disconnect handler */
  870. cadet_handlers);
  871. }
  872. GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
  873. GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
  874. return peer_ctx->send_channel_ctx->channel;
  875. }
  876. /**
  877. * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
  878. *
  879. * If we already have a message queue open to this client,
  880. * simply return it, otherwise create one.
  881. *
  882. * @param peer_ctx Context of the peer of which to get the mq
  883. * @return the #GNUNET_MQ_Handle
  884. */
  885. static struct GNUNET_MQ_Handle *
  886. get_mq (struct PeerContext *peer_ctx)
  887. {
  888. if (NULL == peer_ctx->mq)
  889. {
  890. peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
  891. }
  892. return peer_ctx->mq;
  893. }
  894. /**
  895. * @brief Add an envelope to a message passed to mq to list of pending messages
  896. *
  897. * @param peer_ctx Context of the peer for which to insert the envelope
  898. * @param ev envelope to the message
  899. * @param type type of the message to be sent
  900. * @return pointer to pending message
  901. */
  902. static struct PendingMessage *
  903. insert_pending_message (struct PeerContext *peer_ctx,
  904. struct GNUNET_MQ_Envelope *ev,
  905. const char *type)
  906. {
  907. struct PendingMessage *pending_msg;
  908. pending_msg = GNUNET_new (struct PendingMessage);
  909. pending_msg->ev = ev;
  910. pending_msg->peer_ctx = peer_ctx;
  911. pending_msg->type = type;
  912. GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
  913. peer_ctx->pending_messages_tail,
  914. pending_msg);
  915. return pending_msg;
  916. }
  917. /**
  918. * @brief Remove a pending message from the respective DLL
  919. *
  920. * @param pending_msg the pending message to remove
  921. * @param cancel whether to cancel the pending message, too
  922. */
  923. static void
  924. remove_pending_message (struct PendingMessage *pending_msg, int cancel)
  925. {
  926. struct PeerContext *peer_ctx;
  927. (void) cancel;
  928. peer_ctx = pending_msg->peer_ctx;
  929. GNUNET_assert (NULL != peer_ctx);
  930. GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
  931. peer_ctx->pending_messages_tail,
  932. pending_msg);
  933. // TODO wait for the cadet implementation of message cancellation
  934. // if (GNUNET_YES == cancel)
  935. // {
  936. // GNUNET_MQ_send_cancel (pending_msg->ev);
  937. // }
  938. GNUNET_free (pending_msg);
  939. }
  940. /**
  941. * @brief This is called in response to the first message we sent as a
  942. * online check.
  943. *
  944. * @param cls #PeerContext of peer with pending online check
  945. */
  946. static void
  947. mq_online_check_successful (void *cls)
  948. {
  949. struct PeerContext *peer_ctx = cls;
  950. if (NULL != peer_ctx->online_check_pending)
  951. {
  952. LOG (GNUNET_ERROR_TYPE_DEBUG,
  953. "Online check for peer %s was successful\n",
  954. GNUNET_i2s (&peer_ctx->peer_id));
  955. remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
  956. peer_ctx->online_check_pending = NULL;
  957. set_peer_online (peer_ctx);
  958. (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
  959. }
  960. }
  961. /**
  962. * Issue a check whether peer is online
  963. *
  964. * @param peer_ctx the context of the peer
  965. */
  966. static void
  967. check_peer_online (struct PeerContext *peer_ctx)
  968. {
  969. LOG (GNUNET_ERROR_TYPE_DEBUG,
  970. "Get informed about peer %s getting online\n",
  971. GNUNET_i2s (&peer_ctx->peer_id));
  972. struct GNUNET_MQ_Handle *mq;
  973. struct GNUNET_MQ_Envelope *ev;
  974. ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
  975. peer_ctx->online_check_pending =
  976. insert_pending_message (peer_ctx, ev, "Check online");
  977. mq = get_mq (peer_ctx);
  978. GNUNET_MQ_notify_sent (ev,
  979. mq_online_check_successful,
  980. peer_ctx);
  981. GNUNET_MQ_send (mq, ev);
  982. if (peer_ctx->sub == msub)
  983. {
  984. GNUNET_STATISTICS_update (stats,
  985. "# pending online checks",
  986. 1,
  987. GNUNET_NO);
  988. }
  989. }
  990. /**
  991. * @brief Check whether function of type #PeerOp was already scheduled
  992. *
  993. * The array with pending operations will probably never grow really big, so
  994. * iterating over it should be ok.
  995. *
  996. * @param peer_ctx Context of the peer to check for the operation
  997. * @param peer_op the operation (#PeerOp) on the peer
  998. *
  999. * @return #GNUNET_YES if this operation is scheduled on that peer
  1000. * #GNUNET_NO otherwise
  1001. */
  1002. static int
  1003. check_operation_scheduled (const struct PeerContext *peer_ctx,
  1004. const PeerOp peer_op)
  1005. {
  1006. unsigned int i;
  1007. for (i = 0; i < peer_ctx->num_pending_ops; i++)
  1008. if (peer_op == peer_ctx->pending_ops[i].op)
  1009. return GNUNET_YES;
  1010. return GNUNET_NO;
  1011. }
  1012. /**
  1013. * @brief Callback for scheduler to destroy a channel
  1014. *
  1015. * @param cls Context of the channel
  1016. */
  1017. static void
  1018. destroy_channel (struct ChannelCtx *channel_ctx)
  1019. {
  1020. struct GNUNET_CADET_Channel *channel;
  1021. if (NULL != channel_ctx->destruction_task)
  1022. {
  1023. GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
  1024. channel_ctx->destruction_task = NULL;
  1025. }
  1026. GNUNET_assert (channel_ctx->channel != NULL);
  1027. channel = channel_ctx->channel;
  1028. channel_ctx->channel = NULL;
  1029. GNUNET_CADET_channel_destroy (channel);
  1030. remove_channel_ctx (channel_ctx);
  1031. }
  1032. /**
  1033. * @brief Destroy a cadet channel.
  1034. *
  1035. * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
  1036. *
  1037. * @param cls
  1038. */
  1039. static void
  1040. destroy_channel_cb (void *cls)
  1041. {
  1042. struct ChannelCtx *channel_ctx = cls;
  1043. channel_ctx->destruction_task = NULL;
  1044. destroy_channel (channel_ctx);
  1045. }
  1046. /**
  1047. * @brief Schedule the destruction of a channel for immediately afterwards.
  1048. *
  1049. * In case a channel is to be destroyed from within the callback to the
  1050. * destruction of another channel (send channel), we cannot call
  1051. * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
  1052. * construction.
  1053. *
  1054. * @param channel_ctx channel to be destroyed.
  1055. */
  1056. static void
  1057. schedule_channel_destruction (struct ChannelCtx *channel_ctx)
  1058. {
  1059. GNUNET_assert (NULL ==
  1060. channel_ctx->destruction_task);
  1061. GNUNET_assert (NULL !=
  1062. channel_ctx->channel);
  1063. channel_ctx->destruction_task =
  1064. GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
  1065. channel_ctx);
  1066. }
  1067. /**
  1068. * @brief Remove peer
  1069. *
  1070. * - Empties the list with pending operations
  1071. * - Empties the list with pending messages
  1072. * - Cancels potentially existing online check
  1073. * - Schedules closing of send and recv channels
  1074. * - Removes peer from peer map
  1075. *
  1076. * @param peer_ctx Context of the peer to be destroyed
  1077. * @return #GNUNET_YES if peer was removed
  1078. * #GNUNET_NO otherwise
  1079. */
  1080. static int
  1081. destroy_peer (struct PeerContext *peer_ctx)
  1082. {
  1083. GNUNET_assert (NULL != peer_ctx);
  1084. GNUNET_assert (NULL != peer_ctx->sub->peer_map);
  1085. if (GNUNET_NO ==
  1086. GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
  1087. &peer_ctx->peer_id))
  1088. {
  1089. return GNUNET_NO;
  1090. }
  1091. SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
  1092. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1093. "Going to remove peer %s\n",
  1094. GNUNET_i2s (&peer_ctx->peer_id));
  1095. UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
  1096. /* Clear list of pending operations */
  1097. // TODO this probably leaks memory
  1098. // ('only' the cls to the function. Not sure what to do with it)
  1099. GNUNET_array_grow (peer_ctx->pending_ops,
  1100. peer_ctx->num_pending_ops,
  1101. 0);
  1102. /* Remove all pending messages */
  1103. while (NULL != peer_ctx->pending_messages_head)
  1104. {
  1105. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1106. "Removing unsent %s\n",
  1107. peer_ctx->pending_messages_head->type);
  1108. /* Cancel pending message, too */
  1109. if ((NULL != peer_ctx->online_check_pending) &&
  1110. (0 == memcmp (peer_ctx->pending_messages_head,
  1111. peer_ctx->online_check_pending,
  1112. sizeof(struct PendingMessage))))
  1113. {
  1114. peer_ctx->online_check_pending = NULL;
  1115. if (peer_ctx->sub == msub)
  1116. {
  1117. GNUNET_STATISTICS_update (stats,
  1118. "# pending online checks",
  1119. -1,
  1120. GNUNET_NO);
  1121. }
  1122. }
  1123. remove_pending_message (peer_ctx->pending_messages_head,
  1124. GNUNET_YES);
  1125. }
  1126. /* If we are still waiting for notification whether this peer is online
  1127. * cancel the according task */
  1128. if (NULL != peer_ctx->online_check_pending)
  1129. {
  1130. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1131. "Removing pending online check for peer %s\n",
  1132. GNUNET_i2s (&peer_ctx->peer_id));
  1133. // TODO wait until cadet sets mq->cancel_impl
  1134. // GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
  1135. remove_pending_message (peer_ctx->online_check_pending,
  1136. GNUNET_YES);
  1137. peer_ctx->online_check_pending = NULL;
  1138. }
  1139. if (NULL != peer_ctx->send_channel_ctx)
  1140. {
  1141. /* This is possibly called from within channel destruction */
  1142. peer_ctx->send_channel_ctx->peer_ctx = NULL;
  1143. schedule_channel_destruction (peer_ctx->send_channel_ctx);
  1144. peer_ctx->send_channel_ctx = NULL;
  1145. peer_ctx->mq = NULL;
  1146. }
  1147. if (NULL != peer_ctx->recv_channel_ctx)
  1148. {
  1149. /* This is possibly called from within channel destruction */
  1150. peer_ctx->recv_channel_ctx->peer_ctx = NULL;
  1151. schedule_channel_destruction (peer_ctx->recv_channel_ctx);
  1152. peer_ctx->recv_channel_ctx = NULL;
  1153. }
  1154. if (GNUNET_YES !=
  1155. GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
  1156. &peer_ctx->peer_id))
  1157. {
  1158. LOG (GNUNET_ERROR_TYPE_WARNING,
  1159. "removing peer from peer_ctx->sub->peer_map failed\n");
  1160. }
  1161. if (peer_ctx->sub == msub)
  1162. {
  1163. GNUNET_STATISTICS_set (stats,
  1164. "# known peers",
  1165. GNUNET_CONTAINER_multipeermap_size (
  1166. peer_ctx->sub->peer_map),
  1167. GNUNET_NO);
  1168. }
  1169. GNUNET_free (peer_ctx);
  1170. return GNUNET_YES;
  1171. }
  1172. /**
  1173. * Iterator over hash map entries. Deletes all contexts of peers.
  1174. *
  1175. * @param cls closure
  1176. * @param key current public key
  1177. * @param value value in the hash map
  1178. * @return #GNUNET_YES if we should continue to iterate,
  1179. * #GNUNET_NO if not.
  1180. */
  1181. static int
  1182. peermap_clear_iterator (void *cls,
  1183. const struct GNUNET_PeerIdentity *key,
  1184. void *value)
  1185. {
  1186. struct Sub *sub = cls;
  1187. (void) value;
  1188. destroy_peer (get_peer_ctx (sub->peer_map, key));
  1189. return GNUNET_YES;
  1190. }
  1191. /**
  1192. * @brief This is called once a message is sent.
  1193. *
  1194. * Removes the pending message
  1195. *
  1196. * @param cls type of the message that was sent
  1197. */
  1198. static void
  1199. mq_notify_sent_cb (void *cls)
  1200. {
  1201. struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
  1202. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1203. "%s was sent.\n",
  1204. pending_msg->type);
  1205. if (pending_msg->peer_ctx->sub == msub)
  1206. {
  1207. if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
  1208. GNUNET_STATISTICS_update (stats, "# pull replies sent", 1, GNUNET_NO);
  1209. if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
  1210. GNUNET_STATISTICS_update (stats, "# pull requests sent", 1, GNUNET_NO);
  1211. if (0 == strncmp ("PUSH", pending_msg->type, 4))
  1212. GNUNET_STATISTICS_update (stats, "# pushes sent", 1, GNUNET_NO);
  1213. if ((0 == strncmp ("PULL REQUEST", pending_msg->type, 12)) &&
  1214. (NULL != map_single_hop) &&
  1215. (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
  1216. &pending_msg->
  1217. peer_ctx->peer_id)) )
  1218. GNUNET_STATISTICS_update (stats,
  1219. "# pull requests sent (multi-hop peer)",
  1220. 1,
  1221. GNUNET_NO);
  1222. }
  1223. /* Do not cancel message */
  1224. remove_pending_message (pending_msg, GNUNET_NO);
  1225. }
  1226. /**
  1227. * @brief Iterator function for #store_valid_peers.
  1228. *
  1229. * Implements #GNUNET_CONTAINER_PeerMapIterator.
  1230. * Writes single peer to disk.
  1231. *
  1232. * @param cls the file handle to write to.
  1233. * @param peer current peer
  1234. * @param value unused
  1235. *
  1236. * @return #GNUNET_YES if we should continue to
  1237. * iterate,
  1238. * #GNUNET_NO if not.
  1239. */
  1240. static int
  1241. store_peer_presistently_iterator (void *cls,
  1242. const struct GNUNET_PeerIdentity *peer,
  1243. void *value)
  1244. {
  1245. const struct GNUNET_DISK_FileHandle *fh = cls;
  1246. char peer_string[128];
  1247. int size;
  1248. ssize_t ret;
  1249. (void) value;
  1250. if (NULL == peer)
  1251. {
  1252. return GNUNET_YES;
  1253. }
  1254. size = GNUNET_snprintf (peer_string,
  1255. sizeof(peer_string),
  1256. "%s\n",
  1257. GNUNET_i2s_full (peer));
  1258. GNUNET_assert (53 == size);
  1259. ret = GNUNET_DISK_file_write (fh,
  1260. peer_string,
  1261. size);
  1262. GNUNET_assert (size == ret);
  1263. return GNUNET_YES;
  1264. }
  1265. /**
  1266. * @brief Store the peers currently in #valid_peers to disk.
  1267. *
  1268. * @param sub Sub for which to store the valid peers
  1269. */
  1270. static void
  1271. store_valid_peers (const struct Sub *sub)
  1272. {
  1273. struct GNUNET_DISK_FileHandle *fh;
  1274. uint32_t number_written_peers;
  1275. int ret;
  1276. if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
  1277. {
  1278. return;
  1279. }
  1280. ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
  1281. if (GNUNET_SYSERR == ret)
  1282. {
  1283. LOG (GNUNET_ERROR_TYPE_WARNING,
  1284. "Not able to create directory for file `%s'\n",
  1285. sub->filename_valid_peers);
  1286. GNUNET_break (0);
  1287. }
  1288. else if (GNUNET_NO == ret)
  1289. {
  1290. LOG (GNUNET_ERROR_TYPE_WARNING,
  1291. "Directory for file `%s' exists but is not writable for us\n",
  1292. sub->filename_valid_peers);
  1293. GNUNET_break (0);
  1294. }
  1295. fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
  1296. GNUNET_DISK_OPEN_WRITE
  1297. | GNUNET_DISK_OPEN_CREATE,
  1298. GNUNET_DISK_PERM_USER_READ
  1299. | GNUNET_DISK_PERM_USER_WRITE);
  1300. if (NULL == fh)
  1301. {
  1302. LOG (GNUNET_ERROR_TYPE_WARNING,
  1303. "Not able to write valid peers to file `%s'\n",
  1304. sub->filename_valid_peers);
  1305. return;
  1306. }
  1307. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1308. "Writing %u valid peers to disk\n",
  1309. GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
  1310. number_written_peers =
  1311. GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
  1312. store_peer_presistently_iterator,
  1313. fh);
  1314. GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
  1315. GNUNET_assert (number_written_peers ==
  1316. GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
  1317. }
  1318. /**
  1319. * @brief Convert string representation of peer id to peer id.
  1320. *
  1321. * Counterpart to #GNUNET_i2s_full.
  1322. *
  1323. * @param string_repr The string representation of the peer id
  1324. *
  1325. * @return The peer id
  1326. */
  1327. static const struct GNUNET_PeerIdentity *
  1328. s2i_full (const char *string_repr)
  1329. {
  1330. struct GNUNET_PeerIdentity *peer;
  1331. size_t len;
  1332. int ret;
  1333. peer = GNUNET_new (struct GNUNET_PeerIdentity);
  1334. len = strlen (string_repr);
  1335. if (52 > len)
  1336. {
  1337. LOG (GNUNET_ERROR_TYPE_WARNING,
  1338. "Not able to convert string representation of PeerID to PeerID\n"
  1339. "String representation: %s (len %lu) - too short\n",
  1340. string_repr,
  1341. len);
  1342. GNUNET_break (0);
  1343. }
  1344. else if (52 < len)
  1345. {
  1346. len = 52;
  1347. }
  1348. ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
  1349. len,
  1350. &peer->public_key);
  1351. if (GNUNET_OK != ret)
  1352. {
  1353. LOG (GNUNET_ERROR_TYPE_WARNING,
  1354. "Not able to convert string representation of PeerID to PeerID\n"
  1355. "String representation: %s\n",
  1356. string_repr);
  1357. GNUNET_break (0);
  1358. }
  1359. return peer;
  1360. }
  1361. /**
  1362. * @brief Restore the peers on disk to #valid_peers.
  1363. *
  1364. * @param sub Sub for which to restore the valid peers
  1365. */
  1366. static void
  1367. restore_valid_peers (const struct Sub *sub)
  1368. {
  1369. off_t file_size;
  1370. uint32_t num_peers;
  1371. struct GNUNET_DISK_FileHandle *fh;
  1372. char *buf;
  1373. ssize_t size_read;
  1374. char *iter_buf;
  1375. char *str_repr;
  1376. const struct GNUNET_PeerIdentity *peer;
  1377. if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
  1378. {
  1379. return;
  1380. }
  1381. if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
  1382. {
  1383. return;
  1384. }
  1385. fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
  1386. GNUNET_DISK_OPEN_READ,
  1387. GNUNET_DISK_PERM_NONE);
  1388. GNUNET_assert (NULL != fh);
  1389. GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
  1390. num_peers = file_size / 53;
  1391. buf = GNUNET_malloc (file_size);
  1392. size_read = GNUNET_DISK_file_read (fh, buf, file_size);
  1393. GNUNET_assert (size_read == file_size);
  1394. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1395. "Restoring %" PRIu32 " peers from file `%s'\n",
  1396. num_peers,
  1397. sub->filename_valid_peers);
  1398. for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
  1399. {
  1400. str_repr = GNUNET_strndup (iter_buf, 53);
  1401. peer = s2i_full (str_repr);
  1402. GNUNET_free (str_repr);
  1403. add_valid_peer (peer, sub->valid_peers);
  1404. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1405. "Restored valid peer %s from disk\n",
  1406. GNUNET_i2s_full (peer));
  1407. }
  1408. iter_buf = NULL;
  1409. GNUNET_free (buf);
  1410. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1411. "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
  1412. num_peers,
  1413. GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
  1414. if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
  1415. {
  1416. LOG (GNUNET_ERROR_TYPE_WARNING,
  1417. "Number of restored peers does not match file size. Have probably duplicates.\n");
  1418. }
  1419. GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
  1420. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1421. "Restored %u valid peers from disk\n",
  1422. GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
  1423. }
  1424. /**
  1425. * @brief Delete storage of peers that was created with #initialise_peers ()
  1426. *
  1427. * @param sub Sub for which the storage is deleted
  1428. */
  1429. static void
  1430. peers_terminate (struct Sub *sub)
  1431. {
  1432. if (GNUNET_SYSERR ==
  1433. GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
  1434. &peermap_clear_iterator,
  1435. sub))
  1436. {
  1437. LOG (GNUNET_ERROR_TYPE_WARNING,
  1438. "Iteration destroying peers was aborted.\n");
  1439. }
  1440. GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
  1441. sub->peer_map = NULL;
  1442. store_valid_peers (sub);
  1443. GNUNET_free (sub->filename_valid_peers);
  1444. sub->filename_valid_peers = NULL;
  1445. GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
  1446. sub->valid_peers = NULL;
  1447. }
  1448. /**
  1449. * Iterator over #valid_peers hash map entries.
  1450. *
  1451. * @param cls Closure that contains iterator function and closure
  1452. * @param peer current peer id
  1453. * @param value value in the hash map - unused
  1454. * @return #GNUNET_YES if we should continue to
  1455. * iterate,
  1456. * #GNUNET_NO if not.
  1457. */
  1458. static int
  1459. valid_peer_iterator (void *cls,
  1460. const struct GNUNET_PeerIdentity *peer,
  1461. void *value)
  1462. {
  1463. struct PeersIteratorCls *it_cls = cls;
  1464. (void) value;
  1465. return it_cls->iterator (it_cls->cls, peer);
  1466. }
  1467. /**
  1468. * @brief Get all currently known, valid peer ids.
  1469. *
  1470. * @param valid_peers Peer map containing the valid peers in question
  1471. * @param iterator function to call on each peer id
  1472. * @param it_cls extra argument to @a iterator
  1473. * @return the number of key value pairs processed,
  1474. * #GNUNET_SYSERR if it aborted iteration
  1475. */
  1476. static int
  1477. get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
  1478. PeersIterator iterator,
  1479. void *it_cls)
  1480. {
  1481. struct PeersIteratorCls *cls;
  1482. int ret;
  1483. cls = GNUNET_new (struct PeersIteratorCls);
  1484. cls->iterator = iterator;
  1485. cls->cls = it_cls;
  1486. ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
  1487. valid_peer_iterator,
  1488. cls);
  1489. GNUNET_free (cls);
  1490. return ret;
  1491. }
  1492. /**
  1493. * @brief Add peer to known peers.
  1494. *
  1495. * This function is called on new peer_ids from 'external' sources
  1496. * (client seed, cadet get_peers(), ...)
  1497. *
  1498. * @param sub Sub with the peer map that the @a peer will be added to
  1499. * @param peer the new #GNUNET_PeerIdentity
  1500. *
  1501. * @return #GNUNET_YES if peer was inserted
  1502. * #GNUNET_NO otherwise
  1503. */
  1504. static int
  1505. insert_peer (struct Sub *sub,
  1506. const struct GNUNET_PeerIdentity *peer)
  1507. {
  1508. if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
  1509. {
  1510. return GNUNET_NO; /* We already know this peer - nothing to do */
  1511. }
  1512. (void) create_peer_ctx (sub, peer);
  1513. return GNUNET_YES;
  1514. }
  1515. /**
  1516. * @brief Check whether flags on a peer are set.
  1517. *
  1518. * @param peer_map Peer map that is expected to contain the @a peer
  1519. * @param peer the peer to check the flag of
  1520. * @param flags the flags to check
  1521. *
  1522. * @return #GNUNET_SYSERR if peer is not known
  1523. * #GNUNET_YES if all given flags are set
  1524. * #GNUNET_NO otherwise
  1525. */
  1526. static int
  1527. check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
  1528. const struct GNUNET_PeerIdentity *peer,
  1529. enum Peers_PeerFlags flags)
  1530. {
  1531. struct PeerContext *peer_ctx;
  1532. if (GNUNET_NO == check_peer_known (peer_map, peer))
  1533. {
  1534. return GNUNET_SYSERR;
  1535. }
  1536. peer_ctx = get_peer_ctx (peer_map, peer);
  1537. return check_peer_flag_set (peer_ctx, flags);
  1538. }
  1539. /**
  1540. * @brief Try connecting to a peer to see whether it is online
  1541. *
  1542. * If not known yet, insert into known peers
  1543. *
  1544. * @param sub Sub which would contain the @a peer
  1545. * @param peer the peer whose online is to be checked
  1546. * @return #GNUNET_YES if the check was issued
  1547. * #GNUNET_NO otherwise
  1548. */
  1549. static int
  1550. issue_peer_online_check (struct Sub *sub,
  1551. const struct GNUNET_PeerIdentity *peer)
  1552. {
  1553. struct PeerContext *peer_ctx;
  1554. (void) insert_peer (sub, peer); // TODO even needed?
  1555. peer_ctx = get_peer_ctx (sub->peer_map, peer);
  1556. if ((GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
  1557. (NULL == peer_ctx->online_check_pending))
  1558. {
  1559. check_peer_online (peer_ctx);
  1560. return GNUNET_YES;
  1561. }
  1562. return GNUNET_NO;
  1563. }
  1564. /**
  1565. * @brief Check if peer is removable.
  1566. *
  1567. * Check if
  1568. * - a recv channel exists
  1569. * - there are pending messages
  1570. * - there is no pending pull reply
  1571. *
  1572. * @param peer_ctx Context of the peer in question
  1573. * @return #GNUNET_YES if peer is removable
  1574. * #GNUNET_NO if peer is NOT removable
  1575. * #GNUNET_SYSERR if peer is not known
  1576. */
  1577. static int
  1578. check_removable (const struct PeerContext *peer_ctx)
  1579. {
  1580. if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (
  1581. peer_ctx->sub->peer_map,
  1582. &peer_ctx->peer_id))
  1583. {
  1584. return GNUNET_SYSERR;
  1585. }
  1586. if ((NULL != peer_ctx->recv_channel_ctx) ||
  1587. (NULL != peer_ctx->pending_messages_head) ||
  1588. (GNUNET_YES == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)))
  1589. {
  1590. return GNUNET_NO;
  1591. }
  1592. return GNUNET_YES;
  1593. }
  /**
   * @brief Check whether @a peer is actually a peer.
   *
   * A valid peer is a peer that we know exists eg. we were connected to once.
   * Here this means: the peer is contained in @a valid_peers.
   *
   * @param valid_peers Peer map that would contain the @a peer
   * @param peer peer in question
   *
   * @return #GNUNET_YES if peer is valid
   *         #GNUNET_NO  if peer is not valid
   */
  static int
  check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
                    const struct GNUNET_PeerIdentity *peer)
  {
    const int is_contained =
      GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);

    return is_contained;
  }
  1611. /**
  1612. * @brief Indicate that we want to send to the other peer
  1613. *
  1614. * This establishes a sending channel
  1615. *
  1616. * @param peer_ctx Context of the target peer
  1617. */
  1618. static void
  1619. indicate_sending_intention (struct PeerContext *peer_ctx)
  1620. {
  1621. GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
  1622. &peer_ctx->peer_id));
  1623. (void) get_channel (peer_ctx);
  1624. }
  1625. /**
  1626. * @brief Check whether other peer has the intention to send/opened channel
  1627. * towars us
  1628. *
  1629. * @param peer_ctx Context of the peer in question
  1630. *
  1631. * @return #GNUNET_YES if peer has the intention to send
  1632. * #GNUNET_NO otherwise
  1633. */
  1634. static int
  1635. check_peer_send_intention (const struct PeerContext *peer_ctx)
  1636. {
  1637. if (NULL != peer_ctx->recv_channel_ctx)
  1638. {
  1639. return GNUNET_YES;
  1640. }
  1641. return GNUNET_NO;
  1642. }
  1643. /**
  1644. * Handle the channel a peer opens to us.
  1645. *
  1646. * @param cls The closure - Sub
  1647. * @param channel The channel the peer wants to establish
  1648. * @param initiator The peer's peer ID
  1649. *
  1650. * @return initial channel context for the channel
  1651. * (can be NULL -- that's not an error)
  1652. */
  1653. static void *
  1654. handle_inbound_channel (void *cls,
  1655. struct GNUNET_CADET_Channel *channel,
  1656. const struct GNUNET_PeerIdentity *initiator)
  1657. {
  1658. struct PeerContext *peer_ctx;
  1659. struct ChannelCtx *channel_ctx;
  1660. struct Sub *sub = cls;
  1661. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1662. "New channel was established to us (Peer %s).\n",
  1663. GNUNET_i2s (initiator));
  1664. GNUNET_assert (NULL != channel); /* according to cadet API */
  1665. /* Make sure we 'know' about this peer */
  1666. peer_ctx = create_or_get_peer_ctx (sub, initiator);
  1667. set_peer_online (peer_ctx);
  1668. (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
  1669. channel_ctx = add_channel_ctx (peer_ctx);
  1670. channel_ctx->channel = channel;
  1671. /* We only accept one incoming channel per peer */
  1672. if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
  1673. initiator)))
  1674. {
  1675. LOG (GNUNET_ERROR_TYPE_WARNING,
  1676. "Already got one receive channel. Destroying old one.\n");
  1677. GNUNET_break_op (0);
  1678. destroy_channel (peer_ctx->recv_channel_ctx);
  1679. peer_ctx->recv_channel_ctx = channel_ctx;
  1680. /* return the channel context */
  1681. return channel_ctx;
  1682. }
  1683. peer_ctx->recv_channel_ctx = channel_ctx;
  1684. return channel_ctx;
  1685. }
  1686. /**
  1687. * @brief Check whether a sending channel towards the given peer exists
  1688. *
  1689. * @param peer_ctx Context of the peer in question
  1690. *
  1691. * @return #GNUNET_YES if a sending channel towards that peer exists
  1692. * #GNUNET_NO otherwise
  1693. */
  1694. static int
  1695. check_sending_channel_exists (const struct PeerContext *peer_ctx)
  1696. {
  1697. if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
  1698. &peer_ctx->peer_id))
  1699. { /* If no such peer exists, there is no channel */
  1700. return GNUNET_NO;
  1701. }
  1702. if (NULL == peer_ctx->send_channel_ctx)
  1703. {
  1704. return GNUNET_NO;
  1705. }
  1706. return GNUNET_YES;
  1707. }
  1708. /**
  1709. * @brief Destroy the send channel of a peer e.g. stop indicating a sending
  1710. * intention to another peer
  1711. *
  1712. * @param peer_ctx Context to the peer
  1713. * @return #GNUNET_YES if channel was destroyed
  1714. * #GNUNET_NO otherwise
  1715. */
  1716. static int
  1717. destroy_sending_channel (struct PeerContext *peer_ctx)
  1718. {
  1719. if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
  1720. &peer_ctx->peer_id))
  1721. {
  1722. return GNUNET_NO;
  1723. }
  1724. if (NULL != peer_ctx->send_channel_ctx)
  1725. {
  1726. destroy_channel (peer_ctx->send_channel_ctx);
  1727. (void) check_connected (peer_ctx);
  1728. return GNUNET_YES;
  1729. }
  1730. return GNUNET_NO;
  1731. }
  1732. /**
  1733. * @brief Send a message to another peer.
  1734. *
  1735. * Keeps track about pending messages so they can be properly removed when the
  1736. * peer is destroyed.
  1737. *
  1738. * @param peer_ctx Context of the peer to which the message is to be sent
  1739. * @param ev envelope of the message
  1740. * @param type type of the message
  1741. */
  1742. static void
  1743. send_message (struct PeerContext *peer_ctx,
  1744. struct GNUNET_MQ_Envelope *ev,
  1745. const char *type)
  1746. {
  1747. struct PendingMessage *pending_msg;
  1748. struct GNUNET_MQ_Handle *mq;
  1749. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1750. "Sending message to %s of type %s\n",
  1751. GNUNET_i2s (&peer_ctx->peer_id),
  1752. type);
  1753. pending_msg = insert_pending_message (peer_ctx, ev, type);
  1754. mq = get_mq (peer_ctx);
  1755. GNUNET_MQ_notify_sent (ev,
  1756. mq_notify_sent_cb,
  1757. pending_msg);
  1758. GNUNET_MQ_send (mq, ev);
  1759. }
  1760. /**
  1761. * @brief Schedule a operation on given peer
  1762. *
  1763. * Avoids scheduling an operation twice.
  1764. *
  1765. * @param peer_ctx Context of the peer for which to schedule the operation
  1766. * @param peer_op the operation to schedule
  1767. * @param cls Closure to @a peer_op
  1768. *
  1769. * @return #GNUNET_YES if the operation was scheduled
  1770. * #GNUNET_NO otherwise
  1771. */
  1772. static int
  1773. schedule_operation (struct PeerContext *peer_ctx,
  1774. const PeerOp peer_op,
  1775. void *cls)
  1776. {
  1777. struct PeerPendingOp pending_op;
  1778. GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
  1779. &peer_ctx->peer_id));
  1780. // TODO if ONLINE execute immediately
  1781. if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
  1782. {
  1783. pending_op.op = peer_op;
  1784. pending_op.op_cls = cls;
  1785. GNUNET_array_append (peer_ctx->pending_ops,
  1786. peer_ctx->num_pending_ops,
  1787. pending_op);
  1788. return GNUNET_YES;
  1789. }
  1790. return GNUNET_NO;
  1791. }
  1792. /***********************************************************************
  1793. * /Old gnunet-service-rps_peers.c
  1794. ***********************************************************************/
  1795. /***********************************************************************
  1796. * Housekeeping with clients
  1797. ***********************************************************************/
  /**
   * Closure used to pass the client and the id to the callback
   * that replies to a client's request
   */
  struct ReplyCls
  {
    /**
     * DLL: next element of the doubly-linked list
     */
    struct ReplyCls *next;

    /**
     * DLL: previous element of the doubly-linked list
     */
    struct ReplyCls *prev;

    /**
     * The identifier of the request
     */
    uint32_t id;

    /**
     * The handle to the request
     */
    struct RPS_SamplerRequestHandle *req_handle;

    /**
     * The client handle to send the reply to
     */
    struct ClientContext *cli_ctx;
  };
  /**
   * Struct used to store the context of a connected client.
   */
  struct ClientContext
  {
    /**
     * DLL: next element of the doubly-linked list
     */
    struct ClientContext *next;

    /**
     * DLL: previous element of the doubly-linked list
     */
    struct ClientContext *prev;

    /**
     * The message queue to communicate with the client.
     */
    struct GNUNET_MQ_Handle *mq;

    /**
     * @brief How many updates this client expects to receive.
     *
     * As interpreted by #clients_notify_view_update():
     *  - positive: number of view updates still to be sent,
     *  - 0: unlimited number of updates,
     *  - negative: client is not interested in updates.
     */
    int64_t view_updates_left;

    /**
     * @brief Whether this client wants to receive stream updates.
     *        Either #GNUNET_YES or #GNUNET_NO
     */
    int8_t stream_update;

    /**
     * The client handle to send the reply to
     */
    struct GNUNET_SERVICE_Client *client;

    /**
     * The #Sub this context belongs to
     */
    struct Sub *sub;
  };
  /**
   * DLL with all clients currently connected to us: head of the list
   */
  struct ClientContext *cli_ctx_head;

  /**
   * DLL with all clients currently connected to us: tail of the list
   */
  struct ClientContext *cli_ctx_tail;
  1859. /***********************************************************************
  1860. * /Housekeeping with clients
  1861. ***********************************************************************/
  1862. /***********************************************************************
  1863. * Util functions
  1864. ***********************************************************************/
  1865. /**
  1866. * Print peerlist to log.
  1867. */
  1868. static void
  1869. print_peer_list (struct GNUNET_PeerIdentity *list,
  1870. unsigned int len)
  1871. {
  1872. unsigned int i;
  1873. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1874. "Printing peer list of length %u at %p:\n",
  1875. len,
  1876. list);
  1877. for (i = 0; i < len; i++)
  1878. {
  1879. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1880. "%u. peer: %s\n",
  1881. i, GNUNET_i2s (&list[i]));
  1882. }
  1883. }
  1884. /**
  1885. * Remove peer from list.
  1886. */
  1887. static void
  1888. rem_from_list (struct GNUNET_PeerIdentity **peer_list,
  1889. unsigned int *list_size,
  1890. const struct GNUNET_PeerIdentity *peer)
  1891. {
  1892. unsigned int i;
  1893. struct GNUNET_PeerIdentity *tmp;
  1894. tmp = *peer_list;
  1895. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1896. "Removing peer %s from list at %p\n",
  1897. GNUNET_i2s (peer),
  1898. tmp);
  1899. for (i = 0; i < *list_size; i++)
  1900. {
  1901. if (0 == GNUNET_memcmp (&tmp[i], peer))
  1902. {
  1903. if (i < *list_size - 1)
  1904. { /* Not at the last entry -- shift peers left */
  1905. memmove (&tmp[i], &tmp[i + 1],
  1906. ((*list_size) - i - 1) * sizeof(struct GNUNET_PeerIdentity));
  1907. }
  1908. /* Remove last entry (should be now useless PeerID) */
  1909. GNUNET_array_grow (tmp, *list_size, (*list_size) - 1);
  1910. }
  1911. }
  1912. *peer_list = tmp;
  1913. }
  /**
   * Insert PeerID in #view
   *
   * Called once we know a peer is online.
   * Implements #PeerOp
   *
   * Forward declaration, needed because #insert_in_view() schedules this
   * operation before its definition.  The function returns nothing.
   *
   * @param cls Closure - Sub with view to insert peer into
   * @param peer the peer to insert
   */
  static void
  insert_in_view_op (void *cls,
                     const struct GNUNET_PeerIdentity *peer);
  1926. /**
  1927. * Insert PeerID in #view
  1928. *
  1929. * Called once we know a peer is online.
  1930. *
  1931. * @param sub Sub in with the view to insert in
  1932. * @param peer the peer to insert
  1933. *
  1934. * @return GNUNET_OK if peer was actually inserted
  1935. * GNUNET_NO if peer was not inserted
  1936. */
  1937. static int
  1938. insert_in_view (struct Sub *sub,
  1939. const struct GNUNET_PeerIdentity *peer)
  1940. {
  1941. struct PeerContext *peer_ctx;
  1942. int online;
  1943. int ret;
  1944. online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
  1945. peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
  1946. if ((GNUNET_NO == online) ||
  1947. (GNUNET_SYSERR == online)) /* peer is not even known */
  1948. {
  1949. (void) issue_peer_online_check (sub, peer);
  1950. (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
  1951. return GNUNET_NO;
  1952. }
  1953. /* Open channel towards peer to keep connection open */
  1954. indicate_sending_intention (peer_ctx);
  1955. ret = View_put (sub->view, peer);
  1956. if (peer_ctx->sub == msub)
  1957. {
  1958. GNUNET_STATISTICS_set (stats,
  1959. "view size",
  1960. View_size (peer_ctx->sub->view),
  1961. GNUNET_NO);
  1962. }
  1963. return ret;
  1964. }
  1965. /**
  1966. * @brief Send view to client
  1967. *
  1968. * @param cli_ctx the context of the client
  1969. * @param view_array the peerids of the view as array (can be empty)
  1970. * @param view_size the size of the view array (can be 0)
  1971. */
  1972. static void
  1973. send_view (const struct ClientContext *cli_ctx,
  1974. const struct GNUNET_PeerIdentity *view_array,
  1975. uint64_t view_size)
  1976. {
  1977. struct GNUNET_MQ_Envelope *ev;
  1978. struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
  1979. struct Sub *sub;
  1980. if (NULL == view_array)
  1981. {
  1982. if (NULL == cli_ctx->sub)
  1983. sub = msub;
  1984. else
  1985. sub = cli_ctx->sub;
  1986. view_size = View_size (sub->view);
  1987. view_array = View_get_as_array (sub->view);
  1988. }
  1989. ev = GNUNET_MQ_msg_extra (out_msg,
  1990. view_size * sizeof(struct GNUNET_PeerIdentity),
  1991. GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
  1992. out_msg->num_peers = htonl (view_size);
  1993. GNUNET_memcpy (&out_msg[1],
  1994. view_array,
  1995. view_size * sizeof(struct GNUNET_PeerIdentity));
  1996. GNUNET_MQ_send (cli_ctx->mq, ev);
  1997. }
  1998. /**
  1999. * @brief Send peer from biased stream to client.
  2000. *
  2001. * TODO merge with send_view, parameterise
  2002. *
  2003. * @param cli_ctx the context of the client
  2004. * @param view_array the peerids of the view as array (can be empty)
  2005. * @param view_size the size of the view array (can be 0)
  2006. */
  2007. static void
  2008. send_stream_peers (const struct ClientContext *cli_ctx,
  2009. uint64_t num_peers,
  2010. const struct GNUNET_PeerIdentity *peers)
  2011. {
  2012. struct GNUNET_MQ_Envelope *ev;
  2013. struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
  2014. GNUNET_assert (NULL != peers);
  2015. ev = GNUNET_MQ_msg_extra (out_msg,
  2016. num_peers * sizeof(struct GNUNET_PeerIdentity),
  2017. GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
  2018. out_msg->num_peers = htonl (num_peers);
  2019. GNUNET_memcpy (&out_msg[1],
  2020. peers,
  2021. num_peers * sizeof(struct GNUNET_PeerIdentity));
  2022. GNUNET_MQ_send (cli_ctx->mq, ev);
  2023. }
  2024. /**
  2025. * @brief sends updates to clients that are interested
  2026. *
  2027. * @param sub Sub for which to notify clients
  2028. */
  2029. static void
  2030. clients_notify_view_update (const struct Sub *sub)
  2031. {
  2032. struct ClientContext *cli_ctx_iter;
  2033. uint64_t num_peers;
  2034. const struct GNUNET_PeerIdentity *view_array;
  2035. num_peers = View_size (sub->view);
  2036. view_array = View_get_as_array (sub->view);
  2037. /* check size of view is small enough */
  2038. if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
  2039. {
  2040. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  2041. "View is too big to send\n");
  2042. return;
  2043. }
  2044. for (cli_ctx_iter = cli_ctx_head;
  2045. NULL != cli_ctx_iter;
  2046. cli_ctx_iter = cli_ctx_iter->next)
  2047. {
  2048. if (1 < cli_ctx_iter->view_updates_left)
  2049. {
  2050. /* Client wants to receive limited amount of updates */
  2051. cli_ctx_iter->view_updates_left -= 1;
  2052. }
  2053. else if (1 == cli_ctx_iter->view_updates_left)
  2054. {
  2055. /* Last update of view for client */
  2056. cli_ctx_iter->view_updates_left = -1;
  2057. }
  2058. else if (0 > cli_ctx_iter->view_updates_left)
  2059. {
  2060. /* Client is not interested in updates */
  2061. continue;
  2062. }
  2063. /* else _updates_left == 0 - infinite amount of updates */
  2064. /* send view */
  2065. send_view (cli_ctx_iter, view_array, num_peers);
  2066. }
  2067. }
  2068. /**
  2069. * @brief sends updates to clients that are interested
  2070. *
  2071. * @param num_peers Number of peers to send
  2072. * @param peers the array of peers to send
  2073. */
  2074. static void
  2075. clients_notify_stream_peer (const struct Sub *sub,
  2076. uint64_t num_peers,
  2077. const struct GNUNET_PeerIdentity *peers)
  2078. // TODO enum StreamPeerSource)
  2079. {
  2080. struct ClientContext *cli_ctx_iter;
  2081. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2082. "Got peer (%s) from biased stream - update all clients\n",
  2083. GNUNET_i2s (peers));
  2084. for (cli_ctx_iter = cli_ctx_head;
  2085. NULL != cli_ctx_iter;
  2086. cli_ctx_iter = cli_ctx_iter->next)
  2087. {
  2088. if ((GNUNET_YES == cli_ctx_iter->stream_update) &&
  2089. ((sub == cli_ctx_iter->sub) || (sub == msub) ))
  2090. {
  2091. send_stream_peers (cli_ctx_iter, num_peers, peers);
  2092. }
  2093. }
  2094. }
  2095. /**
  2096. * Put random peer from sampler into the view as history update.
  2097. *
  2098. * @param ids Array of Peers to insert into view
  2099. * @param num_peers Number of peers to insert
  2100. * @param cls Closure - The Sub for which this is to be done
  2101. */
  2102. static void
  2103. hist_update (const struct GNUNET_PeerIdentity *ids,
  2104. uint32_t num_peers,
  2105. void *cls)
  2106. {
  2107. unsigned int i;
  2108. struct Sub *sub = cls;
  2109. for (i = 0; i < num_peers; i++)
  2110. {
  2111. int inserted;
  2112. if (GNUNET_YES != check_peer_known (sub->peer_map, &ids[i]))
  2113. {
  2114. LOG (GNUNET_ERROR_TYPE_WARNING,
  2115. "Peer in history update not known!\n");
  2116. continue;
  2117. }
  2118. inserted = insert_in_view (sub, &ids[i]);
  2119. if (GNUNET_OK == inserted)
  2120. {
  2121. clients_notify_stream_peer (sub, 1, &ids[i]);
  2122. }
  2123. #ifdef TO_FILE_FULL
  2124. to_file (sub->file_name_view_log,
  2125. "+%s\t(history)",
  2126. GNUNET_i2s_full (ids));
  2127. #endif /* TO_FILE_FULL */
  2128. }
  2129. clients_notify_view_update (sub);
  2130. }
  2131. /**
  2132. * Wrapper around #RPS_sampler_resize()
  2133. *
  2134. * If we do not have enough sampler elements, double current sampler size
  2135. * If we have more than enough sampler elements, halv current sampler size
  2136. *
  2137. * @param sampler The sampler to resize
  2138. * @param new_size New size to which to resize
  2139. */
  2140. static void
  2141. resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
  2142. {
  2143. unsigned int sampler_size;
  2144. // TODO statistics
  2145. // TODO respect the min, max
  2146. sampler_size = RPS_sampler_get_size (sampler);
  2147. if (sampler_size > new_size * 4)
  2148. { /* Shrinking */
  2149. RPS_sampler_resize (sampler, sampler_size / 2);
  2150. }
  2151. else if (sampler_size < new_size)
  2152. { /* Growing */
  2153. RPS_sampler_resize (sampler, sampler_size * 2);
  2154. }
  2155. LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
  2156. }
  #if ENABLE_MALICIOUS
  /**
   * Add all peers in @a peer_array to @a peer_map used as set.
   *
   * The peers are stored as keys with a NULL value.  If @a peer_map is the
   * peer map of #msub, the "# known peers" statistic is refreshed after
   * every insertion.
   *
   * @param peer_array array containing the peers
   * @param num_peers number of peers in @a peer_array
   * @param peer_map the peermap to use as set
   */
  static void
  add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
                         unsigned int num_peers,
                         struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
  {
    if (NULL == peer_map)
    {
      LOG (GNUNET_ERROR_TYPE_WARNING,
           "Trying to add peers to non-existing peermap.\n");
      return;
    }

    for (unsigned int idx = 0; idx < num_peers; idx++)
    {
      GNUNET_CONTAINER_multipeermap_put (peer_map,
                                         &peer_array[idx],
                                         NULL,
                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
      if (msub->peer_map != peer_map)
      {
        continue;
      }
      GNUNET_STATISTICS_set (stats,
                             "# known peers",
                             GNUNET_CONTAINER_multipeermap_size (peer_map),
                             GNUNET_NO);
    }
  }
  #endif /* ENABLE_MALICIOUS */
  2193. /**
  2194. * Send a PULL REPLY to @a peer_id
  2195. *
  2196. * @param peer_ctx Context of the peer to send the reply to
  2197. * @param peer_ids the peers to send to @a peer_id
  2198. * @param num_peer_ids the number of peers to send to @a peer_id
  2199. */
  2200. static void
  2201. send_pull_reply (struct PeerContext *peer_ctx,
  2202. const struct GNUNET_PeerIdentity *peer_ids,
  2203. unsigned int num_peer_ids)
  2204. {
  2205. uint32_t send_size;
  2206. struct GNUNET_MQ_Envelope *ev;
  2207. struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
  2208. /* Compute actual size */
  2209. send_size = sizeof(struct GNUNET_RPS_P2P_PullReplyMessage)
  2210. + num_peer_ids * sizeof(struct GNUNET_PeerIdentity);
  2211. if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
  2212. /* Compute number of peers to send
  2213. * If too long, simply truncate */
  2214. // TODO select random ones via permutation
  2215. // or even better: do good protocol design
  2216. send_size =
  2217. (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE
  2218. - sizeof(struct GNUNET_RPS_P2P_PullReplyMessage))
  2219. / sizeof(struct GNUNET_PeerIdentity);
  2220. else
  2221. send_size = num_peer_ids;
  2222. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2223. "Going to send PULL REPLY with %u peers to %s\n",
  2224. send_size, GNUNET_i2s (&peer_ctx->peer_id));
  2225. ev = GNUNET_MQ_msg_extra (out_msg,
  2226. send_size * sizeof(struct GNUNET_PeerIdentity),
  2227. GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
  2228. out_msg->num_peers = htonl (send_size);
  2229. GNUNET_memcpy (&out_msg[1], peer_ids,
  2230. send_size * sizeof(struct GNUNET_PeerIdentity));
  2231. send_message (peer_ctx, ev, "PULL REPLY");
  2232. if (peer_ctx->sub == msub)
  2233. {
  2234. GNUNET_STATISTICS_update (stats, "# pull reply send issued", 1, GNUNET_NO);
  2235. }
  2236. // TODO check with send intention: as send_channel is used/opened we indicate
  2237. // a sending intention without intending it.
  2238. // -> clean peer afterwards?
  2239. // -> use recv_channel?
  2240. }
  2241. /**
  2242. * Insert PeerID in #pull_map
  2243. *
  2244. * Called once we know a peer is online.
  2245. *
  2246. * @param cls Closure - Sub with the pull map to insert into
  2247. * @param peer Peer to insert
  2248. */
  2249. static void
  2250. insert_in_pull_map (void *cls,
  2251. const struct GNUNET_PeerIdentity *peer)
  2252. {
  2253. struct Sub *sub = cls;
  2254. CustomPeerMap_put (sub->pull_map, peer);
  2255. }
  2256. /**
  2257. * Insert PeerID in #view
  2258. *
  2259. * Called once we know a peer is online.
  2260. * Implements #PeerOp
  2261. *
  2262. * @param cls Closure - Sub with view to insert peer into
  2263. * @param peer the peer to insert
  2264. */
  2265. static void
  2266. insert_in_view_op (void *cls,
  2267. const struct GNUNET_PeerIdentity *peer)
  2268. {
  2269. struct Sub *sub = cls;
  2270. int inserted;
  2271. inserted = insert_in_view (sub, peer);
  2272. if (GNUNET_OK == inserted)
  2273. {
  2274. clients_notify_stream_peer (sub, 1, peer);
  2275. }
  2276. }
  2277. /**
  2278. * Update sampler with given PeerID.
  2279. * Implements #PeerOp
  2280. *
  2281. * @param cls Closure - Sub containing the sampler to insert into
  2282. * @param peer Peer to insert
  2283. */
  2284. static void
  2285. insert_in_sampler (void *cls,
  2286. const struct GNUNET_PeerIdentity *peer)
  2287. {
  2288. struct Sub *sub = cls;
  2289. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2290. "Updating samplers with peer %s from insert_in_sampler()\n",
  2291. GNUNET_i2s (peer));
  2292. RPS_sampler_update (sub->sampler, peer);
  2293. if (0 < RPS_sampler_count_id (sub->sampler, peer))
  2294. {
  2295. /* Make sure we 'know' about this peer */
  2296. (void) issue_peer_online_check (sub, peer);
  2297. /* Establish a channel towards that peer to indicate we are going to send
  2298. * messages to it */
  2299. // indicate_sending_intention (peer);
  2300. }
  2301. if (sub == msub)
  2302. {
  2303. GNUNET_STATISTICS_update (stats,
  2304. "# observed peers in gossip",
  2305. 1,
  2306. GNUNET_NO);
  2307. }
  2308. #ifdef TO_FILE
  2309. sub->num_observed_peers++;
  2310. (void) GNUNET_CONTAINER_multipeermap_put
  2311. (sub->observed_unique_peers,
  2312. peer,
  2313. NULL,
  2314. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
  2315. uint32_t num_observed_unique_peers =
  2316. GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
  2317. GNUNET_STATISTICS_set (stats,
  2318. "# unique peers in gossip",
  2319. num_observed_unique_peers,
  2320. GNUNET_NO);
  2321. #ifdef TO_FILE_FULL
  2322. to_file (sub->file_name_observed_log,
  2323. "%" PRIu32 " %" PRIu32 " %f\n",
  2324. sub->num_observed_peers,
  2325. num_observed_unique_peers,
  2326. 1.0 * num_observed_unique_peers / sub->num_observed_peers)
  2327. #endif /* TO_FILE_FULL */
  2328. #endif /* TO_FILE */
  2329. }
  2330. /**
  2331. * @brief This is called on peers from external sources (cadet, peerinfo, ...)
  2332. * If the peer is not known, online check is issued and it is
  2333. * scheduled to be inserted in sampler and view.
  2334. *
  2335. * "External sources" refer to every source except the gossip.
  2336. *
  2337. * @param sub Sub for which @a peer was received
  2338. * @param peer peer to insert/peer received
  2339. */
  2340. static void
  2341. got_peer (struct Sub *sub,
  2342. const struct GNUNET_PeerIdentity *peer)
  2343. {
  2344. /* If we did not know this peer already, insert it into sampler and view */
  2345. if (GNUNET_YES == issue_peer_online_check (sub, peer))
  2346. {
  2347. schedule_operation (get_peer_ctx (sub->peer_map, peer),
  2348. &insert_in_sampler, sub);
  2349. schedule_operation (get_peer_ctx (sub->peer_map, peer),
  2350. &insert_in_view_op, sub);
  2351. }
  2352. if (sub == msub)
  2353. {
  2354. GNUNET_STATISTICS_update (stats,
  2355. "# learnd peers",
  2356. 1,
  2357. GNUNET_NO);
  2358. }
  2359. }
  2360. /**
  2361. * @brief Checks if there is a sending channel and if it is needed
  2362. *
  2363. * @param peer_ctx Context of the peer to check
  2364. * @return GNUNET_YES if sending channel exists and is still needed
  2365. * GNUNET_NO otherwise
  2366. */
  2367. static int
  2368. check_sending_channel_needed (const struct PeerContext *peer_ctx)
  2369. {
  2370. /* struct GNUNET_CADET_Channel *channel; */
  2371. if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
  2372. &peer_ctx->peer_id))
  2373. {
  2374. return GNUNET_NO;
  2375. }
  2376. if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
  2377. {
  2378. if ((0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
  2379. &peer_ctx->peer_id)) ||
  2380. (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
  2381. &peer_ctx->peer_id)) ||
  2382. (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
  2383. &peer_ctx->peer_id)) ||
  2384. (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
  2385. &peer_ctx->peer_id)) ||
  2386. (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
  2387. &peer_ctx->peer_id,
  2388. Peers_PULL_REPLY_PENDING)))
  2389. { /* If we want to keep the connection to peer open */
  2390. return GNUNET_YES;
  2391. }
  2392. return GNUNET_NO;
  2393. }
  2394. return GNUNET_NO;
  2395. }
  2396. /**
  2397. * @brief remove peer from our knowledge, the view, push and pull maps and
  2398. * samplers.
  2399. *
  2400. * @param sub Sub with the data structures the peer is to be removed from
  2401. * @param peer the peer to remove
  2402. */
  2403. static void
  2404. remove_peer (struct Sub *sub,
  2405. const struct GNUNET_PeerIdentity *peer)
  2406. {
  2407. (void) View_remove_peer (sub->view,
  2408. peer);
  2409. CustomPeerMap_remove_peer (sub->pull_map,
  2410. peer);
  2411. CustomPeerMap_remove_peer (sub->push_map,
  2412. peer);
  2413. RPS_sampler_reinitialise_by_value (sub->sampler,
  2414. peer);
  2415. /* We want to destroy the peer now.
  2416. * Sometimes, it just seems that it's already been removed from the peer_map,
  2417. * so check the peer_map first. */
  2418. if (GNUNET_YES == check_peer_known (sub->peer_map,
  2419. peer))
  2420. {
  2421. destroy_peer (get_peer_ctx (sub->peer_map,
  2422. peer));
  2423. }
  2424. }
  2425. /**
  2426. * @brief Remove data that is not needed anymore.
  2427. *
  2428. * If the sending channel is no longer needed it is destroyed.
  2429. *
  2430. * @param sub Sub in which the current peer is to be cleaned
  2431. * @param peer the peer whose data is about to be cleaned
  2432. */
  2433. static void
  2434. clean_peer (struct Sub *sub,
  2435. const struct GNUNET_PeerIdentity *peer)
  2436. {
  2437. if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
  2438. peer)))
  2439. {
  2440. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2441. "Going to remove send channel to peer %s\n",
  2442. GNUNET_i2s (peer));
  2443. #if ENABLE_MALICIOUS
  2444. if (0 != GNUNET_memcmp (&attacked_peer,
  2445. peer))
  2446. (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
  2447. peer));
  2448. #else /* ENABLE_MALICIOUS */
  2449. (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
  2450. peer));
  2451. #endif /* ENABLE_MALICIOUS */
  2452. }
  2453. if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map,
  2454. peer))
  2455. {
  2456. /* Peer was already removed by callback on destroyed channel */
  2457. LOG (GNUNET_ERROR_TYPE_WARNING,
  2458. "Peer was removed from our knowledge during cleanup\n");
  2459. return;
  2460. }
  2461. if ((GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
  2462. peer))) &&
  2463. (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
  2464. (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
  2465. (GNUNET_NO == CustomPeerMap_contains_peer (sub->pull_map, peer)) &&
  2466. (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
  2467. (GNUNET_YES == check_removable (get_peer_ctx (sub->peer_map, peer))))
  2468. { /* We can safely remove this peer */
  2469. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2470. "Going to remove peer %s\n",
  2471. GNUNET_i2s (peer));
  2472. remove_peer (sub, peer);
  2473. return;
  2474. }
  2475. }
  2476. /**
  2477. * @brief This is called when a channel is destroyed.
  2478. *
  2479. * Removes peer completely from our knowledge if the send_channel was destroyed
  2480. * Otherwise simply delete the recv_channel
  2481. * Also check if the knowledge about this peer is still needed.
  2482. * If not, remove this peer from our knowledge.
  2483. *
  2484. * @param cls The closure - Context to the channel
  2485. * @param channel The channel being closed
  2486. */
  2487. static void
  2488. cleanup_destroyed_channel (void *cls,
  2489. const struct GNUNET_CADET_Channel *channel)
  2490. {
  2491. struct ChannelCtx *channel_ctx = cls;
  2492. struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
  2493. (void) channel;
  2494. channel_ctx->channel = NULL;
  2495. if ((NULL != peer_ctx) &&
  2496. (peer_ctx->send_channel_ctx == channel_ctx) &&
  2497. (GNUNET_YES == check_sending_channel_needed (peer_ctx)) )
  2498. {
  2499. remove_channel_ctx (channel_ctx);
  2500. remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
  2501. }
  2502. else
  2503. {
  2504. /* We need this if-else construct because we need to make sure the channel
  2505. * (context) is cleaned up before removing the peer, but still need to
  2506. * compare it while checking the condition */
  2507. remove_channel_ctx (channel_ctx);
  2508. }
  2509. }
  2510. /***********************************************************************
  2511. * /Util functions
  2512. ***********************************************************************/
  2513. /***********************************************************************
  2514. * Sub
  2515. ***********************************************************************/
  2516. /**
  2517. * @brief Create a new Sub
  2518. *
  2519. * @param hash Hash of value shared among rps instances on other hosts that
  2520. * defines a subgroup to sample from.
  2521. * @param sampler_size Size of the sampler
  2522. * @param round_interval Interval (in average) between two rounds
  2523. *
  2524. * @return Sub
  2525. */
  2526. struct Sub *
  2527. new_sub (const struct GNUNET_HashCode *hash,
  2528. uint32_t sampler_size,
  2529. struct GNUNET_TIME_Relative round_interval)
  2530. {
  2531. struct Sub *sub;
  2532. sub = GNUNET_new (struct Sub);
  2533. /* With the hash generated from the secret value this service only connects
  2534. * to rps instances that share the value */
  2535. struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
  2536. GNUNET_MQ_hd_fixed_size (peer_check,
  2537. GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
  2538. struct GNUNET_MessageHeader,
  2539. NULL),
  2540. GNUNET_MQ_hd_fixed_size (peer_push,
  2541. GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
  2542. struct GNUNET_MessageHeader,
  2543. NULL),
  2544. GNUNET_MQ_hd_fixed_size (peer_pull_request,
  2545. GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
  2546. struct GNUNET_MessageHeader,
  2547. NULL),
  2548. GNUNET_MQ_hd_var_size (peer_pull_reply,
  2549. GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
  2550. struct GNUNET_RPS_P2P_PullReplyMessage,
  2551. NULL),
  2552. GNUNET_MQ_handler_end ()
  2553. };
  2554. sub->hash = *hash;
  2555. sub->cadet_port =
  2556. GNUNET_CADET_open_port (cadet_handle,
  2557. &sub->hash,
  2558. &handle_inbound_channel, /* Connect handler */
  2559. sub, /* cls */
  2560. NULL, /* WindowSize handler */
  2561. &cleanup_destroyed_channel, /* Disconnect handler */
  2562. cadet_handlers);
  2563. if (NULL == sub->cadet_port)
  2564. {
  2565. LOG (GNUNET_ERROR_TYPE_ERROR,
  2566. "Cadet port `%s' is already in use.\n",
  2567. GNUNET_APPLICATION_PORT_RPS);
  2568. GNUNET_assert (0);
  2569. }
  2570. /* Set up general data structure to keep track about peers */
  2571. sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
  2572. if (GNUNET_OK !=
  2573. GNUNET_CONFIGURATION_get_value_filename (cfg,
  2574. "rps",
  2575. "FILENAME_VALID_PEERS",
  2576. &sub->filename_valid_peers))
  2577. {
  2578. GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
  2579. "rps",
  2580. "FILENAME_VALID_PEERS");
  2581. }
  2582. if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
  2583. {
  2584. char *tmp_filename_valid_peers;
  2585. char str_hash[105];
  2586. GNUNET_snprintf (str_hash,
  2587. sizeof(str_hash), "%s",
  2588. GNUNET_h2s_full (hash));
  2589. tmp_filename_valid_peers = sub->filename_valid_peers;
  2590. GNUNET_asprintf (&sub->filename_valid_peers,
  2591. "%s%s",
  2592. tmp_filename_valid_peers,
  2593. str_hash);
  2594. GNUNET_free (tmp_filename_valid_peers);
  2595. }
  2596. sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
  2597. /* Set up the sampler */
  2598. sub->sampler_size_est_min = sampler_size;
  2599. sub->sampler_size_est_need = sampler_size;;
  2600. LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
  2601. GNUNET_assert (0 != round_interval.rel_value_us);
  2602. sub->round_interval = round_interval;
  2603. sub->sampler = RPS_sampler_init (sampler_size,
  2604. round_interval);
  2605. /* Logging of internals */
  2606. #ifdef TO_FILE_FULL
  2607. // FIXME: The service cannot know the index, which is required by this
  2608. // function:
  2609. // sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
  2610. #endif /* TO_FILE_FULL */
  2611. #ifdef TO_FILE
  2612. #ifdef TO_FILE_FULL
  2613. // FIXME: The service cannot know the index, which is required by this
  2614. // function:
  2615. // sub->file_name_observed_log = store_prefix_file_name (&own_identity,
  2616. // "observed");
  2617. #endif /* TO_FILE_FULL */
  2618. sub->num_observed_peers = 0;
  2619. sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
  2620. GNUNET_NO);
  2621. #endif /* TO_FILE */
  2622. /* Set up data structures for gossip */
  2623. sub->push_map = CustomPeerMap_create (4);
  2624. sub->pull_map = CustomPeerMap_create (4);
  2625. sub->view_size_est_min = sampler_size;;
  2626. sub->view = View_create (sub->view_size_est_min);
  2627. if (sub == msub)
  2628. {
  2629. GNUNET_STATISTICS_set (stats,
  2630. "view size aim",
  2631. sub->view_size_est_min,
  2632. GNUNET_NO);
  2633. }
  2634. /* Start executing rounds */
  2635. sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
  2636. return sub;
  2637. }
  2638. #ifdef TO_FILE
  2639. // /**
  2640. // * @brief Write all numbers in the given array into the given file
  2641. // *
  2642. // * Single numbers divided by a newline
  2643. // *
  2644. // * FIXME: The call to store_prefix_file_name expects the index of the peer,
  2645. // * which cannot be known to the service.
  2646. // * Write a dedicated function that uses the peer id.
  2647. // *
  2648. // * @param hist_array[] the array to dump
  2649. // * @param file_name file to dump into
  2650. // */
  2651. // static void
  2652. // write_histogram_to_file (const uint32_t hist_array[],
  2653. // const char *file_name)
  2654. // {
  2655. // char collect_str[SIZE_DUMP_FILE + 1] = "";
  2656. // char *recv_str_iter;
  2657. // char *file_name_full;
  2658. //
  2659. // recv_str_iter = collect_str;
  2660. // file_name_full = store_prefix_file_name (&own_identity,
  2661. // file_name);
  2662. // for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
  2663. // {
  2664. // char collect_str_tmp[8];
  2665. //
  2666. // GNUNET_snprintf (collect_str_tmp,
  2667. // sizeof(collect_str_tmp),
  2668. // "%" PRIu32 "\n",
  2669. // hist_array[i]);
  2670. // recv_str_iter = stpncpy (recv_str_iter,
  2671. // collect_str_tmp,
  2672. // 6);
  2673. // }
  2674. // (void) stpcpy (recv_str_iter,
  2675. // "\n");
  2676. // LOG (GNUNET_ERROR_TYPE_DEBUG,
  2677. // "Writing push stats to disk\n");
  2678. // to_file_w_len (file_name_full,
  2679. // SIZE_DUMP_FILE, "%s",
  2680. // collect_str);
  2681. // GNUNET_free (file_name_full);
  2682. // }
  2683. #endif /* TO_FILE */
  2684. /**
  2685. * @brief Destroy Sub.
  2686. *
  2687. * @param sub Sub to destroy
  2688. */
  2689. static void
  2690. destroy_sub (struct Sub *sub)
  2691. {
  2692. GNUNET_assert (NULL != sub);
  2693. GNUNET_assert (NULL != sub->do_round_task);
  2694. GNUNET_SCHEDULER_cancel (sub->do_round_task);
  2695. sub->do_round_task = NULL;
  2696. /* Disconnect from cadet */
  2697. GNUNET_CADET_close_port (sub->cadet_port);
  2698. sub->cadet_port = NULL;
  2699. /* Clean up data structures for peers */
  2700. RPS_sampler_destroy (sub->sampler);
  2701. sub->sampler = NULL;
  2702. View_destroy (sub->view);
  2703. sub->view = NULL;
  2704. CustomPeerMap_destroy (sub->push_map);
  2705. sub->push_map = NULL;
  2706. CustomPeerMap_destroy (sub->pull_map);
  2707. sub->pull_map = NULL;
  2708. peers_terminate (sub);
  2709. /* Free leftover data structures */
  2710. #ifdef TO_FILE_FULL
  2711. GNUNET_free (sub->file_name_view_log);
  2712. sub->file_name_view_log = NULL;
  2713. #endif /* TO_FILE_FULL */
  2714. #ifdef TO_FILE
  2715. #ifdef TO_FILE_FULL
  2716. GNUNET_free (sub->file_name_observed_log);
  2717. sub->file_name_observed_log = NULL;
  2718. #endif /* TO_FILE_FULL */
  2719. // FIXME: Currently this calls malfunctionning code
  2720. // /* Write push frequencies to disk */
  2721. // write_histogram_to_file (sub->push_recv,
  2722. // "push_recv");
  2723. // /* Write push deltas to disk */
  2724. // write_histogram_to_file (sub->push_delta,
  2725. // "push_delta");
  2726. // /* Write pull delays to disk */
  2727. // write_histogram_to_file (sub->pull_delays,
  2728. // "pull_delays");
  2729. GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
  2730. sub->observed_unique_peers = NULL;
  2731. #endif /* TO_FILE */
  2732. GNUNET_free (sub);
  2733. }
  2734. /***********************************************************************
  2735. * /Sub
  2736. ***********************************************************************/
  2737. /***********************************************************************
  2738. * Core handlers
  2739. ***********************************************************************/
  2740. /**
  2741. * @brief Callback on initialisation of Core.
  2742. *
  2743. * @param cls - unused
  2744. * @param my_identity - unused
  2745. */
  2746. void
  2747. core_init (void *cls,
  2748. const struct GNUNET_PeerIdentity *my_identity)
  2749. {
  2750. (void) cls;
  2751. (void) my_identity;
  2752. map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
  2753. }
  2754. /**
  2755. * @brief Callback for core.
  2756. * Method called whenever a given peer connects.
  2757. *
  2758. * @param cls closure - unused
  2759. * @param peer peer identity this notification is about
  2760. * @return closure given to #core_disconnects as peer_cls
  2761. */
  2762. void *
  2763. core_connects (void *cls,
  2764. const struct GNUNET_PeerIdentity *peer,
  2765. struct GNUNET_MQ_Handle *mq)
  2766. {
  2767. (void) cls;
  2768. (void) mq;
  2769. GNUNET_assert (GNUNET_YES ==
  2770. GNUNET_CONTAINER_multipeermap_put (map_single_hop,
  2771. peer,
  2772. NULL,
  2773. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  2774. return NULL;
  2775. }
  2776. /**
  2777. * @brief Callback for core.
  2778. * Method called whenever a peer disconnects.
  2779. *
  2780. * @param cls closure - unused
  2781. * @param peer peer identity this notification is about
  2782. * @param peer_cls closure given in #core_connects - unused
  2783. */
  2784. void
  2785. core_disconnects (void *cls,
  2786. const struct GNUNET_PeerIdentity *peer,
  2787. void *peer_cls)
  2788. {
  2789. (void) cls;
  2790. (void) peer_cls;
  2791. GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
  2792. }
  2793. /***********************************************************************
  2794. * /Core handlers
  2795. ***********************************************************************/
  2796. /**
  2797. * @brief Destroy the context for a (connected) client
  2798. *
  2799. * @param cli_ctx Context to destroy
  2800. */
  2801. static void
  2802. destroy_cli_ctx (struct ClientContext *cli_ctx)
  2803. {
  2804. GNUNET_assert (NULL != cli_ctx);
  2805. GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
  2806. cli_ctx_tail,
  2807. cli_ctx);
  2808. if (NULL != cli_ctx->sub)
  2809. {
  2810. destroy_sub (cli_ctx->sub);
  2811. cli_ctx->sub = NULL;
  2812. }
  2813. GNUNET_free (cli_ctx);
  2814. }
  2815. /**
  2816. * @brief Update sizes in sampler and view on estimate update from nse service
  2817. *
  2818. * @param sub Sub
  2819. * @param logestimate the log(Base 2) value of the current network size estimate
  2820. * @param std_dev standard deviation for the estimate
  2821. */
  2822. static void
  2823. adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
  2824. {
  2825. double estimate;
  2826. // double scale; // TODO this might go global/config
  2827. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2828. "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
  2829. logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
  2830. // scale = .01;
  2831. estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
  2832. // GNUNET_NSE_log_estimate_to_n (logestimate);
  2833. estimate = pow (estimate, 1.0 / 3);
  2834. // TODO add if std_dev is a number
  2835. // estimate += (std_dev * scale);
  2836. if (sub->view_size_est_min < ceil (estimate))
  2837. {
  2838. LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
  2839. sub->sampler_size_est_need = estimate;
  2840. sub->view_size_est_need = estimate;
  2841. }
  2842. else
  2843. {
  2844. LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
  2845. // sub->sampler_size_est_need = sub->view_size_est_min;
  2846. sub->view_size_est_need = sub->view_size_est_min;
  2847. }
  2848. if (sub == msub)
  2849. {
  2850. GNUNET_STATISTICS_set (stats,
  2851. "view size aim",
  2852. sub->view_size_est_need,
  2853. GNUNET_NO);
  2854. }
  2855. /* If the NSE has changed adapt the lists accordingly */
  2856. resize_wrapper (sub->sampler, sub->sampler_size_est_need);
  2857. View_change_len (sub->view, sub->view_size_est_need);
  2858. }
  2859. /**
  2860. * Function called by NSE.
  2861. *
  2862. * Updates sizes of sampler list and view and adapt those lists
  2863. * accordingly.
  2864. *
  2865. * implements #GNUNET_NSE_Callback
  2866. *
  2867. * @param cls Closure - unused
  2868. * @param timestamp time when the estimate was received from the server (or created by the server)
  2869. * @param logestimate the log(Base 2) value of the current network size estimate
  2870. * @param std_dev standard deviation for the estimate
  2871. */
  2872. static void
  2873. nse_callback (void *cls,
  2874. struct GNUNET_TIME_Absolute timestamp,
  2875. double logestimate, double std_dev)
  2876. {
  2877. (void) cls;
  2878. (void) timestamp;
  2879. struct ClientContext *cli_ctx_iter;
  2880. adapt_sizes (msub, logestimate, std_dev);
  2881. for (cli_ctx_iter = cli_ctx_head;
  2882. NULL != cli_ctx_iter;
  2883. cli_ctx_iter = cli_ctx_iter->next)
  2884. {
  2885. if (NULL != cli_ctx_iter->sub)
  2886. {
  2887. adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
  2888. }
  2889. }
  2890. }
  2891. /**
  2892. * @brief This function is called, when the client seeds peers.
  2893. * It verifies that @a msg is well-formed.
  2894. *
  2895. * @param cls the closure (#ClientContext)
  2896. * @param msg the message
  2897. * @return #GNUNET_OK if @a msg is well-formed
  2898. * #GNUNET_SYSERR otherwise
  2899. */
  2900. static int
  2901. check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
  2902. {
  2903. struct ClientContext *cli_ctx = cls;
  2904. uint16_t msize = ntohs (msg->header.size);
  2905. uint32_t num_peers = ntohl (msg->num_peers);
  2906. msize -= sizeof(struct GNUNET_RPS_CS_SeedMessage);
  2907. if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
  2908. (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
  2909. {
  2910. LOG (GNUNET_ERROR_TYPE_ERROR,
  2911. "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
  2912. ntohl (msg->num_peers),
  2913. (msize / sizeof(struct GNUNET_PeerIdentity)));
  2914. GNUNET_break (0);
  2915. GNUNET_SERVICE_client_drop (cli_ctx->client);
  2916. return GNUNET_SYSERR;
  2917. }
  2918. return GNUNET_OK;
  2919. }
  2920. /**
  2921. * Handle seed from the client.
  2922. *
  2923. * @param cls closure
  2924. * @param message the actual message
  2925. */
  2926. static void
  2927. handle_client_seed (void *cls,
  2928. const struct GNUNET_RPS_CS_SeedMessage *msg)
  2929. {
  2930. struct ClientContext *cli_ctx = cls;
  2931. struct GNUNET_PeerIdentity *peers;
  2932. uint32_t num_peers;
  2933. uint32_t i;
  2934. num_peers = ntohl (msg->num_peers);
  2935. peers = (struct GNUNET_PeerIdentity *) &msg[1];
  2936. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2937. "Client seeded peers:\n");
  2938. print_peer_list (peers, num_peers);
  2939. for (i = 0; i < num_peers; i++)
  2940. {
  2941. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2942. "Updating samplers with seed %" PRIu32 ": %s\n",
  2943. i,
  2944. GNUNET_i2s (&peers[i]));
  2945. if (NULL != msub)
  2946. got_peer (msub, &peers[i]); /* Condition needed? */
  2947. if (NULL != cli_ctx->sub)
  2948. got_peer (cli_ctx->sub, &peers[i]);
  2949. }
  2950. GNUNET_SERVICE_client_continue (cli_ctx->client);
  2951. }
  2952. /**
  2953. * Handle RPS request from the client.
  2954. *
  2955. * @param cls Client context
  2956. * @param message Message containing the number of updates the client wants to
  2957. * receive
  2958. */
  2959. static void
  2960. handle_client_view_request (void *cls,
  2961. const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
  2962. {
  2963. struct ClientContext *cli_ctx = cls;
  2964. uint64_t num_updates;
  2965. num_updates = ntohl (msg->num_updates);
  2966. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2967. "Client requested %" PRIu64 " updates of view.\n",
  2968. num_updates);
  2969. GNUNET_assert (NULL != cli_ctx);
  2970. cli_ctx->view_updates_left = num_updates;
  2971. send_view (cli_ctx, NULL, 0);
  2972. GNUNET_SERVICE_client_continue (cli_ctx->client);
  2973. }
  2974. /**
  2975. * @brief Handle the cancellation of the view updates.
  2976. *
  2977. * @param cls The client context
  2978. * @param msg Unused
  2979. */
  2980. static void
  2981. handle_client_view_cancel (void *cls,
  2982. const struct GNUNET_MessageHeader *msg)
  2983. {
  2984. struct ClientContext *cli_ctx = cls;
  2985. (void) msg;
  2986. LOG (GNUNET_ERROR_TYPE_DEBUG,
  2987. "Client does not want to receive updates of view any more.\n");
  2988. GNUNET_assert (NULL != cli_ctx);
  2989. cli_ctx->view_updates_left = 0;
  2990. GNUNET_SERVICE_client_continue (cli_ctx->client);
  2991. if (GNUNET_YES == cli_ctx->stream_update)
  2992. {
  2993. destroy_cli_ctx (cli_ctx);
  2994. }
  2995. }
  2996. /**
  2997. * Handle RPS request for biased stream from the client.
  2998. *
  2999. * @param cls Client context
  3000. * @param message unused
  3001. */
  3002. static void
  3003. handle_client_stream_request (void *cls,
  3004. const struct
  3005. GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
  3006. {
  3007. struct ClientContext *cli_ctx = cls;
  3008. (void) msg;
  3009. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3010. "Client requested peers from biased stream.\n");
  3011. cli_ctx->stream_update = GNUNET_YES;
  3012. GNUNET_assert (NULL != cli_ctx);
  3013. GNUNET_SERVICE_client_continue (cli_ctx->client);
  3014. }
  3015. /**
  3016. * @brief Handles the cancellation of the stream of biased peer ids
  3017. *
  3018. * @param cls The client context
  3019. * @param msg unused
  3020. */
  3021. static void
  3022. handle_client_stream_cancel (void *cls,
  3023. const struct GNUNET_MessageHeader *msg)
  3024. {
  3025. struct ClientContext *cli_ctx = cls;
  3026. (void) msg;
  3027. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3028. "Client canceled receiving peers from biased stream.\n");
  3029. cli_ctx->stream_update = GNUNET_NO;
  3030. GNUNET_assert (NULL != cli_ctx);
  3031. GNUNET_SERVICE_client_continue (cli_ctx->client);
  3032. }
  3033. /**
  3034. * @brief Create and start a Sub.
  3035. *
  3036. * @param cls Closure - unused
  3037. * @param msg Message containing the necessary information
  3038. */
  3039. static void
  3040. handle_client_start_sub (void *cls,
  3041. const struct GNUNET_RPS_CS_SubStartMessage *msg)
  3042. {
  3043. struct ClientContext *cli_ctx = cls;
  3044. LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
  3045. if ((NULL != cli_ctx->sub) &&
  3046. (0 != memcmp (&cli_ctx->sub->hash,
  3047. &msg->hash,
  3048. sizeof(struct GNUNET_HashCode))) )
  3049. {
  3050. LOG (GNUNET_ERROR_TYPE_WARNING,
  3051. "Already have a Sub with different share for this client. Remove old one, add new.\n");
  3052. destroy_sub (cli_ctx->sub);
  3053. cli_ctx->sub = NULL;
  3054. }
  3055. cli_ctx->sub = new_sub (&msg->hash,
  3056. msub->sampler_size_est_min, // TODO make api input?
  3057. GNUNET_TIME_relative_ntoh (msg->round_interval));
  3058. GNUNET_SERVICE_client_continue (cli_ctx->client);
  3059. }
  3060. /**
  3061. * @brief Destroy the Sub
  3062. *
  3063. * @param cls Closure - unused
  3064. * @param msg Message containing the hash that identifies the Sub
  3065. */
  3066. static void
  3067. handle_client_stop_sub (void *cls,
  3068. const struct GNUNET_RPS_CS_SubStopMessage *msg)
  3069. {
  3070. struct ClientContext *cli_ctx = cls;
  3071. GNUNET_assert (NULL != cli_ctx->sub);
  3072. if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof(struct
  3073. GNUNET_HashCode)))
  3074. {
  3075. LOG (GNUNET_ERROR_TYPE_WARNING,
  3076. "Share of current sub and request differ!\n");
  3077. }
  3078. destroy_sub (cli_ctx->sub);
  3079. cli_ctx->sub = NULL;
  3080. GNUNET_SERVICE_client_continue (cli_ctx->client);
  3081. }
  3082. /**
  3083. * Handle a CHECK_LIVE message from another peer.
  3084. *
  3085. * This does nothing. But without calling #GNUNET_CADET_receive_done()
  3086. * the channel is blocked for all other communication.
  3087. *
  3088. * @param cls Closure - Context of channel
  3089. * @param msg Message - unused
  3090. */
  3091. static void
  3092. handle_peer_check (void *cls,
  3093. const struct GNUNET_MessageHeader *msg)
  3094. {
  3095. const struct ChannelCtx *channel_ctx = cls;
  3096. const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
  3097. (void) msg;
  3098. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3099. "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
  3100. if (channel_ctx->peer_ctx->sub == msub)
  3101. {
  3102. GNUNET_STATISTICS_update (stats,
  3103. "# pending online checks",
  3104. -1,
  3105. GNUNET_NO);
  3106. }
  3107. GNUNET_CADET_receive_done (channel_ctx->channel);
  3108. }
  3109. /**
  3110. * Handle a PUSH message from another peer.
  3111. *
  3112. * Check the proof of work and store the PeerID
  3113. * in the temporary list for pushed PeerIDs.
  3114. *
  3115. * @param cls Closure - Context of channel
  3116. * @param msg Message - unused
  3117. */
  3118. static void
  3119. handle_peer_push (void *cls,
  3120. const struct GNUNET_MessageHeader *msg)
  3121. {
  3122. const struct ChannelCtx *channel_ctx = cls;
  3123. const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
  3124. (void) msg;
  3125. // (check the proof of work (?))
  3126. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3127. "Received PUSH (%s)\n",
  3128. GNUNET_i2s (peer));
  3129. if (channel_ctx->peer_ctx->sub == msub)
  3130. {
  3131. GNUNET_STATISTICS_update (stats, "# push message received", 1, GNUNET_NO);
  3132. if ((NULL != map_single_hop) &&
  3133. (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
  3134. peer)))
  3135. {
  3136. GNUNET_STATISTICS_update (stats,
  3137. "# push message received (multi-hop peer)",
  3138. 1,
  3139. GNUNET_NO);
  3140. }
  3141. }
  3142. #if ENABLE_MALICIOUS
  3143. struct AttackedPeer *tmp_att_peer;
  3144. if ((1 == mal_type) ||
  3145. (3 == mal_type))
  3146. { /* Try to maximise representation */
  3147. tmp_att_peer = GNUNET_new (struct AttackedPeer);
  3148. tmp_att_peer->peer_id = *peer;
  3149. if (NULL == att_peer_set)
  3150. att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
  3151. if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
  3152. peer))
  3153. {
  3154. GNUNET_CONTAINER_DLL_insert (att_peers_head,
  3155. att_peers_tail,
  3156. tmp_att_peer);
  3157. add_peer_array_to_set (peer, 1, att_peer_set);
  3158. }
  3159. else
  3160. {
  3161. GNUNET_free (tmp_att_peer);
  3162. }
  3163. }
  3164. else if (2 == mal_type)
  3165. {
  3166. /* We attack one single well-known peer - simply ignore */
  3167. }
  3168. #endif /* ENABLE_MALICIOUS */
  3169. /* Add the sending peer to the push_map */
  3170. CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
  3171. GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
  3172. &channel_ctx->peer_ctx->peer_id));
  3173. GNUNET_CADET_receive_done (channel_ctx->channel);
  3174. }
  3175. /**
  3176. * Handle PULL REQUEST request message from another peer.
  3177. *
  3178. * Reply with the view of PeerIDs.
  3179. *
  3180. * @param cls Closure - Context of channel
  3181. * @param msg Message - unused
  3182. */
  3183. static void
  3184. handle_peer_pull_request (void *cls,
  3185. const struct GNUNET_MessageHeader *msg)
  3186. {
  3187. const struct ChannelCtx *channel_ctx = cls;
  3188. struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
  3189. const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
  3190. const struct GNUNET_PeerIdentity *view_array;
  3191. (void) msg;
  3192. LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (
  3193. peer));
  3194. if (peer_ctx->sub == msub)
  3195. {
  3196. GNUNET_STATISTICS_update (stats,
  3197. "# pull request message received",
  3198. 1,
  3199. GNUNET_NO);
  3200. if ((NULL != map_single_hop) &&
  3201. (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
  3202. &peer_ctx->peer_id)))
  3203. {
  3204. GNUNET_STATISTICS_update (stats,
  3205. "# pull request message received (multi-hop peer)",
  3206. 1,
  3207. GNUNET_NO);
  3208. }
  3209. }
  3210. #if ENABLE_MALICIOUS
  3211. if ((1 == mal_type)
  3212. || (3 == mal_type))
  3213. { /* Try to maximise representation */
  3214. send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
  3215. }
  3216. else if (2 == mal_type)
  3217. { /* Try to partition network */
  3218. if (0 == GNUNET_memcmp (&attacked_peer, peer))
  3219. {
  3220. send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
  3221. }
  3222. }
  3223. #endif /* ENABLE_MALICIOUS */
  3224. GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
  3225. &channel_ctx->peer_ctx->peer_id));
  3226. GNUNET_CADET_receive_done (channel_ctx->channel);
  3227. view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
  3228. send_pull_reply (peer_ctx,
  3229. view_array,
  3230. View_size (channel_ctx->peer_ctx->sub->view));
  3231. }
  3232. /**
  3233. * Check whether we sent a corresponding request and
  3234. * whether this reply is the first one.
  3235. *
  3236. * @param cls Closure - Context of channel
  3237. * @param msg Message containing the replied peers
  3238. */
  3239. static int
  3240. check_peer_pull_reply (void *cls,
  3241. const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
  3242. {
  3243. struct ChannelCtx *channel_ctx = cls;
  3244. struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
  3245. if (sizeof(struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
  3246. {
  3247. GNUNET_break_op (0);
  3248. return GNUNET_SYSERR;
  3249. }
  3250. if ((ntohs (msg->header.size) - sizeof(struct
  3251. GNUNET_RPS_P2P_PullReplyMessage))
  3252. / sizeof(struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
  3253. {
  3254. LOG (GNUNET_ERROR_TYPE_ERROR,
  3255. "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
  3256. ntohl (msg->num_peers),
  3257. (ntohs (msg->header.size) - sizeof(struct
  3258. GNUNET_RPS_P2P_PullReplyMessage))
  3259. / sizeof(struct GNUNET_PeerIdentity));
  3260. GNUNET_break_op (0);
  3261. return GNUNET_SYSERR;
  3262. }
  3263. if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
  3264. &sender_ctx->peer_id,
  3265. Peers_PULL_REPLY_PENDING))
  3266. {
  3267. LOG (GNUNET_ERROR_TYPE_WARNING,
  3268. "Received a pull reply from a peer (%s) we didn't request one from!\n",
  3269. GNUNET_i2s (&sender_ctx->peer_id));
  3270. if (sender_ctx->sub == msub)
  3271. {
  3272. GNUNET_STATISTICS_update (stats,
  3273. "# unrequested pull replies",
  3274. 1,
  3275. GNUNET_NO);
  3276. }
  3277. }
  3278. return GNUNET_OK;
  3279. }
  3280. /**
  3281. * Handle PULL REPLY message from another peer.
  3282. *
  3283. * @param cls Closure
  3284. * @param msg The message header
  3285. */
  3286. static void
  3287. handle_peer_pull_reply (void *cls,
  3288. const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
  3289. {
  3290. const struct ChannelCtx *channel_ctx = cls;
  3291. const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
  3292. const struct GNUNET_PeerIdentity *peers;
  3293. struct Sub *sub = channel_ctx->peer_ctx->sub;
  3294. uint32_t i;
  3295. #if ENABLE_MALICIOUS
  3296. struct AttackedPeer *tmp_att_peer;
  3297. #endif /* ENABLE_MALICIOUS */
  3298. sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
  3299. LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (
  3300. sender));
  3301. if (channel_ctx->peer_ctx->sub == msub)
  3302. {
  3303. GNUNET_STATISTICS_update (stats,
  3304. "# pull reply messages received",
  3305. 1,
  3306. GNUNET_NO);
  3307. if ((NULL != map_single_hop) &&
  3308. (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
  3309. &channel_ctx->
  3310. peer_ctx->peer_id)) )
  3311. {
  3312. GNUNET_STATISTICS_update (stats,
  3313. "# pull reply messages received (multi-hop peer)",
  3314. 1,
  3315. GNUNET_NO);
  3316. }
  3317. }
  3318. #if ENABLE_MALICIOUS
  3319. // We shouldn't even receive pull replies as we're not sending
  3320. if (2 == mal_type)
  3321. {
  3322. }
  3323. #endif /* ENABLE_MALICIOUS */
  3324. /* Do actual logic */
  3325. peers = (const struct GNUNET_PeerIdentity *) &msg[1];
  3326. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3327. "PULL REPLY received, got following %u peers:\n",
  3328. ntohl (msg->num_peers));
  3329. for (i = 0; i < ntohl (msg->num_peers); i++)
  3330. {
  3331. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3332. "%u. %s\n",
  3333. i,
  3334. GNUNET_i2s (&peers[i]));
  3335. #if ENABLE_MALICIOUS
  3336. if ((NULL != att_peer_set) &&
  3337. ((1 == mal_type) || (3 == mal_type) ))
  3338. { /* Add attacked peer to local list */
  3339. // TODO check if we sent a request and this was the first reply
  3340. if ((GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
  3341. &peers[i]))
  3342. && (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
  3343. &peers[i])) )
  3344. {
  3345. tmp_att_peer = GNUNET_new (struct AttackedPeer);
  3346. tmp_att_peer->peer_id = peers[i];
  3347. GNUNET_CONTAINER_DLL_insert (att_peers_head,
  3348. att_peers_tail,
  3349. tmp_att_peer);
  3350. add_peer_array_to_set (&peers[i], 1, att_peer_set);
  3351. }
  3352. continue;
  3353. }
  3354. #endif /* ENABLE_MALICIOUS */
  3355. /* Make sure we 'know' about this peer */
  3356. (void) insert_peer (channel_ctx->peer_ctx->sub,
  3357. &peers[i]);
  3358. if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
  3359. &peers[i]))
  3360. {
  3361. CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map,
  3362. &peers[i]);
  3363. }
  3364. else
  3365. {
  3366. schedule_operation (channel_ctx->peer_ctx,
  3367. insert_in_pull_map,
  3368. channel_ctx->peer_ctx->sub); /* cls */
  3369. (void) issue_peer_online_check (channel_ctx->peer_ctx->sub,
  3370. &peers[i]);
  3371. }
  3372. }
  3373. UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map,
  3374. sender),
  3375. Peers_PULL_REPLY_PENDING);
  3376. clean_peer (channel_ctx->peer_ctx->sub,
  3377. sender);
  3378. GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
  3379. sender));
  3380. GNUNET_CADET_receive_done (channel_ctx->channel);
  3381. }
  3382. /**
  3383. * Compute a random delay.
  3384. * A uniformly distributed value between mean + spread and mean - spread.
  3385. *
  3386. * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
  3387. * It would return a random value between 2 and 6 min.
  3388. *
  3389. * @param mean the mean time until the next round
  3390. * @param spread the inverse amount of deviation from the mean
  3391. */
  3392. static struct GNUNET_TIME_Relative
  3393. compute_rand_delay (struct GNUNET_TIME_Relative mean,
  3394. unsigned int spread)
  3395. {
  3396. struct GNUNET_TIME_Relative half_interval;
  3397. struct GNUNET_TIME_Relative ret;
  3398. unsigned int rand_delay;
  3399. unsigned int max_rand_delay;
  3400. if (0 == spread)
  3401. {
  3402. LOG (GNUNET_ERROR_TYPE_WARNING,
  3403. "Not accepting spread of 0\n");
  3404. GNUNET_break (0);
  3405. GNUNET_assert (0);
  3406. }
  3407. GNUNET_assert (0 != mean.rel_value_us);
  3408. /* Compute random time value between spread * mean and spread * mean */
  3409. half_interval = GNUNET_TIME_relative_divide (mean, spread);
  3410. max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us
  3411. / mean.rel_value_us * (2 / spread);
  3412. /**
  3413. * Compute random value between (0 and 1) * round_interval
  3414. * via multiplying round_interval with a 'fraction' (0 to value)/value
  3415. */
  3416. rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
  3417. max_rand_delay);
  3418. ret = GNUNET_TIME_relative_saturating_multiply (mean, rand_delay);
  3419. ret = GNUNET_TIME_relative_divide (ret, max_rand_delay);
  3420. ret = GNUNET_TIME_relative_add (ret, half_interval);
  3421. if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
  3422. LOG (GNUNET_ERROR_TYPE_WARNING,
  3423. "Returning FOREVER_REL\n");
  3424. return ret;
  3425. }
  3426. /**
  3427. * Send single pull request
  3428. *
  3429. * @param peer_ctx Context to the peer to send request to
  3430. */
  3431. static void
  3432. send_pull_request (struct PeerContext *peer_ctx)
  3433. {
  3434. struct GNUNET_MQ_Envelope *ev;
  3435. GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
  3436. &peer_ctx->peer_id,
  3437. Peers_PULL_REPLY_PENDING));
  3438. SET_PEER_FLAG (peer_ctx,
  3439. Peers_PULL_REPLY_PENDING);
  3440. peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
  3441. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3442. "Going to send PULL REQUEST to peer %s.\n",
  3443. GNUNET_i2s (&peer_ctx->peer_id));
  3444. ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
  3445. send_message (peer_ctx,
  3446. ev,
  3447. "PULL REQUEST");
  3448. if (peer_ctx->sub)
  3449. {
  3450. GNUNET_STATISTICS_update (stats,
  3451. "# pull request send issued",
  3452. 1,
  3453. GNUNET_NO);
  3454. if ((NULL != map_single_hop) &&
  3455. (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
  3456. &peer_ctx->peer_id)))
  3457. {
  3458. GNUNET_STATISTICS_update (stats,
  3459. "# pull request send issued (multi-hop peer)",
  3460. 1,
  3461. GNUNET_NO);
  3462. }
  3463. }
  3464. }
  3465. /**
  3466. * Send single push
  3467. *
  3468. * @param peer_ctx Context of peer to send push to
  3469. */
  3470. static void
  3471. send_push (struct PeerContext *peer_ctx)
  3472. {
  3473. struct GNUNET_MQ_Envelope *ev;
  3474. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3475. "Going to send PUSH to peer %s.\n",
  3476. GNUNET_i2s (&peer_ctx->peer_id));
  3477. ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
  3478. send_message (peer_ctx, ev, "PUSH");
  3479. if (peer_ctx->sub)
  3480. {
  3481. GNUNET_STATISTICS_update (stats,
  3482. "# push send issued",
  3483. 1,
  3484. GNUNET_NO);
  3485. if ((NULL != map_single_hop) &&
  3486. (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
  3487. &peer_ctx->peer_id)))
  3488. {
  3489. GNUNET_STATISTICS_update (stats,
  3490. "# push send issued (multi-hop peer)",
  3491. 1,
  3492. GNUNET_NO);
  3493. }
  3494. }
  3495. }
  3496. #if ENABLE_MALICIOUS
  3497. /**
  3498. * @brief This function is called, when the client tells us to act malicious.
  3499. * It verifies that @a msg is well-formed.
  3500. *
  3501. * @param cls the closure (#ClientContext)
  3502. * @param msg the message
  3503. * @return #GNUNET_OK if @a msg is well-formed
  3504. */
  3505. static int
  3506. check_client_act_malicious (void *cls,
  3507. const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
  3508. {
  3509. struct ClientContext *cli_ctx = cls;
  3510. uint16_t msize = ntohs (msg->header.size);
  3511. uint32_t num_peers = ntohl (msg->num_peers);
  3512. msize -= sizeof(struct GNUNET_RPS_CS_ActMaliciousMessage);
  3513. if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
  3514. (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
  3515. {
  3516. LOG (GNUNET_ERROR_TYPE_ERROR,
  3517. "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
  3518. ntohl (msg->num_peers),
  3519. (msize / sizeof(struct GNUNET_PeerIdentity)));
  3520. GNUNET_break (0);
  3521. GNUNET_SERVICE_client_drop (cli_ctx->client);
  3522. return GNUNET_SYSERR;
  3523. }
  3524. return GNUNET_OK;
  3525. }
  3526. /**
  3527. * Turn RPS service to act malicious.
  3528. *
  3529. * @param cls Closure
  3530. * @param client The client that sent the message
  3531. * @param msg The message header
  3532. */
  3533. static void
  3534. handle_client_act_malicious (void *cls,
  3535. const struct
  3536. GNUNET_RPS_CS_ActMaliciousMessage *msg)
  3537. {
  3538. struct ClientContext *cli_ctx = cls;
  3539. struct GNUNET_PeerIdentity *peers;
  3540. uint32_t num_mal_peers_sent;
  3541. uint32_t num_mal_peers_old;
  3542. struct Sub *sub = cli_ctx->sub;
  3543. if (NULL == sub)
  3544. sub = msub;
  3545. /* Do actual logic */
  3546. peers = (struct GNUNET_PeerIdentity *) &msg[1];
  3547. mal_type = ntohl (msg->type);
  3548. if (NULL == mal_peer_set)
  3549. mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
  3550. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3551. "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
  3552. mal_type,
  3553. ntohl (msg->num_peers));
  3554. if (1 == mal_type)
  3555. { /* Try to maximise representation */
  3556. /* Add other malicious peers to those we already know */
  3557. num_mal_peers_sent = ntohl (msg->num_peers);
  3558. num_mal_peers_old = num_mal_peers;
  3559. GNUNET_array_grow (mal_peers,
  3560. num_mal_peers,
  3561. num_mal_peers + num_mal_peers_sent);
  3562. GNUNET_memcpy (&mal_peers[num_mal_peers_old],
  3563. peers,
  3564. num_mal_peers_sent * sizeof(struct GNUNET_PeerIdentity));
  3565. /* Add all mal peers to mal_peer_set */
  3566. add_peer_array_to_set (&mal_peers[num_mal_peers_old],
  3567. num_mal_peers_sent,
  3568. mal_peer_set);
  3569. /* Substitute do_round () with do_mal_round () */
  3570. GNUNET_assert (NULL != sub->do_round_task);
  3571. GNUNET_SCHEDULER_cancel (sub->do_round_task);
  3572. sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
  3573. }
  3574. else if ((2 == mal_type) ||
  3575. (3 == mal_type))
  3576. { /* Try to partition the network */
  3577. /* Add other malicious peers to those we already know */
  3578. num_mal_peers_sent = ntohl (msg->num_peers) - 1;
  3579. num_mal_peers_old = num_mal_peers;
  3580. GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
  3581. GNUNET_array_grow (mal_peers,
  3582. num_mal_peers,
  3583. num_mal_peers + num_mal_peers_sent);
  3584. if ((NULL != mal_peers) &&
  3585. (0 != num_mal_peers) )
  3586. {
  3587. GNUNET_memcpy (&mal_peers[num_mal_peers_old],
  3588. peers,
  3589. num_mal_peers_sent * sizeof(struct GNUNET_PeerIdentity));
  3590. /* Add all mal peers to mal_peer_set */
  3591. add_peer_array_to_set (&mal_peers[num_mal_peers_old],
  3592. num_mal_peers_sent,
  3593. mal_peer_set);
  3594. }
  3595. /* Store the one attacked peer */
  3596. GNUNET_memcpy (&attacked_peer,
  3597. &msg->attacked_peer,
  3598. sizeof(struct GNUNET_PeerIdentity));
  3599. /* Set the flag of the attacked peer to valid to avoid problems */
  3600. if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
  3601. {
  3602. (void) issue_peer_online_check (sub, &attacked_peer);
  3603. }
  3604. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3605. "Attacked peer is %s\n",
  3606. GNUNET_i2s (&attacked_peer));
  3607. /* Substitute do_round () with do_mal_round () */
  3608. if (NULL != sub->do_round_task)
  3609. {
  3610. /* Probably in shutdown */
  3611. GNUNET_SCHEDULER_cancel (sub->do_round_task);
  3612. sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
  3613. }
  3614. }
  3615. else if (0 == mal_type)
  3616. { /* Stop acting malicious */
  3617. GNUNET_array_grow (mal_peers, num_mal_peers, 0);
  3618. /* Substitute do_mal_round () with do_round () */
  3619. GNUNET_SCHEDULER_cancel (sub->do_round_task);
  3620. sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
  3621. }
  3622. else
  3623. {
  3624. GNUNET_break (0);
  3625. GNUNET_SERVICE_client_continue (cli_ctx->client);
  3626. }
  3627. GNUNET_SERVICE_client_continue (cli_ctx->client);
  3628. }
  3629. /**
  3630. * Send out PUSHes and PULLs maliciously.
  3631. *
  3632. * This is executed regylary.
  3633. *
  3634. * @param cls Closure - Sub
  3635. */
  3636. static void
  3637. do_mal_round (void *cls)
  3638. {
  3639. uint32_t num_pushes;
  3640. uint32_t i;
  3641. struct GNUNET_TIME_Relative time_next_round;
  3642. struct AttackedPeer *tmp_att_peer;
  3643. struct Sub *sub = cls;
  3644. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3645. "Going to execute next round maliciously type %" PRIu32 ".\n",
  3646. mal_type);
  3647. sub->do_round_task = NULL;
  3648. GNUNET_assert (mal_type <= 3);
  3649. /* Do malicious actions */
  3650. if (1 == mal_type)
  3651. { /* Try to maximise representation */
  3652. /* The maximum of pushes we're going to send this round */
  3653. num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
  3654. num_attacked_peers),
  3655. GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
  3656. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3657. "Going to send %" PRIu32 " pushes\n",
  3658. num_pushes);
  3659. /* Send PUSHes to attacked peers */
  3660. for (i = 0; i < num_pushes; i++)
  3661. {
  3662. if (att_peers_tail == att_peer_index)
  3663. att_peer_index = att_peers_head;
  3664. else
  3665. att_peer_index = att_peer_index->next;
  3666. send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
  3667. }
  3668. /* Send PULLs to some peers to learn about additional peers to attack */
  3669. tmp_att_peer = att_peer_index;
  3670. for (i = 0; i < num_pushes * alpha; i++)
  3671. {
  3672. if (att_peers_tail == tmp_att_peer)
  3673. tmp_att_peer = att_peers_head;
  3674. else
  3675. att_peer_index = tmp_att_peer->next;
  3676. send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
  3677. }
  3678. }
  3679. else if (2 == mal_type)
  3680. { /**
  3681. * Try to partition the network
  3682. * Send as many pushes to the attacked peer as possible
  3683. * That is one push per round as it will ignore more.
  3684. */
  3685. (void) issue_peer_online_check (sub, &attacked_peer);
  3686. if (GNUNET_YES == check_peer_flag (sub->peer_map,
  3687. &attacked_peer,
  3688. Peers_ONLINE))
  3689. send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
  3690. }
  3691. if (3 == mal_type)
  3692. { /* Combined attack */
  3693. /* Send PUSH to attacked peers */
  3694. if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
  3695. {
  3696. (void) issue_peer_online_check (sub, &attacked_peer);
  3697. if (GNUNET_YES == check_peer_flag (sub->peer_map,
  3698. &attacked_peer,
  3699. Peers_ONLINE))
  3700. {
  3701. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3702. "Goding to send push to attacked peer (%s)\n",
  3703. GNUNET_i2s (&attacked_peer));
  3704. send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
  3705. }
  3706. }
  3707. (void) issue_peer_online_check (sub, &attacked_peer);
  3708. /* The maximum of pushes we're going to send this round */
  3709. num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
  3710. num_attacked_peers),
  3711. GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
  3712. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3713. "Going to send %" PRIu32 " pushes\n",
  3714. num_pushes);
  3715. for (i = 0; i < num_pushes; i++)
  3716. {
  3717. if (att_peers_tail == att_peer_index)
  3718. att_peer_index = att_peers_head;
  3719. else
  3720. att_peer_index = att_peer_index->next;
  3721. send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
  3722. }
  3723. /* Send PULLs to some peers to learn about additional peers to attack */
  3724. tmp_att_peer = att_peer_index;
  3725. for (i = 0; i < num_pushes * alpha; i++)
  3726. {
  3727. if (att_peers_tail == tmp_att_peer)
  3728. tmp_att_peer = att_peers_head;
  3729. else
  3730. att_peer_index = tmp_att_peer->next;
  3731. send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
  3732. }
  3733. }
  3734. /* Schedule next round */
  3735. time_next_round = compute_rand_delay (sub->round_interval, 2);
  3736. GNUNET_assert (NULL == sub->do_round_task);
  3737. sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
  3738. &do_mal_round, sub);
  3739. LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
  3740. }
  3741. #endif /* ENABLE_MALICIOUS */
  3742. /**
  3743. * Send out PUSHes and PULLs, possibly update #view, samplers.
  3744. *
  3745. * This is executed regylary.
  3746. *
  3747. * @param cls Closure - Sub
  3748. */
  3749. static void
  3750. do_round (void *cls)
  3751. {
  3752. unsigned int i;
  3753. const struct GNUNET_PeerIdentity *view_array;
  3754. unsigned int *permut;
  3755. unsigned int a_peers; /* Number of peers we send pushes to */
  3756. unsigned int b_peers; /* Number of peers we send pull requests to */
  3757. uint32_t first_border;
  3758. uint32_t second_border;
  3759. struct GNUNET_PeerIdentity peer;
  3760. struct GNUNET_PeerIdentity *update_peer;
  3761. struct Sub *sub = cls;
  3762. sub->num_rounds++;
  3763. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3764. "Going to execute next round.\n");
  3765. if (sub == msub)
  3766. {
  3767. GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
  3768. }
  3769. sub->do_round_task = NULL;
  3770. #ifdef TO_FILE_FULL
  3771. to_file (sub->file_name_view_log,
  3772. "___ new round ___");
  3773. #endif /* TO_FILE_FULL */
  3774. view_array = View_get_as_array (sub->view);
  3775. for (i = 0; i < View_size (sub->view); i++)
  3776. {
  3777. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3778. "\t%s\n", GNUNET_i2s (&view_array[i]));
  3779. #ifdef TO_FILE_FULL
  3780. to_file (sub->file_name_view_log,
  3781. "=%s\t(do round)",
  3782. GNUNET_i2s_full (&view_array[i]));
  3783. #endif /* TO_FILE_FULL */
  3784. }
  3785. /* Send pushes and pull requests */
  3786. if (0 < View_size (sub->view))
  3787. {
  3788. permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
  3789. View_size (sub->view));
  3790. /* Send PUSHes */
  3791. a_peers = ceil (alpha * View_size (sub->view));
  3792. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3793. "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
  3794. a_peers, alpha, View_size (sub->view));
  3795. for (i = 0; i < a_peers; i++)
  3796. {
  3797. peer = view_array[permut[i]];
  3798. // FIXME if this fails schedule/loop this for later
  3799. send_push (get_peer_ctx (sub->peer_map, &peer));
  3800. }
  3801. /* Send PULL requests */
  3802. b_peers = ceil (beta * View_size (sub->view));
  3803. first_border = a_peers;
  3804. second_border = a_peers + b_peers;
  3805. if (second_border > View_size (sub->view))
  3806. {
  3807. first_border = View_size (sub->view) - b_peers;
  3808. second_border = View_size (sub->view);
  3809. }
  3810. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3811. "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
  3812. b_peers, beta, View_size (sub->view));
  3813. for (i = first_border; i < second_border; i++)
  3814. {
  3815. peer = view_array[permut[i]];
  3816. if (GNUNET_NO == check_peer_flag (sub->peer_map,
  3817. &peer,
  3818. Peers_PULL_REPLY_PENDING))
  3819. { // FIXME if this fails schedule/loop this for later
  3820. send_pull_request (get_peer_ctx (sub->peer_map, &peer));
  3821. }
  3822. }
  3823. GNUNET_free (permut);
  3824. permut = NULL;
  3825. }
  3826. /* Update view */
  3827. /* TODO see how many peers are in push-/pull- list! */
  3828. if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
  3829. (0 < CustomPeerMap_size (sub->push_map)) &&
  3830. (0 < CustomPeerMap_size (sub->pull_map)))
  3831. { /* If conditions for update are fulfilled, update */
  3832. LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
  3833. uint32_t final_size;
  3834. uint32_t peers_to_clean_size;
  3835. struct GNUNET_PeerIdentity *peers_to_clean;
  3836. peers_to_clean = NULL;
  3837. peers_to_clean_size = 0;
  3838. GNUNET_array_grow (peers_to_clean,
  3839. peers_to_clean_size,
  3840. View_size (sub->view));
  3841. GNUNET_memcpy (peers_to_clean,
  3842. view_array,
  3843. View_size (sub->view) * sizeof(struct GNUNET_PeerIdentity));
  3844. /* Seems like recreating is the easiest way of emptying the peermap */
  3845. View_clear (sub->view);
  3846. #ifdef TO_FILE_FULL
  3847. to_file (sub->file_name_view_log,
  3848. "--- emptied ---");
  3849. #endif /* TO_FILE_FULL */
  3850. first_border = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
  3851. CustomPeerMap_size (sub->push_map));
  3852. second_border = first_border
  3853. + GNUNET_MIN (floor (beta * sub->view_size_est_need),
  3854. CustomPeerMap_size (sub->pull_map));
  3855. final_size = second_border
  3856. + ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
  3857. LOG (GNUNET_ERROR_TYPE_DEBUG,
  3858. "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"
  3859. PRIu32 "\n",
  3860. first_border,
  3861. second_border,
  3862. final_size);
  3863. /* Update view with peers received through PUSHes */
  3864. permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
  3865. CustomPeerMap_size (sub->push_map));
  3866. for (i = 0; i < first_border; i++)
  3867. {
  3868. int inserted;
  3869. inserted = insert_in_view (sub,
  3870. CustomPeerMap_get_peer_by_index (sub->push_map,
  3871. permut[i]));
  3872. if (GNUNET_OK == inserted)
  3873. {
  3874. clients_notify_stream_peer (sub,
  3875. 1,
  3876. CustomPeerMap_get_peer_by_index (
  3877. sub->push_map, permut[i]));
  3878. }
  3879. #ifdef TO_FILE_FULL
  3880. to_file (sub->file_name_view_log,
  3881. "+%s\t(push list)",
  3882. GNUNET_i2s_full (&view_array[i]));
  3883. #endif /* TO_FILE_FULL */
  3884. // TODO change the peer_flags accordingly
  3885. }
  3886. GNUNET_free (permut);
  3887. permut = NULL;
  3888. /* Update view with peers received through PULLs */
  3889. permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
  3890. CustomPeerMap_size (sub->pull_map));
  3891. for (i = first_border; i < second_border; i++)
  3892. {
  3893. int inserted;
  3894. inserted = insert_in_view (sub,
  3895. CustomPeerMap_get_peer_by_index (sub->pull_map,
  3896. permut[i
  3897. -
  3898. first_border
  3899. ]));
  3900. if (GNUNET_OK == inserted)
  3901. {
  3902. clients_notify_stream_peer (sub,
  3903. 1,
  3904. CustomPeerMap_get_peer_by_index (
  3905. sub->pull_map,
  3906. permut[i
  3907. - first_border]));
  3908. }
  3909. #ifdef TO_FILE_FULL
  3910. to_file (sub->file_name_view_log,
  3911. "+%s\t(pull list)",
  3912. GNUNET_i2s_full (&view_array[i]));
  3913. #endif /* TO_FILE_FULL */
  3914. // TODO change the peer_flags accordingly
  3915. }
  3916. GNUNET_free (permut);
  3917. permut = NULL;
  3918. /* Update view with peers from history */
  3919. RPS_sampler_get_n_rand_peers (sub->sampler,
  3920. final_size - second_border,
  3921. hist_update,
  3922. sub);
  3923. // TODO change the peer_flags accordingly
  3924. for (i = 0; i < View_size (sub->view); i++)
  3925. rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
  3926. /* Clean peers that were removed from the view */
  3927. for (i = 0; i < peers_to_clean_size; i++)
  3928. {
  3929. #ifdef TO_FILE_FULL
  3930. to_file (sub->file_name_view_log,
  3931. "-%s",
  3932. GNUNET_i2s_full (&peers_to_clean[i]));
  3933. #endif /* TO_FILE_FULL */
  3934. clean_peer (sub, &peers_to_clean[i]);
  3935. }
  3936. GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
  3937. clients_notify_view_update (sub);
  3938. }
  3939. else
  3940. {
  3941. LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
  3942. if (sub == msub)
  3943. {
  3944. GNUNET_STATISTICS_update (stats, "# rounds blocked", 1, GNUNET_NO);
  3945. if ((CustomPeerMap_size (sub->push_map) > alpha
  3946. * sub->view_size_est_need) &&
  3947. ! (0 >= CustomPeerMap_size (sub->pull_map)))
  3948. GNUNET_STATISTICS_update (stats, "# rounds blocked - too many pushes",
  3949. 1, GNUNET_NO);
  3950. if ((CustomPeerMap_size (sub->push_map) > alpha
  3951. * sub->view_size_est_need) &&
  3952. (0 >= CustomPeerMap_size (sub->pull_map)))
  3953. GNUNET_STATISTICS_update (stats,
  3954. "# rounds blocked - too many pushes, no pull replies",
  3955. 1, GNUNET_NO);
  3956. if ((0 >= CustomPeerMap_size (sub->push_map)) &&
  3957. ! (0 >= CustomPeerMap_size (sub->pull_map)))
  3958. GNUNET_STATISTICS_update (stats, "# rounds blocked - no pushes", 1,
  3959. GNUNET_NO);
  3960. if ((0 >= CustomPeerMap_size (sub->push_map)) &&
  3961. (0 >= CustomPeerMap_size (sub->pull_map)))
  3962. GNUNET_STATISTICS_update (stats,
  3963. "# rounds blocked - no pushes, no pull replies",
  3964. 1, GNUNET_NO);
  3965. if ((0 >= CustomPeerMap_size (sub->pull_map)) &&
  3966. (CustomPeerMap_size (sub->push_map) > alpha
  3967. * sub->view_size_est_need) &&
  3968. (0 >= CustomPeerMap_size (sub->push_map)) )
  3969. GNUNET_STATISTICS_update (stats, "# rounds blocked - no pull replies",
  3970. 1, GNUNET_NO);
  3971. }
  3972. }
  3973. // TODO independent of that also get some peers from CADET_get_peers()?
  3974. if (CustomPeerMap_size (sub->push_map) < HISTOGRAM_FILE_SLOTS)
  3975. {
  3976. sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
  3977. }
  3978. else
  3979. {
  3980. LOG (GNUNET_ERROR_TYPE_WARNING,
  3981. "Push map size too big for histogram (%u, %u)\n",
  3982. CustomPeerMap_size (sub->push_map),
  3983. HISTOGRAM_FILE_SLOTS);
  3984. }
  3985. // FIXME check bounds of histogram
  3986. sub->push_delta[(int32_t) (CustomPeerMap_size (sub->push_map)
  3987. - (alpha * sub->view_size_est_need))
  3988. + (HISTOGRAM_FILE_SLOTS / 2)]++;
  3989. if (sub == msub)
  3990. {
  3991. GNUNET_STATISTICS_set (stats,
  3992. "# peers in push map at end of round",
  3993. CustomPeerMap_size (sub->push_map),
  3994. GNUNET_NO);
  3995. GNUNET_STATISTICS_set (stats,
  3996. "# peers in pull map at end of round",
  3997. CustomPeerMap_size (sub->pull_map),
  3998. GNUNET_NO);
  3999. GNUNET_STATISTICS_set (stats,
  4000. "# peers in view at end of round",
  4001. View_size (sub->view),
  4002. GNUNET_NO);
  4003. GNUNET_STATISTICS_set (stats,
  4004. "# expected pushes",
  4005. alpha * sub->view_size_est_need,
  4006. GNUNET_NO);
  4007. GNUNET_STATISTICS_set (stats,
  4008. "delta expected - received pushes",
  4009. CustomPeerMap_size (sub->push_map) - (alpha
  4010. * sub->
  4011. view_size_est_need),
  4012. GNUNET_NO);
  4013. }
  4014. LOG (GNUNET_ERROR_TYPE_DEBUG,
  4015. "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
  4016. CustomPeerMap_size (sub->push_map),
  4017. CustomPeerMap_size (sub->pull_map),
  4018. alpha,
  4019. View_size (sub->view),
  4020. alpha * View_size (sub->view));
  4021. /* Update samplers */
  4022. for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
  4023. {
  4024. update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
  4025. LOG (GNUNET_ERROR_TYPE_DEBUG,
  4026. "Updating with peer %s from push list\n",
  4027. GNUNET_i2s (update_peer));
  4028. insert_in_sampler (sub, update_peer);
  4029. clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
  4030. }
  4031. for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
  4032. {
  4033. LOG (GNUNET_ERROR_TYPE_DEBUG,
  4034. "Updating with peer %s from pull list\n",
  4035. GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
  4036. insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
  4037. /* This cleans only if it is not in the view */
  4038. clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
  4039. }
  4040. /* Empty push/pull lists */
  4041. CustomPeerMap_clear (sub->push_map);
  4042. CustomPeerMap_clear (sub->pull_map);
  4043. if (sub == msub)
  4044. {
  4045. GNUNET_STATISTICS_set (stats,
  4046. "view size",
  4047. View_size (sub->view),
  4048. GNUNET_NO);
  4049. }
  4050. struct GNUNET_TIME_Relative time_next_round;
  4051. time_next_round = compute_rand_delay (sub->round_interval, 2);
  4052. /* Schedule next round */
  4053. sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
  4054. &do_round, sub);
  4055. LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
  4056. }
  4057. /**
  4058. * This is called from GNUNET_CADET_get_peers().
  4059. *
  4060. * It is called on every peer(ID) that cadet somehow has contact with.
  4061. * We use those to initialise the sampler.
  4062. *
  4063. * implements #GNUNET_CADET_PeersCB
  4064. *
  4065. * @param cls Closure - Sub
  4066. * @param peer Peer, or NULL on "EOF".
  4067. * @param tunnel Do we have a tunnel towards this peer?
  4068. * @param n_paths Number of known paths towards this peer.
  4069. * @param best_path How long is the best path?
  4070. * (0 = unknown, 1 = ourselves, 2 = neighbor)
  4071. */
  4072. void
  4073. init_peer_cb (void *cls,
  4074. const struct GNUNET_PeerIdentity *peer,
  4075. int tunnel, /* "Do we have a tunnel towards this peer?" */
  4076. unsigned int n_paths, /* "Number of known paths towards this peer" */
  4077. unsigned int best_path) /* "How long is the best path?
  4078. * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
  4079. {
  4080. struct Sub *sub = cls;
  4081. (void) tunnel;
  4082. (void) n_paths;
  4083. (void) best_path;
  4084. if (NULL != peer)
  4085. {
  4086. LOG (GNUNET_ERROR_TYPE_DEBUG,
  4087. "Got peer_id %s from cadet\n",
  4088. GNUNET_i2s (peer));
  4089. got_peer (sub, peer);
  4090. }
  4091. }
  4092. /**
  4093. * @brief Iterator function over stored, valid peers.
  4094. *
  4095. * We initialise the sampler with those.
  4096. *
  4097. * @param cls Closure - Sub
  4098. * @param peer the peer id
  4099. * @return #GNUNET_YES if we should continue to
  4100. * iterate,
  4101. * #GNUNET_NO if not.
  4102. */
  4103. static int
  4104. valid_peers_iterator (void *cls,
  4105. const struct GNUNET_PeerIdentity *peer)
  4106. {
  4107. struct Sub *sub = cls;
  4108. if (NULL != peer)
  4109. {
  4110. LOG (GNUNET_ERROR_TYPE_DEBUG,
  4111. "Got stored, valid peer %s\n",
  4112. GNUNET_i2s (peer));
  4113. got_peer (sub, peer);
  4114. }
  4115. return GNUNET_YES;
  4116. }
  4117. /**
  4118. * Iterator over peers from peerinfo.
  4119. *
  4120. * @param cls Closure - Sub
  4121. * @param peer id of the peer, NULL for last call
  4122. * @param hello hello message for the peer (can be NULL)
  4123. * @param error message
  4124. */
  4125. void
  4126. process_peerinfo_peers (void *cls,
  4127. const struct GNUNET_PeerIdentity *peer,
  4128. const struct GNUNET_HELLO_Message *hello,
  4129. const char *err_msg)
  4130. {
  4131. struct Sub *sub = cls;
  4132. (void) hello;
  4133. (void) err_msg;
  4134. if (NULL != peer)
  4135. {
  4136. LOG (GNUNET_ERROR_TYPE_DEBUG,
  4137. "Got peer_id %s from peerinfo\n",
  4138. GNUNET_i2s (peer));
  4139. got_peer (sub, peer);
  4140. }
  4141. }
  4142. /**
  4143. * Task run during shutdown.
  4144. *
  4145. * @param cls Closure - unused
  4146. */
  4147. static void
  4148. shutdown_task (void *cls)
  4149. {
  4150. (void) cls;
  4151. struct ClientContext *client_ctx;
  4152. LOG (GNUNET_ERROR_TYPE_DEBUG,
  4153. "RPS service is going down\n");
  4154. /* Clean all clients */
  4155. for (client_ctx = cli_ctx_head;
  4156. NULL != cli_ctx_head;
  4157. client_ctx = cli_ctx_head)
  4158. {
  4159. destroy_cli_ctx (client_ctx);
  4160. }
  4161. if (NULL != msub)
  4162. {
  4163. destroy_sub (msub);
  4164. msub = NULL;
  4165. }
  4166. /* Disconnect from other services */
  4167. GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
  4168. GNUNET_PEERINFO_disconnect (peerinfo_handle);
  4169. peerinfo_handle = NULL;
  4170. GNUNET_NSE_disconnect (nse);
  4171. if (NULL != map_single_hop)
  4172. {
  4173. /* core_init was called - core was initialised */
  4174. /* disconnect first, so no callback tries to access missing peermap */
  4175. GNUNET_CORE_disconnect (core_handle);
  4176. core_handle = NULL;
  4177. GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
  4178. map_single_hop = NULL;
  4179. }
  4180. if (NULL != stats)
  4181. {
  4182. GNUNET_STATISTICS_destroy (stats,
  4183. GNUNET_NO);
  4184. stats = NULL;
  4185. }
  4186. GNUNET_CADET_disconnect (cadet_handle);
  4187. cadet_handle = NULL;
  4188. #if ENABLE_MALICIOUS
  4189. struct AttackedPeer *tmp_att_peer;
  4190. GNUNET_array_grow (mal_peers,
  4191. num_mal_peers,
  4192. 0);
  4193. if (NULL != mal_peer_set)
  4194. GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
  4195. if (NULL != att_peer_set)
  4196. GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
  4197. while (NULL != att_peers_head)
  4198. {
  4199. tmp_att_peer = att_peers_head;
  4200. GNUNET_CONTAINER_DLL_remove (att_peers_head,
  4201. att_peers_tail,
  4202. tmp_att_peer);
  4203. GNUNET_free (tmp_att_peer);
  4204. }
  4205. #endif /* ENABLE_MALICIOUS */
  4206. close_all_files ();
  4207. }
  4208. /**
  4209. * Handle client connecting to the service.
  4210. *
  4211. * @param cls unused
  4212. * @param client the new client
  4213. * @param mq the message queue of @a client
  4214. * @return @a client
  4215. */
  4216. static void *
  4217. client_connect_cb (void *cls,
  4218. struct GNUNET_SERVICE_Client *client,
  4219. struct GNUNET_MQ_Handle *mq)
  4220. {
  4221. struct ClientContext *cli_ctx;
  4222. (void) cls;
  4223. LOG (GNUNET_ERROR_TYPE_DEBUG,
  4224. "Client connected\n");
  4225. if (NULL == client)
  4226. return client; /* Server was destroyed before a client connected. Shutting down */
  4227. cli_ctx = GNUNET_new (struct ClientContext);
  4228. cli_ctx->mq = mq;
  4229. cli_ctx->view_updates_left = -1;
  4230. cli_ctx->stream_update = GNUNET_NO;
  4231. cli_ctx->client = client;
  4232. GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
  4233. cli_ctx_tail,
  4234. cli_ctx);
  4235. return cli_ctx;
  4236. }
  4237. /**
  4238. * Callback called when a client disconnected from the service
  4239. *
  4240. * @param cls closure for the service
  4241. * @param c the client that disconnected
  4242. * @param internal_cls should be equal to @a c
  4243. */
  4244. static void
  4245. client_disconnect_cb (void *cls,
  4246. struct GNUNET_SERVICE_Client *client,
  4247. void *internal_cls)
  4248. {
  4249. struct ClientContext *cli_ctx = internal_cls;
  4250. (void) cls;
  4251. GNUNET_assert (client == cli_ctx->client);
  4252. if (NULL == client)
  4253. { /* shutdown task - destroy all clients */
  4254. while (NULL != cli_ctx_head)
  4255. destroy_cli_ctx (cli_ctx_head);
  4256. }
  4257. else
  4258. { /* destroy this client */
  4259. LOG (GNUNET_ERROR_TYPE_DEBUG,
  4260. "Client disconnected. Destroy its context.\n");
  4261. destroy_cli_ctx (cli_ctx);
  4262. }
  4263. }
  4264. /**
  4265. * Handle random peer sampling clients.
  4266. *
  4267. * @param cls closure
  4268. * @param c configuration to use
  4269. * @param service the initialized service
  4270. */
  4271. static void
  4272. run (void *cls,
  4273. const struct GNUNET_CONFIGURATION_Handle *c,
  4274. struct GNUNET_SERVICE_Handle *service)
  4275. {
  4276. struct GNUNET_TIME_Relative round_interval;
  4277. long long unsigned int sampler_size;
  4278. char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
  4279. struct GNUNET_HashCode hash;
  4280. (void) cls;
  4281. (void) service;
  4282. GNUNET_log_setup ("rps",
  4283. GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
  4284. NULL);
  4285. cfg = c;
  4286. /* Get own ID */
  4287. GNUNET_CRYPTO_get_peer_identity (cfg,
  4288. &own_identity); // TODO check return value
  4289. GNUNET_log (GNUNET_ERROR_TYPE_INFO,
  4290. "STARTING SERVICE (rps) for peer [%s]\n",
  4291. GNUNET_i2s (&own_identity));
  4292. #if ENABLE_MALICIOUS
  4293. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  4294. "Malicious execution compiled in.\n");
  4295. #endif /* ENABLE_MALICIOUS */
  4296. /* Get time interval from the configuration */
  4297. if (GNUNET_OK !=
  4298. GNUNET_CONFIGURATION_get_value_time (cfg,
  4299. "RPS",
  4300. "ROUNDINTERVAL",
  4301. &round_interval))
  4302. {
  4303. GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
  4304. "RPS", "ROUNDINTERVAL");
  4305. GNUNET_SCHEDULER_shutdown ();
  4306. return;
  4307. }
  4308. /* Get initial size of sampler/view from the configuration */
  4309. if (GNUNET_OK !=
  4310. GNUNET_CONFIGURATION_get_value_number (cfg,
  4311. "RPS",
  4312. "MINSIZE",
  4313. &sampler_size))
  4314. {
  4315. GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
  4316. "RPS", "MINSIZE");
  4317. GNUNET_SCHEDULER_shutdown ();
  4318. return;
  4319. }
  4320. cadet_handle = GNUNET_CADET_connect (cfg);
  4321. GNUNET_assert (NULL != cadet_handle);
  4322. core_handle = GNUNET_CORE_connect (cfg,
  4323. NULL, /* cls */
  4324. core_init, /* init */
  4325. core_connects, /* connects */
  4326. core_disconnects, /* disconnects */
  4327. NULL); /* handlers */
  4328. GNUNET_assert (NULL != core_handle);
  4329. alpha = 0.45;
  4330. beta = 0.45;
  4331. /* Set up main Sub */
  4332. GNUNET_CRYPTO_hash (hash_port_string,
  4333. strlen (hash_port_string),
  4334. &hash);
  4335. msub = new_sub (&hash,
  4336. sampler_size, /* Will be overwritten by config */
  4337. round_interval);
  4338. peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
  4339. /* connect to NSE */
  4340. nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
  4341. // LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
  4342. // GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
  4343. // TODO send push/pull to each of those peers?
  4344. LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
  4345. restore_valid_peers (msub);
  4346. get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
  4347. peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
  4348. GNUNET_NO,
  4349. process_peerinfo_peers,
  4350. msub);
  4351. LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
  4352. GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
  4353. stats = GNUNET_STATISTICS_create ("rps", cfg);
  4354. }
  4355. /**
  4356. * Define "main" method using service macro.
  4357. */
  4358. GNUNET_SERVICE_MAIN
  4359. ("rps",
  4360. GNUNET_SERVICE_OPTION_NONE,
  4361. &run,
  4362. &client_connect_cb,
  4363. &client_disconnect_cb,
  4364. NULL,
  4365. GNUNET_MQ_hd_var_size (client_seed,
  4366. GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
  4367. struct GNUNET_RPS_CS_SeedMessage,
  4368. NULL),
  4369. #if ENABLE_MALICIOUS
  4370. GNUNET_MQ_hd_var_size (client_act_malicious,
  4371. GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
  4372. struct GNUNET_RPS_CS_ActMaliciousMessage,
  4373. NULL),
  4374. #endif /* ENABLE_MALICIOUS */
  4375. GNUNET_MQ_hd_fixed_size (client_view_request,
  4376. GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
  4377. struct GNUNET_RPS_CS_DEBUG_ViewRequest,
  4378. NULL),
  4379. GNUNET_MQ_hd_fixed_size (client_view_cancel,
  4380. GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
  4381. struct GNUNET_MessageHeader,
  4382. NULL),
  4383. GNUNET_MQ_hd_fixed_size (client_stream_request,
  4384. GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
  4385. struct GNUNET_RPS_CS_DEBUG_StreamRequest,
  4386. NULL),
  4387. GNUNET_MQ_hd_fixed_size (client_stream_cancel,
  4388. GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
  4389. struct GNUNET_MessageHeader,
  4390. NULL),
  4391. GNUNET_MQ_hd_fixed_size (client_start_sub,
  4392. GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
  4393. struct GNUNET_RPS_CS_SubStartMessage,
  4394. NULL),
  4395. GNUNET_MQ_hd_fixed_size (client_stop_sub,
  4396. GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
  4397. struct GNUNET_RPS_CS_SubStopMessage,
  4398. NULL),
  4399. GNUNET_MQ_handler_end ());
  4400. /* end of gnunet-service-rps.c */