test_rps.c 83 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183
  1. /*
  2. This file is part of GNUnet.
  3. Copyright (C) 2009, 2012 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file rps/test_rps.c
  18. * @brief Testcase for the random peer sampling service. Starts
  19. * a peergroup with a given number of peers, then waits to
  20. * receive size pushes/pulls from each peer. Expects to wait
  21. * for one message from each peer.
  22. */
  23. #include "platform.h"
  24. #include "gnunet_util_lib.h"
  25. #include "gnunet_testbed_service.h"
  26. #include "gnunet_rps_service.h"
  27. #include "rps-test_util.h"
  28. #include "gnunet-service-rps_sampler_elem.h"
  29. #include <inttypes.h>
  30. /**
  31. * How many peers do we start?
  32. */
  33. static uint32_t num_peers;
  34. /**
  35. * How long do we run the test?
  36. * In seconds.
  37. */
  38. static uint32_t timeout_s;
  39. /**
  40. * How long do we run the test?
  41. */
  42. // #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
  43. static struct GNUNET_TIME_Relative timeout;
  44. /**
  45. * Portion of malicious peers
  46. */
  47. static double portion = .1;
  48. /**
  49. * Type of malicious peer to test
  50. */
  51. static unsigned int mal_type = 0;
  52. /**
  53. * Handles to all of the running peers
  54. */
  55. static struct GNUNET_TESTBED_Peer **testbed_peers;
  /**
   * @brief Direction of a peer's state change.
   *
   * Tells whether an operation brings a peer online or takes it offline.
   */
  enum PEER_ONLINE_DELTA
  {
    /** @brief The peer is going online. */
    PEER_GO_ONLINE = 1,

    /** @brief The peer is going offline. */
    PEER_GO_OFFLINE = -1,
  };
  70. /**
  71. * Operation map entry
  72. */
  73. struct OpListEntry
  74. {
  75. /**
  76. * DLL next ptr
  77. */
  78. struct OpListEntry *next;
  79. /**
  80. * DLL prev ptr
  81. */
  82. struct OpListEntry *prev;
  83. /**
  84. * The testbed operation
  85. */
  86. struct GNUNET_TESTBED_Operation *op;
  87. /**
  88. * Depending on whether we start or stop RPS service at the peer, set this to
  89. * #PEER_GO_ONLINE (1) or #PEER_GO_OFFLINE (-1)
  90. */
  91. enum PEER_ONLINE_DELTA delta;
  92. /**
  93. * Index of the regarding peer
  94. */
  95. unsigned int index;
  96. };
  97. /**
  98. * OpList DLL head
  99. */
  100. static struct OpListEntry *oplist_head;
  101. /**
  102. * OpList DLL tail
  103. */
  104. static struct OpListEntry *oplist_tail;
  /**
   * A pending reply: the request has been sent, the answer is outstanding.
   */
  struct PendingReply
  {
    /** Next element in the DLL of pending replies. */
    struct PendingReply *next;

    /** Previous element in the DLL of pending replies. */
    struct PendingReply *prev;

    /** Handle to the request we are waiting for. */
    struct GNUNET_RPS_Request_Handle *req_handle;

    /** The peer that issued the request. */
    struct RPSPeer *rps_peer;
  };
  /**
   * A pending request: the request has not been made yet but is scheduled
   * for later.
   */
  struct PendingRequest
  {
    /** Next element in the DLL of pending requests. */
    struct PendingRequest *next;

    /** Previous element in the DLL of pending requests. */
    struct PendingRequest *prev;

    /** Scheduler task associated with this not-yet-issued request. */
    struct GNUNET_SCHEDULER_Task *request_task;

    /** The peer that will issue the request. */
    struct RPSPeer *rps_peer;
  };
  /**
   * Per-peer bookkeeping of the test.
   */
  struct RPSPeer
  {
    /** Index of the peer. */
    unsigned int index;

    /** Handle for the RPS connect operation. */
    struct GNUNET_TESTBED_Operation *op;

    /** Handle to the RPS service. */
    struct GNUNET_RPS_Handle *rps_handle;

    /** Handle to stream requests. */
    struct GNUNET_RPS_StreamRequestHandle *rps_srh;

    /** ID of the peer. */
    struct GNUNET_PeerIdentity *peer_id;

    /** Disabled member: a request handle to check for a request. */
    // struct GNUNET_RPS_Request_Handle *req_handle;

    /** Is the peer on- or offline? */
    int online;

    /** Number of peer IDs to request during the whole test. */
    unsigned int num_ids_to_request;

    /** Head of the DLL of pending requests. */
    struct PendingRequest *pending_req_head;

    /** Tail of the DLL of pending requests. */
    struct PendingRequest *pending_req_tail;

    /** Number of pending requests. */
    unsigned int num_pending_reqs;

    /** Head of the DLL of pending replies. */
    struct PendingReply *pending_rep_head;

    /** Tail of the DLL of pending replies. */
    struct PendingReply *pending_rep_tail;

    /** Number of pending replies. */
    unsigned int num_pending_reps;

    /** Number of peer IDs received so far. */
    unsigned int num_recv_ids;

    /** Pending (churn) operation on this peer. */
    const struct OpListEntry *entry_op_manage;

    /** Testbed operation used to connect to the statistics service. */
    struct GNUNET_TESTBED_Operation *stat_op;

    /** Handle to the statistics service. */
    struct GNUNET_STATISTICS_Handle *stats_h;

    /**
     * @brief Flags marking which statistics values have already been
     *        collected from the statistics service.
     *
     * Compared against the flags requested by the current test run to decide
     * whether we are able to shut down.
     */
    uint32_t stat_collected_flags;

    /** @brief Name of the file the stats are finally written to. */
    const char *file_name_stats;

    /**
     * @brief Name of a second output file.
     *
     * NOTE(review): the original comment was a copy of the one for
     * #file_name_stats; judging by the name this file presumably receives
     * probabilities -- confirm against the code that writes it.
     */
    const char *file_name_probs;

    /** @brief The current view. */
    struct GNUNET_PeerIdentity *cur_view;

    /** @brief Number of peers in #cur_view. */
    uint32_t cur_view_count;

    /** @brief Number of occurrences in other peers' views. */
    uint32_t count_in_views;

    /** @brief Statistics values (this member and all that follow). */
    uint64_t num_rounds;
    uint64_t num_blocks;
    uint64_t num_blocks_many_push;
    uint64_t num_blocks_no_push;
    uint64_t num_blocks_no_pull;
    uint64_t num_blocks_many_push_no_pull;
    uint64_t num_blocks_no_push_no_pull;
    uint64_t num_issued_push;
    uint64_t num_issued_pull_req;
    uint64_t num_issued_pull_rep;
    uint64_t num_sent_push;
    uint64_t num_sent_pull_req;
    uint64_t num_sent_pull_rep;
    uint64_t num_recv_push;
    uint64_t num_recv_pull_req;
    uint64_t num_recv_pull_rep;
  };
  /**
   * @brief Identifiers of the individual statistics values, one bit each.
   *
   * NOTE(review): presumably these are the bits stored in the
   * stat_collect_flags / stat_collected_flags bitmasks -- confirm at the
   * call sites.
   */
  enum STAT_TYPE
  {
    STAT_TYPE_ROUNDS = 1 << 0,
    STAT_TYPE_BLOCKS = 1 << 1,
    STAT_TYPE_BLOCKS_MANY_PUSH = 1 << 2,
    STAT_TYPE_BLOCKS_NO_PUSH = 1 << 3,
    STAT_TYPE_BLOCKS_NO_PULL = 1 << 4,
    STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL = 1 << 5,
    STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL = 1 << 6,
    STAT_TYPE_ISSUED_PUSH_SEND = 1 << 7,
    STAT_TYPE_ISSUED_PULL_REQ = 1 << 8,
    STAT_TYPE_ISSUED_PULL_REP = 1 << 9,
    STAT_TYPE_SENT_PUSH_SEND = 1 << 10,
    STAT_TYPE_SENT_PULL_REQ = 1 << 11,
    STAT_TYPE_SENT_PULL_REP = 1 << 12,
    STAT_TYPE_RECV_PUSH_SEND = 1 << 13,
    STAT_TYPE_RECV_PULL_REQ = 1 << 14,
    STAT_TYPE_RECV_PULL_REP = 1 << 15,

    /*
     * Highest bit of a 32-bit mask.  The value does not fit into a 32-bit
     * int; ISO C before C23 requires enumerators to be representable as int,
     * so this line relies on a compiler extension.  Kept as a hex literal on
     * purpose: writing it as 1 << 31 would overflow a signed int.
     */
    STAT_TYPE_MAX = 0x80000000,
  };
  280. struct STATcls
  281. {
  282. struct RPSPeer *rps_peer;
  283. enum STAT_TYPE stat_type;
  284. };
  285. /**
  286. * Information for all the peers.
  287. */
  288. static struct RPSPeer *rps_peers;
  289. /**
  290. * Peermap to get the index of a given peer ID quick.
  291. */
  292. static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
  293. /**
  294. * IDs of the peers.
  295. */
  296. static struct GNUNET_PeerIdentity *rps_peer_ids;
  297. /**
  298. * ID of the targeted peer.
  299. */
  300. static struct GNUNET_PeerIdentity *target_peer;
  301. /**
  302. * The peer that requests the evaluation.
  303. */
  304. static struct RPSPeer *eval_peer;
  305. /**
  306. * Number of online peers.
  307. */
  308. static unsigned int num_peers_online;
  309. /**
  310. * @brief The added sizes of the peer's views
  311. */
  312. static unsigned int view_sizes;
  313. /**
  314. * Return value from 'main'.
  315. */
  316. static int ok;
  317. /**
  318. * Identifier for the post-test task (runs #post_test_op)
  319. */
  320. static struct GNUNET_SCHEDULER_Task *post_test_task;
  321. /**
  322. * Identifier for the shutdown task
  323. */
  324. static struct GNUNET_SCHEDULER_Task *shutdown_task;
  325. /**
  326. * Identifier for the churn task that runs periodically
  327. */
  328. static struct GNUNET_SCHEDULER_Task *churn_task;
  /**
   * Called to initialise the given RPSPeer.
   *
   * @param rps_peer the peer to initialise
   */
  typedef void (*InitPeer) (struct RPSPeer *rps_peer);

  /**
   * @brief Called directly after connecting to the service.
   *
   * @param rps_peer specific peer the function is called on
   * @param h the handle to the rps service
   */
  typedef void (*PreTest) (struct RPSPeer *rps_peer,
                           struct GNUNET_RPS_Handle *h);

  /**
   * @brief Executes functions to test the api/service for a given peer.
   *
   * Called from within #rps_connect_complete_cb ()
   * Implemented by #churn_test_cb, #profiler_cb, #mal_cb, #single_req_cb,
   * #delay_req_cb, #seed_big_cb, #single_peer_seed_cb, #seed_cb,
   * #req_cancel_cb
   *
   * @param rps_peer the peer the task runs on
   */
  typedef void (*MainTest) (struct RPSPeer *rps_peer);

  /**
   * Callback called once the requested random peers are available.
   *
   * @param cls closure
   * @param n number of peers in @a recv_peers
   * @param recv_peers the peers that were received
   */
  typedef void (*ReplyHandle) (void *cls,
                               uint64_t n,
                               const struct GNUNET_PeerIdentity *recv_peers);

  /**
   * Called directly before disconnecting from the service.
   *
   * @param peer the peer that is about to disconnect
   */
  typedef void (*PostTest) (struct RPSPeer *peer);

  /**
   * Function called after disconnect to evaluate test success.
   *
   * @return result of the evaluation
   */
  typedef int (*EvaluationCallback) (void);
  /**
   * @brief Test option: is the test run with churn?
   */
  enum OPTION_CHURN
  {
    /** @brief The test is run with churn. */
    HAVE_CHURN,

    /** @brief The test is run without churn. */
    HAVE_NO_CHURN,
  };
  /**
   * @brief Test option: may the test quit before the timeout?
   */
  enum OPTION_QUICK_QUIT
  {
    /** @brief The test may quit before the timeout triggers. */
    HAVE_QUICK_QUIT,

    /** @brief The test must NOT quit before the timeout triggers. */
    HAVE_NO_QUICK_QUIT,
  };
  /**
   * @brief Test option: are statistics collected at the end?
   */
  enum OPTION_COLLECT_STATISTICS
  {
    /** @brief Statistics are collected at the end. */
    COLLECT_STATISTICS,

    /** @brief Statistics are not collected at the end. */
    NO_COLLECT_STATISTICS,
  };
  /**
   * @brief Test option: are views collected during the run?
   */
  enum OPTION_COLLECT_VIEW
  {
    /** @brief The view is collected during the run. */
    COLLECT_VIEW,

    /** @brief The view is not collected during the run. */
    NO_COLLECT_VIEW,
  };
  420. /**
  421. * Structure to define a single test
  422. */
  423. struct SingleTestRun
  424. {
  425. /**
  426. * Name of the test
  427. */
  428. char *name;
  429. /**
  430. * Called with a single peer in order to initialise that peer
  431. */
  432. InitPeer init_peer;
  433. /**
  434. * Called directly after connecting to the service
  435. */
  436. PreTest pre_test;
  437. /**
  438. * Main function for each peer
  439. */
  440. MainTest main_test;
  441. /**
  442. * Callback called once the requested peers are available
  443. */
  444. ReplyHandle reply_handle;
  445. /**
  446. * Called directly before disconnecting from the service
  447. */
  448. PostTest post_test;
  449. /**
  450. * Function to evaluate the test results
  451. */
  452. EvaluationCallback eval_cb;
  453. /**
  454. * Request interval
  455. */
  456. uint32_t request_interval;
  457. /**
  458. * Number of Requests to make.
  459. */
  460. uint32_t num_requests;
  461. /**
  462. * Run with (-out) churn
  463. */
  464. enum OPTION_CHURN have_churn;
  465. /**
  466. * Quit test before timeout?
  467. */
  468. enum OPTION_QUICK_QUIT have_quick_quit;
  469. /**
  470. * Collect statistics at the end?
  471. */
  472. enum OPTION_COLLECT_STATISTICS have_collect_statistics;
  473. /**
  474. * Collect view during run?
  475. */
  476. enum OPTION_COLLECT_VIEW have_collect_view;
  477. /**
  478. * @brief Mark which values from the statistics service to collect at the end
  479. * of the run
  480. */
  481. uint32_t stat_collect_flags;
  482. } cur_test_run;
  483. /**
  484. * Did we finish the test?
  485. */
  486. static int post_test;
  487. /**
  488. * Are we shutting down?
  489. */
  490. static int in_shutdown;
  491. /**
  492. * Append arguments to file
  493. */
  494. static void
  495. tofile_ (const char *file_name, const char *line)
  496. {
  497. struct GNUNET_DISK_FileHandle *f;
  498. /* char output_buffer[512]; */
  499. size_t size;
  500. /* int size; */
  501. size_t size2;
  502. if (NULL == (f = GNUNET_DISK_file_open (file_name,
  503. GNUNET_DISK_OPEN_APPEND
  504. | GNUNET_DISK_OPEN_WRITE
  505. | GNUNET_DISK_OPEN_CREATE,
  506. GNUNET_DISK_PERM_USER_READ
  507. | GNUNET_DISK_PERM_USER_WRITE
  508. | GNUNET_DISK_PERM_GROUP_READ
  509. | GNUNET_DISK_PERM_OTHER_READ)))
  510. {
  511. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  512. "Not able to open file %s\n",
  513. file_name);
  514. return;
  515. }
  516. /* size = GNUNET_snprintf (output_buffer,
  517. sizeof (output_buffer),
  518. "%llu %s\n",
  519. GNUNET_TIME_absolute_get ().abs_value_us,
  520. line);
  521. if (0 > size)
  522. {
  523. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  524. "Failed to write string to buffer (size: %i)\n",
  525. size);
  526. return;
  527. } */size = strlen (line) * sizeof(char);
  528. size2 = GNUNET_DISK_file_write (f, line, size);
  529. if (size != size2)
  530. {
  531. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  532. "Unable to write to file! (Size: %lu, size2: %lu)\n",
  533. size,
  534. size2);
  535. if (GNUNET_YES != GNUNET_DISK_file_close (f))
  536. {
  537. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  538. "Unable to close file\n");
  539. }
  540. return;
  541. }
  542. if (GNUNET_YES != GNUNET_DISK_file_close (f))
  543. {
  544. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  545. "Unable to close file\n");
  546. }
  547. }
  /**
   * Format a message printf-style and append it to a file via #tofile_.
   *
   * The message is built in a 512 byte stack buffer, which bounds the
   * length of a single line.  Use it like a function call, terminated by a
   * semicolon: the expansion is one statement and therefore safe inside
   * unbraced if/else.
   *
   * The internal names carry a prefix and a trailing underscore so that
   * they cannot capture identifiers used in the caller's format arguments.
   *
   * @param file_name name of the file to append to
   * @param ... format string followed by its arguments
   */
  #define tofile(file_name, ...) do { \
      char tofile_buf_[512]; \
      int tofile_len_; \
      tofile_len_ = GNUNET_snprintf (tofile_buf_, \
                                     sizeof(tofile_buf_), \
                                     __VA_ARGS__); \
      if (0 > tofile_len_) \
        GNUNET_log (GNUNET_ERROR_TYPE_WARNING, \
                    "Failed to create tmp_buf\n"); \
      else \
        tofile_ ((file_name), tofile_buf_); \
  } while (0)
  561. /**
  562. * Write the ids and their according index in the given array to a file
  563. * Unused
  564. */
  565. /* static void
  566. ids_to_file (char *file_name,
  567. struct GNUNET_PeerIdentity *peer_ids,
  568. unsigned int num_peer_ids)
  569. {
  570. unsigned int i;
  571. for (i=0 ; i < num_peer_ids ; i++)
  572. {
  573. to_file (file_name,
  574. "%u\t%s",
  575. i,
  576. GNUNET_i2s_full (&peer_ids[i]));
  577. }
  578. } */
  579. /**
  580. * Task run on timeout to collect statistics and potentially shut down.
  581. */
  582. static void
  583. post_test_op (void *cls);
  584. /**
  585. * Test the success of a single test
  586. */
  587. static int
  588. evaluate (void)
  589. {
  590. unsigned int i;
  591. int tmp_ok;
  592. tmp_ok = 1;
  593. for (i = 0; i < num_peers; i++)
  594. {
  595. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  596. "%u. peer [%s] received %u of %u expected peer_ids: %i\n",
  597. i,
  598. GNUNET_i2s (rps_peers[i].peer_id),
  599. rps_peers[i].num_recv_ids,
  600. rps_peers[i].num_ids_to_request,
  601. (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids));
  602. tmp_ok &= (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids);
  603. }
  604. return tmp_ok ? 0 : 1;
  605. }
  606. /**
  607. * Creates an oplist entry and adds it to the oplist DLL
  608. */
  609. static struct OpListEntry *
  610. make_oplist_entry ()
  611. {
  612. struct OpListEntry *entry;
  613. entry = GNUNET_new (struct OpListEntry);
  614. GNUNET_CONTAINER_DLL_insert_tail (oplist_head, oplist_tail, entry);
  615. return entry;
  616. }
  617. /**
  618. * @brief Checks if given peer already received its statistics value from the
  619. * statistics service.
  620. *
  621. * @param rps_peer the peer to check for
  622. *
  623. * @return #GNUNET_YES if so
  624. * #GNUNET_NO otherwise
  625. */
  626. static int
  627. check_statistics_collect_completed_single_peer (
  628. const struct RPSPeer *rps_peer)
  629. {
  630. if (cur_test_run.stat_collect_flags !=
  631. (cur_test_run.stat_collect_flags
  632. & rps_peer->stat_collected_flags))
  633. {
  634. return GNUNET_NO;
  635. }
  636. return GNUNET_YES;
  637. }
  638. /**
  639. * @brief Checks if all peers already received their statistics value from the
  640. * statistics service.
  641. *
  642. * @return #GNUNET_YES if so
  643. * #GNUNET_NO otherwise
  644. */
  645. static int
  646. check_statistics_collect_completed ()
  647. {
  648. uint32_t i;
  649. for (i = 0; i < num_peers; i++)
  650. {
  651. if (GNUNET_NO == check_statistics_collect_completed_single_peer (
  652. &rps_peers[i]))
  653. {
  654. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  655. "At least Peer %" PRIu32
  656. " did not yet receive all statistics values\n",
  657. i);
  658. return GNUNET_NO;
  659. }
  660. }
  661. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  662. "All peers received their statistics values\n");
  663. return GNUNET_YES;
  664. }
  665. /**
  666. * Task run on timeout to shut everything down.
  667. */
  668. static void
  669. shutdown_op (void *cls)
  670. {
  671. unsigned int i;
  672. (void) cls;
  673. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  674. "Shutdown task scheduled, going down.\n");
  675. in_shutdown = GNUNET_YES;
  676. if (NULL != post_test_task)
  677. {
  678. GNUNET_SCHEDULER_cancel (post_test_task);
  679. post_test_op (NULL);
  680. }
  681. if (NULL != churn_task)
  682. {
  683. GNUNET_SCHEDULER_cancel (churn_task);
  684. churn_task = NULL;
  685. }
  686. for (i = 0; i < num_peers; i++)
  687. {
  688. if (NULL != rps_peers[i].rps_handle)
  689. {
  690. GNUNET_RPS_disconnect (rps_peers[i].rps_handle);
  691. }
  692. if (NULL != rps_peers[i].op)
  693. {
  694. GNUNET_TESTBED_operation_done (rps_peers[i].op);
  695. }
  696. }
  697. }
  698. /**
  699. * Task run on timeout to collect statistics and potentially shut down.
  700. */
  701. static void
  702. post_test_op (void *cls)
  703. {
  704. unsigned int i;
  705. (void) cls;
  706. post_test_task = NULL;
  707. post_test = GNUNET_YES;
  708. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  709. "Post test task scheduled, going down.\n");
  710. if (NULL != churn_task)
  711. {
  712. GNUNET_SCHEDULER_cancel (churn_task);
  713. churn_task = NULL;
  714. }
  715. for (i = 0; i < num_peers; i++)
  716. {
  717. if (NULL != cur_test_run.post_test)
  718. {
  719. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing post_test for peer %u\n",
  720. i);
  721. cur_test_run.post_test (&rps_peers[i]);
  722. }
  723. if (NULL != rps_peers[i].op)
  724. {
  725. GNUNET_TESTBED_operation_done (rps_peers[i].op);
  726. rps_peers[i].op = NULL;
  727. }
  728. }
  729. /* If we do not collect statistics, shut down directly */
  730. if ((NO_COLLECT_STATISTICS == cur_test_run.have_collect_statistics) ||
  731. (GNUNET_YES == check_statistics_collect_completed ()) )
  732. {
  733. GNUNET_SCHEDULER_shutdown ();
  734. }
  735. }
  736. /**
  737. * Seed peers.
  738. */
  739. static void
  740. seed_peers (void *cls)
  741. {
  742. struct RPSPeer *peer = cls;
  743. unsigned int amount;
  744. unsigned int i;
  745. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  746. {
  747. return;
  748. }
  749. GNUNET_assert (NULL != peer->rps_handle);
  750. // TODO if malicious don't seed mal peers
  751. amount = round (.5 * num_peers);
  752. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding peers:\n");
  753. for (i = 0; i < amount; i++)
  754. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding %u. peer: %s\n",
  755. i,
  756. GNUNET_i2s (&rps_peer_ids[i]));
  757. GNUNET_RPS_seed_ids (peer->rps_handle, amount, rps_peer_ids);
  758. }
  759. /**
  760. * Seed peers.
  761. */
  762. static void
  763. seed_peers_big (void *cls)
  764. {
  765. struct RPSPeer *peer = cls;
  766. unsigned int seed_msg_size;
  767. uint32_t num_peers_max;
  768. unsigned int amount;
  769. unsigned int i;
  770. seed_msg_size = 8; /* sizeof (struct GNUNET_RPS_CS_SeedMessage) */
  771. num_peers_max = (GNUNET_MAX_MESSAGE_SIZE - seed_msg_size)
  772. / sizeof(struct GNUNET_PeerIdentity);
  773. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  774. "Peers that fit in one seed msg; %u\n",
  775. num_peers_max);
  776. amount = num_peers_max + (0.5 * num_peers_max);
  777. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  778. "Seeding many (%u) peers:\n",
  779. amount);
  780. struct GNUNET_PeerIdentity ids_to_seed[amount];
  781. for (i = 0; i < amount; i++)
  782. {
  783. ids_to_seed[i] = *peer->peer_id;
  784. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding %u. peer: %s\n",
  785. i,
  786. GNUNET_i2s (&ids_to_seed[i]));
  787. }
  788. GNUNET_RPS_seed_ids (peer->rps_handle, amount, ids_to_seed);
  789. }
  790. /**
  791. * Get the id of peer i.
  792. */
  793. void
  794. info_cb (void *cb_cls,
  795. struct GNUNET_TESTBED_Operation *op,
  796. const struct GNUNET_TESTBED_PeerInformation *pinfo,
  797. const char *emsg)
  798. {
  799. struct OpListEntry *entry = (struct OpListEntry *) cb_cls;
  800. (void) op;
  801. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  802. {
  803. return;
  804. }
  805. if ((NULL == pinfo) || (NULL != emsg))
  806. {
  807. GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Got Error: %s\n", emsg);
  808. GNUNET_TESTBED_operation_done (entry->op);
  809. return;
  810. }
  811. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  812. "Peer %u is %s\n",
  813. entry->index,
  814. GNUNET_i2s (pinfo->result.id));
  815. rps_peer_ids[entry->index] = *(pinfo->result.id);
  816. rps_peers[entry->index].peer_id = &rps_peer_ids[entry->index];
  817. GNUNET_assert (GNUNET_OK ==
  818. GNUNET_CONTAINER_multipeermap_put (peer_map,
  819. &rps_peer_ids[entry->index],
  820. &rps_peers[entry->index],
  821. GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
  822. tofile ("/tmp/rps/peer_ids",
  823. "%u\t%s\n",
  824. entry->index,
  825. GNUNET_i2s_full (&rps_peer_ids[entry->index]));
  826. GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
  827. GNUNET_TESTBED_operation_done (entry->op);
  828. GNUNET_free (entry);
  829. }
  830. /**
  831. * Callback to be called when RPS service connect operation is completed
  832. *
  833. * @param cls the callback closure from functions generating an operation
  834. * @param op the operation that has been finished
  835. * @param ca_result the RPS service handle returned from rps_connect_adapter
  836. * @param emsg error message in case the operation has failed; will be NULL if
  837. * operation has executed successfully.
  838. */
  839. static void
  840. rps_connect_complete_cb (void *cls,
  841. struct GNUNET_TESTBED_Operation *op,
  842. void *ca_result,
  843. const char *emsg)
  844. {
  845. struct RPSPeer *rps_peer = cls;
  846. struct GNUNET_RPS_Handle *rps = ca_result;
  847. GNUNET_assert (NULL != ca_result);
  848. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  849. {
  850. return;
  851. }
  852. rps_peer->rps_handle = rps;
  853. rps_peer->online = GNUNET_YES;
  854. num_peers_online++;
  855. GNUNET_assert (op == rps_peer->op);
  856. if (NULL != emsg)
  857. {
  858. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  859. "Failed to connect to RPS service: %s\n",
  860. emsg);
  861. ok = 1;
  862. GNUNET_SCHEDULER_shutdown ();
  863. return;
  864. }
  865. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Started client successfully\n");
  866. cur_test_run.main_test (rps_peer);
  867. }
  868. /**
  869. * Adapter function called to establish a connection to
  870. * the RPS service.
  871. *
  872. * @param cls closure
  873. * @param cfg configuration of the peer to connect to; will be available until
  874. * GNUNET_TESTBED_operation_done() is called on the operation returned
  875. * from GNUNET_TESTBED_service_connect()
  876. * @return service handle to return in 'op_result', NULL on error
  877. */
  878. static void *
  879. rps_connect_adapter (void *cls,
  880. const struct GNUNET_CONFIGURATION_Handle *cfg)
  881. {
  882. struct GNUNET_RPS_Handle *h;
  883. h = GNUNET_RPS_connect (cfg);
  884. GNUNET_assert (NULL != h);
  885. if (NULL != cur_test_run.pre_test)
  886. cur_test_run.pre_test (cls, h);
  887. GNUNET_assert (NULL != h);
  888. return h;
  889. }
  890. /**
  891. * Called to open a connection to the peer's statistics
  892. *
  893. * @param cls peer context
  894. * @param cfg configuration of the peer to connect to; will be available until
  895. * GNUNET_TESTBED_operation_done() is called on the operation returned
  896. * from GNUNET_TESTBED_service_connect()
  897. * @return service handle to return in 'op_result', NULL on error
  898. */
  899. static void *
  900. stat_connect_adapter (void *cls,
  901. const struct GNUNET_CONFIGURATION_Handle *cfg)
  902. {
  903. struct RPSPeer *peer = cls;
  904. peer->stats_h = GNUNET_STATISTICS_create ("rps-profiler", cfg);
  905. return peer->stats_h;
  906. }
  907. /**
  908. * Called to disconnect from peer's statistics service
  909. *
  910. * @param cls peer context
  911. * @param op_result service handle returned from the connect adapter
  912. */
  913. static void
  914. stat_disconnect_adapter (void *cls, void *op_result)
  915. {
  916. struct RPSPeer *peer = cls;
  917. // GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel
  918. // (peer->stats_h, "core", "# peers connected",
  919. // stat_iterator, peer));
  920. // GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch_cancel
  921. // (peer->stats_h, "nse", "# peers connected",
  922. // stat_iterator, peer));
  923. GNUNET_STATISTICS_destroy (op_result, GNUNET_NO);
  924. peer->stats_h = NULL;
  925. }
  926. /**
  927. * Called after successfully opening a connection to a peer's statistics
  928. * service; we register statistics monitoring for CORE and NSE here.
  929. *
  930. * @param cls the callback closure from functions generating an operation
  931. * @param op the operation that has been finished
  932. * @param ca_result the service handle returned from GNUNET_TESTBED_ConnectAdapter()
  933. * @param emsg error message in case the operation has failed; will be NULL if
  934. * operation has executed successfully.
  935. */
  936. static void
  937. stat_complete_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
  938. void *ca_result, const char *emsg)
  939. {
  940. // struct GNUNET_STATISTICS_Handle *sh = ca_result;
  941. // struct RPSPeer *peer = (struct RPSPeer *) cls;
  942. (void) cls;
  943. (void) op;
  944. (void) ca_result;
  945. if (NULL != emsg)
  946. {
  947. GNUNET_break (0);
  948. return;
  949. }
  950. // GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch
  951. // (sh, "core", "# peers connected",
  952. // stat_iterator, peer));
  953. // GNUNET_break (GNUNET_OK == GNUNET_STATISTICS_watch
  954. // (sh, "nse", "# peers connected",
  955. // stat_iterator, peer));
  956. }
  957. /**
  958. * Adapter function called to destroy connection to
  959. * RPS service.
  960. *
  961. * @param cls closure
  962. * @param op_result service handle returned from the connect adapter
  963. */
  964. static void
  965. rps_disconnect_adapter (void *cls,
  966. void *op_result)
  967. {
  968. struct RPSPeer *peer = cls;
  969. struct GNUNET_RPS_Handle *h = op_result;
  970. if (NULL != peer->rps_srh)
  971. {
  972. GNUNET_RPS_stream_cancel (peer->rps_srh);
  973. peer->rps_srh = NULL;
  974. }
  975. GNUNET_assert (NULL != peer);
  976. GNUNET_RPS_disconnect (h);
  977. peer->rps_handle = NULL;
  978. }
  979. /***********************************************************************
  980. * Definition of tests
  981. ***********************************************************************/
  982. // TODO check whether tests can be stopped earlier
  983. static int
  984. default_eval_cb (void)
  985. {
  986. return evaluate ();
  987. }
  988. static int
  989. no_eval (void)
  990. {
  991. return 0;
  992. }
  993. /**
  994. * Initialise given RPSPeer
  995. */
  996. static void
  997. default_init_peer (struct RPSPeer *rps_peer)
  998. {
  999. rps_peer->num_ids_to_request = 1;
  1000. }
  1001. /**
  1002. * Callback to call on receipt of a reply
  1003. *
  1004. * @param cls closure
  1005. * @param n number of peers
  1006. * @param recv_peers the received peers
  1007. */
  1008. static void
  1009. default_reply_handle (void *cls,
  1010. uint64_t n,
  1011. const struct GNUNET_PeerIdentity *recv_peers)
  1012. {
  1013. struct RPSPeer *rps_peer;
  1014. struct PendingReply *pending_rep = (struct PendingReply *) cls;
  1015. unsigned int i;
  1016. rps_peer = pending_rep->rps_peer;
  1017. GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
  1018. rps_peer->pending_rep_tail,
  1019. pending_rep);
  1020. rps_peer->num_pending_reps--;
  1021. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1022. "[%s] got %" PRIu64 " peers:\n",
  1023. GNUNET_i2s (rps_peer->peer_id),
  1024. n);
  1025. for (i = 0; i < n; i++)
  1026. {
  1027. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1028. "%u: %s\n",
  1029. i,
  1030. GNUNET_i2s (&recv_peers[i]));
  1031. rps_peer->num_recv_ids++;
  1032. }
  1033. if ((0 == evaluate ()) && (HAVE_QUICK_QUIT == cur_test_run.have_quick_quit))
  1034. {
  1035. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test succeeded before timeout\n");
  1036. GNUNET_assert (NULL != post_test_task);
  1037. GNUNET_SCHEDULER_cancel (post_test_task);
  1038. post_test_task = GNUNET_SCHEDULER_add_now (&post_test_op, NULL);
  1039. GNUNET_assert (NULL != post_test_task);
  1040. }
  1041. }
  1042. /**
  1043. * Request random peers.
  1044. */
  1045. static void
  1046. request_peers (void *cls)
  1047. {
  1048. struct PendingRequest *pending_req = cls;
  1049. struct RPSPeer *rps_peer;
  1050. struct PendingReply *pending_rep;
  1051. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  1052. return;
  1053. rps_peer = pending_req->rps_peer;
  1054. GNUNET_assert (1 <= rps_peer->num_pending_reqs);
  1055. GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
  1056. rps_peer->pending_req_tail,
  1057. pending_req);
  1058. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1059. "Requesting one peer\n");
  1060. pending_rep = GNUNET_new (struct PendingReply);
  1061. pending_rep->rps_peer = rps_peer;
  1062. pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle,
  1063. 1,
  1064. cur_test_run.reply_handle,
  1065. pending_rep);
  1066. GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head,
  1067. rps_peer->pending_rep_tail,
  1068. pending_rep);
  1069. rps_peer->num_pending_reps++;
  1070. rps_peer->num_pending_reqs--;
  1071. }
  1072. static void
  1073. cancel_pending_req (struct PendingRequest *pending_req)
  1074. {
  1075. struct RPSPeer *rps_peer;
  1076. rps_peer = pending_req->rps_peer;
  1077. GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head,
  1078. rps_peer->pending_req_tail,
  1079. pending_req);
  1080. rps_peer->num_pending_reqs--;
  1081. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1082. "Cancelling pending request\n");
  1083. GNUNET_SCHEDULER_cancel (pending_req->request_task);
  1084. GNUNET_free (pending_req);
  1085. }
  1086. static void
  1087. cancel_request (struct PendingReply *pending_rep)
  1088. {
  1089. struct RPSPeer *rps_peer;
  1090. rps_peer = pending_rep->rps_peer;
  1091. GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
  1092. rps_peer->pending_rep_tail,
  1093. pending_rep);
  1094. rps_peer->num_pending_reps--;
  1095. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1096. "Cancelling request\n");
  1097. GNUNET_RPS_request_cancel (pending_rep->req_handle);
  1098. GNUNET_free (pending_rep);
  1099. }
  1100. /**
  1101. * Cancel a request.
  1102. */
  1103. static void
  1104. cancel_request_cb (void *cls)
  1105. {
  1106. struct RPSPeer *rps_peer = cls;
  1107. struct PendingReply *pending_rep;
  1108. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  1109. return;
  1110. pending_rep = rps_peer->pending_rep_head;
  1111. GNUNET_assert (1 <= rps_peer->num_pending_reps);
  1112. cancel_request (pending_rep);
  1113. }
  1114. /**
  1115. * Schedule requests for peer @a rps_peer that have neither been scheduled, nor
  1116. * issued, nor replied
  1117. */
  1118. void
  1119. schedule_missing_requests (struct RPSPeer *rps_peer)
  1120. {
  1121. unsigned int i;
  1122. struct PendingRequest *pending_req;
  1123. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1124. "Scheduling %u - %u missing requests\n",
  1125. rps_peer->num_ids_to_request,
  1126. rps_peer->num_pending_reqs + rps_peer->num_pending_reps);
  1127. GNUNET_assert (rps_peer->num_pending_reqs + rps_peer->num_pending_reps <=
  1128. rps_peer->num_ids_to_request);
  1129. for (i = rps_peer->num_pending_reqs + rps_peer->num_pending_reps;
  1130. i < rps_peer->num_ids_to_request; i++)
  1131. {
  1132. pending_req = GNUNET_new (struct PendingRequest);
  1133. pending_req->rps_peer = rps_peer;
  1134. pending_req->request_task = GNUNET_SCHEDULER_add_delayed (
  1135. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  1136. cur_test_run.request_interval * i),
  1137. request_peers,
  1138. pending_req);
  1139. GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_req_head,
  1140. rps_peer->pending_req_tail,
  1141. pending_req);
  1142. rps_peer->num_pending_reqs++;
  1143. }
  1144. }
  1145. void
  1146. cancel_pending_req_rep (struct RPSPeer *rps_peer)
  1147. {
  1148. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1149. "Cancelling all (pending) requests.\n");
  1150. while (NULL != rps_peer->pending_req_head)
  1151. cancel_pending_req (rps_peer->pending_req_head);
  1152. GNUNET_assert (0 == rps_peer->num_pending_reqs);
  1153. while (NULL != rps_peer->pending_rep_head)
  1154. cancel_request (rps_peer->pending_rep_head);
  1155. GNUNET_assert (0 == rps_peer->num_pending_reps);
  1156. }
  1157. /***********************************
  1158. * MALICIOUS
  1159. ***********************************/
  1160. /**
  1161. * Initialise only non-mal RPSPeers
  1162. */
  1163. static void
  1164. mal_init_peer (struct RPSPeer *rps_peer)
  1165. {
  1166. if (rps_peer->index >= round (portion * num_peers))
  1167. rps_peer->num_ids_to_request = 1;
  1168. }
  1169. /**
  1170. * @brief Set peers to (non-)malicious before execution
  1171. *
  1172. * Of signature #PreTest
  1173. *
  1174. * @param rps_peer the peer to set (non-) malicious
  1175. * @param h the handle to the service
  1176. */
  1177. static void
  1178. mal_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
  1179. {
  1180. #if ENABLE_MALICIOUS
  1181. uint32_t num_mal_peers;
  1182. GNUNET_assert ((1 >= portion) &&
  1183. (0 < portion));
  1184. num_mal_peers = round (portion * num_peers);
  1185. if (rps_peer->index < num_mal_peers)
  1186. {
  1187. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1188. "%u. peer [%s] of %" PRIu32
  1189. " malicious peers turning malicious\n",
  1190. rps_peer->index,
  1191. GNUNET_i2s (rps_peer->peer_id),
  1192. num_mal_peers);
  1193. GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers,
  1194. rps_peer_ids, target_peer);
  1195. }
  1196. #endif /* ENABLE_MALICIOUS */
  1197. }
  1198. static void
  1199. mal_cb (struct RPSPeer *rps_peer)
  1200. {
  1201. uint32_t num_mal_peers;
  1202. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  1203. {
  1204. return;
  1205. }
  1206. #if ENABLE_MALICIOUS
  1207. GNUNET_assert ((1 >= portion) &&
  1208. (0 < portion));
  1209. num_mal_peers = round (portion * num_peers);
  1210. if (rps_peer->index >= num_mal_peers)
  1211. { /* It's useless to ask a malicious peer about a random sample -
  1212. it's not sampling */
  1213. GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (
  1214. GNUNET_TIME_UNIT_SECONDS, 2),
  1215. seed_peers, rps_peer);
  1216. schedule_missing_requests (rps_peer);
  1217. }
  1218. #endif /* ENABLE_MALICIOUS */
  1219. }
  1220. /***********************************
  1221. * SINGLE_REQUEST
  1222. ***********************************/
  1223. static void
  1224. single_req_cb (struct RPSPeer *rps_peer)
  1225. {
  1226. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  1227. {
  1228. return;
  1229. }
  1230. schedule_missing_requests (rps_peer);
  1231. }
  1232. /***********************************
  1233. * DELAYED_REQUESTS
  1234. ***********************************/
  1235. static void
  1236. delay_req_cb (struct RPSPeer *rps_peer)
  1237. {
  1238. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  1239. {
  1240. return;
  1241. }
  1242. schedule_missing_requests (rps_peer);
  1243. }
  1244. /***********************************
  1245. * SEED
  1246. ***********************************/
  1247. static void
  1248. seed_cb (struct RPSPeer *rps_peer)
  1249. {
  1250. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  1251. {
  1252. return;
  1253. }
  1254. GNUNET_SCHEDULER_add_delayed (
  1255. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10),
  1256. seed_peers, rps_peer);
  1257. }
  1258. /***********************************
  1259. * SEED_BIG
  1260. ***********************************/
  1261. static void
  1262. seed_big_cb (struct RPSPeer *rps_peer)
  1263. {
  1264. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  1265. {
  1266. return;
  1267. }
  1268. // TODO test seeding > GNUNET_MAX_MESSAGE_SIZE peers
  1269. GNUNET_SCHEDULER_add_delayed (
  1270. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
  1271. seed_peers_big, rps_peer);
  1272. }
  1273. /***********************************
  1274. * SINGLE_PEER_SEED
  1275. ***********************************/
  1276. static void
  1277. single_peer_seed_cb (struct RPSPeer *rps_peer)
  1278. {
  1279. (void) rps_peer;
  1280. // TODO
  1281. }
  1282. /***********************************
  1283. * SEED_REQUEST
  1284. ***********************************/
  1285. static void
  1286. seed_req_cb (struct RPSPeer *rps_peer)
  1287. {
  1288. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  1289. {
  1290. return;
  1291. }
  1292. GNUNET_SCHEDULER_add_delayed (
  1293. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
  1294. seed_peers, rps_peer);
  1295. schedule_missing_requests (rps_peer);
  1296. }
  1297. // TODO start big mal
  1298. /***********************************
  1299. * REQUEST_CANCEL
  1300. ***********************************/
  1301. static void
  1302. req_cancel_cb (struct RPSPeer *rps_peer)
  1303. {
  1304. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  1305. {
  1306. return;
  1307. }
  1308. schedule_missing_requests (rps_peer);
  1309. GNUNET_SCHEDULER_add_delayed (
  1310. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  1311. (cur_test_run.request_interval + 1)),
  1312. cancel_request_cb, rps_peer);
  1313. }
  1314. /***********************************
  1315. * CHURN
  1316. ***********************************/
  1317. static void
  1318. churn (void *cls);
  1319. /**
  1320. * @brief Starts churn
  1321. *
  1322. * Has signature of #MainTest
  1323. *
  1324. * This is not implemented too nicely as this is called for each peer, but we
  1325. * only need to call it once. (Yes we check that we only schedule the task
  1326. * once.)
  1327. *
  1328. * @param rps_peer The peer it's called for
  1329. */
  1330. static void
  1331. churn_test_cb (struct RPSPeer *rps_peer)
  1332. {
  1333. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  1334. {
  1335. return;
  1336. }
  1337. /* Start churn */
  1338. if ((HAVE_CHURN == cur_test_run.have_churn) && (NULL == churn_task))
  1339. {
  1340. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1341. "Starting churn task\n");
  1342. churn_task = GNUNET_SCHEDULER_add_delayed (
  1343. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
  1344. churn,
  1345. NULL);
  1346. }
  1347. else
  1348. {
  1349. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1350. "Not starting churn task\n");
  1351. }
  1352. schedule_missing_requests (rps_peer);
  1353. }
  1354. /***********************************
  1355. * SUB
  1356. ***********************************/
  1357. static void
  1358. got_stream_peer_cb (void *cls,
  1359. uint64_t num_peers,
  1360. const struct GNUNET_PeerIdentity *peers)
  1361. {
  1362. const struct RPSPeer *rps_peer = cls;
  1363. for (uint64_t i = 0; i < num_peers; i++)
  1364. {
  1365. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1366. "Peer %" PRIu32 " received [%s] from stream.\n",
  1367. rps_peer->index,
  1368. GNUNET_i2s (&peers[i]));
  1369. if ((0 != rps_peer->index) &&
  1370. (0 == memcmp (&peers[i],
  1371. &rps_peers[0].peer_id,
  1372. sizeof(struct GNUNET_PeerIdentity))))
  1373. {
  1374. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1375. "Received a peer id outside sub\n");
  1376. ok = 1;
  1377. }
  1378. else if ((0 == rps_peer->index) &&
  1379. (0 != memcmp (&peers[i],
  1380. &rps_peers[0].peer_id,
  1381. sizeof(struct GNUNET_PeerIdentity))))
  1382. {
  1383. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1384. "Received a peer id outside sub (lonely)\n");
  1385. ok = 1;
  1386. }
  1387. }
  1388. }
  1389. static void
  1390. sub_post (struct RPSPeer *rps_peer)
  1391. {
  1392. if (0 != rps_peer->index)
  1393. GNUNET_RPS_sub_stop (rps_peer->rps_handle, "test");
  1394. else
  1395. GNUNET_RPS_sub_stop (rps_peer->rps_handle, "lonely");
  1396. }
  1397. static void
  1398. sub_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
  1399. {
  1400. (void) rps_peer;
  1401. if (0 != rps_peer->index)
  1402. GNUNET_RPS_sub_start (h, "test");
  1403. else
  1404. GNUNET_RPS_sub_start (h, "lonely"); /* have a group of one */
  1405. rps_peer->rps_srh = GNUNET_RPS_stream_request (h,
  1406. &got_stream_peer_cb,
  1407. rps_peer);
  1408. }
  1409. /***********************************
  1410. * PROFILER
  1411. ***********************************/
  1412. /**
  1413. * Callback to be called when RPS service is started or stopped at peers
  1414. *
  1415. * @param cls NULL
  1416. * @param op the operation handle
  1417. * @param emsg NULL on success; otherwise an error description
  1418. */
  1419. static void
  1420. churn_cb (void *cls,
  1421. struct GNUNET_TESTBED_Operation *op,
  1422. const char *emsg)
  1423. {
  1424. (void) op;
  1425. // FIXME
  1426. struct OpListEntry *entry = cls;
  1427. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  1428. {
  1429. return;
  1430. }
  1431. GNUNET_TESTBED_operation_done (entry->op);
  1432. if (NULL != emsg)
  1433. {
  1434. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1435. "Failed to start/stop RPS at a peer\n");
  1436. GNUNET_SCHEDULER_shutdown ();
  1437. return;
  1438. }
  1439. GNUNET_assert (0 != entry->delta);
  1440. num_peers_online += entry->delta;
  1441. if (PEER_GO_OFFLINE == entry->delta)
  1442. { /* Peer hopefully just went offline */
  1443. if (GNUNET_YES != rps_peers[entry->index].online)
  1444. {
  1445. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1446. "peer %s was expected to go offline but is still marked as online\n",
  1447. GNUNET_i2s (rps_peers[entry->index].peer_id));
  1448. GNUNET_break (0);
  1449. }
  1450. else
  1451. {
  1452. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1453. "peer %s probably went offline as expected\n",
  1454. GNUNET_i2s (rps_peers[entry->index].peer_id));
  1455. }
  1456. rps_peers[entry->index].online = GNUNET_NO;
  1457. }
  1458. else if (PEER_GO_ONLINE < entry->delta)
  1459. { /* Peer hopefully just went online */
  1460. if (GNUNET_NO != rps_peers[entry->index].online)
  1461. {
  1462. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1463. "peer %s was expected to go online but is still marked as offline\n",
  1464. GNUNET_i2s (rps_peers[entry->index].peer_id));
  1465. GNUNET_break (0);
  1466. }
  1467. else
  1468. {
  1469. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1470. "peer %s probably went online as expected\n",
  1471. GNUNET_i2s (rps_peers[entry->index].peer_id));
  1472. if (NULL != cur_test_run.pre_test)
  1473. {
  1474. cur_test_run.pre_test (&rps_peers[entry->index],
  1475. rps_peers[entry->index].rps_handle);
  1476. schedule_missing_requests (&rps_peers[entry->index]);
  1477. }
  1478. }
  1479. rps_peers[entry->index].online = GNUNET_YES;
  1480. }
  1481. else
  1482. {
  1483. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1484. "Invalid value for delta: %i\n", entry->delta);
  1485. GNUNET_break (0);
  1486. }
  1487. GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
  1488. rps_peers[entry->index].entry_op_manage = NULL;
  1489. GNUNET_free (entry);
  1490. // if (num_peers_in_round[current_round] == peers_running)
  1491. // run_round ();
  1492. }
  1493. /**
  1494. * @brief Set the rps-service up or down for a specific peer
  1495. *
  1496. * @param i index of action
  1497. * @param j index of peer
  1498. * @param delta (#PEER_ONLINE_DELTA) down (-1) or up (1)
  1499. * @param prob_go_on_off the probability of the action
  1500. */
  1501. static void
  1502. manage_service_wrapper (unsigned int i, unsigned int j,
  1503. enum PEER_ONLINE_DELTA delta,
  1504. double prob_go_on_off)
  1505. {
  1506. struct OpListEntry *entry = NULL;
  1507. uint32_t prob;
  1508. /* make sure that management operation is not already scheduled */
  1509. if (NULL != rps_peers[j].entry_op_manage)
  1510. {
  1511. return;
  1512. }
  1513. prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
  1514. UINT32_MAX);
  1515. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1516. "%u. selected peer (%u: %s) is %s.\n",
  1517. i,
  1518. j,
  1519. GNUNET_i2s (rps_peers[j].peer_id),
  1520. (PEER_GO_ONLINE == delta) ? "online" : "offline");
  1521. if (prob < prob_go_on_off * UINT32_MAX)
  1522. {
  1523. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1524. "%s goes %s\n",
  1525. GNUNET_i2s (rps_peers[j].peer_id),
  1526. (PEER_GO_OFFLINE == delta) ? "offline" : "online");
  1527. if (PEER_GO_OFFLINE == delta)
  1528. cancel_pending_req_rep (&rps_peers[j]);
  1529. entry = make_oplist_entry ();
  1530. entry->delta = delta;
  1531. entry->index = j;
  1532. entry->op = GNUNET_TESTBED_peer_manage_service (NULL,
  1533. testbed_peers[j],
  1534. "rps",
  1535. &churn_cb,
  1536. entry,
  1537. (PEER_GO_OFFLINE == delta) ?
  1538. 0 : 1);
  1539. rps_peers[j].entry_op_manage = entry;
  1540. }
  1541. }
  1542. static void
  1543. churn (void *cls)
  1544. {
  1545. (void) cls;
  1546. unsigned int i;
  1547. unsigned int j;
  1548. double portion_online;
  1549. unsigned int *permut;
  1550. double prob_go_offline;
  1551. double portion_go_online;
  1552. double portion_go_offline;
  1553. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  1554. {
  1555. return;
  1556. }
  1557. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1558. "Churn function executing\n");
  1559. churn_task = NULL; /* Should be invalid by now */
  1560. /* Compute the probability for an online peer to go offline
  1561. * this round */
  1562. portion_online = num_peers_online * 1.0 / num_peers;
  1563. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1564. "Portion online: %f\n",
  1565. portion_online);
  1566. portion_go_online = ((1 - portion_online) * .5 * .66);
  1567. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1568. "Portion that should go online: %f\n",
  1569. portion_go_online);
  1570. portion_go_offline = (portion_online + portion_go_online) - .75;
  1571. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1572. "Portion that probably goes offline: %f\n",
  1573. portion_go_offline);
  1574. prob_go_offline = portion_go_offline / (portion_online * .5);
  1575. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1576. "Probability of a selected online peer to go offline: %f\n",
  1577. prob_go_offline);
  1578. permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_WEAK,
  1579. (unsigned int) num_peers);
  1580. /* Go over 50% randomly chosen peers */
  1581. for (i = 0; i < .5 * num_peers; i++)
  1582. {
  1583. j = permut[i];
  1584. /* If online, shut down with certain probability */
  1585. if (GNUNET_YES == rps_peers[j].online)
  1586. {
  1587. manage_service_wrapper (i, j, -1, prob_go_offline);
  1588. }
  1589. /* If offline, restart with certain probability */
  1590. else if (GNUNET_NO == rps_peers[j].online)
  1591. {
  1592. manage_service_wrapper (i, j, 1, 0.66);
  1593. }
  1594. }
  1595. GNUNET_free (permut);
  1596. churn_task = GNUNET_SCHEDULER_add_delayed (
  1597. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
  1598. churn,
  1599. NULL);
  1600. }
  1601. /**
  1602. * Initialise given RPSPeer
  1603. */
  1604. static void
  1605. profiler_init_peer (struct RPSPeer *rps_peer)
  1606. {
  1607. if (num_peers - 1 == rps_peer->index)
  1608. rps_peer->num_ids_to_request = cur_test_run.num_requests;
  1609. }
  1610. /**
  1611. * Callback to call on receipt of a reply
  1612. *
  1613. * @param cls closure
  1614. * @param n number of peers
  1615. * @param recv_peers the received peers
  1616. */
  1617. static void
  1618. profiler_reply_handle (void *cls,
  1619. uint64_t n,
  1620. const struct GNUNET_PeerIdentity *recv_peers)
  1621. {
  1622. struct RPSPeer *rps_peer;
  1623. struct RPSPeer *rcv_rps_peer;
  1624. char *file_name;
  1625. char *file_name_dh;
  1626. unsigned int i;
  1627. struct PendingReply *pending_rep = (struct PendingReply *) cls;
  1628. rps_peer = pending_rep->rps_peer;
  1629. file_name = "/tmp/rps/received_ids";
  1630. file_name_dh = "/tmp/rps/diehard_input";
  1631. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1632. "[%s] got %" PRIu64 " peers:\n",
  1633. GNUNET_i2s (rps_peer->peer_id),
  1634. n);
  1635. for (i = 0; i < n; i++)
  1636. {
  1637. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1638. "%u: %s\n",
  1639. i,
  1640. GNUNET_i2s (&recv_peers[i]));
  1641. tofile (file_name,
  1642. "%s\n",
  1643. GNUNET_i2s_full (&recv_peers[i]));
  1644. rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, &recv_peers[i]);
  1645. GNUNET_assert (NULL != rcv_rps_peer);
  1646. tofile (file_name_dh,
  1647. "%" PRIu32 "\n",
  1648. (uint32_t) rcv_rps_peer->index);
  1649. }
  1650. default_reply_handle (cls, n, recv_peers);
  1651. }
  1652. static void
  1653. profiler_cb (struct RPSPeer *rps_peer)
  1654. {
  1655. if ((GNUNET_YES == in_shutdown) || (GNUNET_YES == post_test))
  1656. {
  1657. return;
  1658. }
  1659. /* Start churn */
  1660. if ((HAVE_CHURN == cur_test_run.have_churn) && (NULL == churn_task))
  1661. {
  1662. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1663. "Starting churn task\n");
  1664. churn_task = GNUNET_SCHEDULER_add_delayed (
  1665. GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5),
  1666. churn,
  1667. NULL);
  1668. }
  1669. else
  1670. {
  1671. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1672. "Not starting churn task\n");
  1673. }
  1674. /* Only request peer ids at one peer.
  1675. * (It's the before-last because last one is target of the focussed attack.)
  1676. */
  1677. if (eval_peer == rps_peer)
  1678. schedule_missing_requests (rps_peer);
  1679. }
  1680. /**
  1681. * Function called from #profiler_eval with a filename.
  1682. *
  1683. * @param cls closure
  1684. * @param filename complete filename (absolute path)
  1685. * @return #GNUNET_OK to continue to iterate,
  1686. * #GNUNET_NO to stop iteration with no error,
  1687. * #GNUNET_SYSERR to abort iteration with error!
  1688. */
  1689. int
  1690. file_name_cb (void *cls, const char *filename)
  1691. {
  1692. (void) cls;
  1693. if (NULL != strstr (filename, "sampler_el"))
  1694. {
  1695. struct RPS_SamplerElement *s_elem;
  1696. struct GNUNET_CRYPTO_AuthKey auth_key;
  1697. const char *key_char;
  1698. uint32_t i;
  1699. key_char = filename + 20; /* Length of "/tmp/rps/sampler_el-" */
  1700. tofile (filename, "--------------------------\n");
  1701. auth_key = string_to_auth_key (key_char);
  1702. s_elem = RPS_sampler_elem_create ();
  1703. RPS_sampler_elem_set (s_elem, auth_key);
  1704. for (i = 0; i < num_peers; i++)
  1705. {
  1706. RPS_sampler_elem_next (s_elem, &rps_peer_ids[i]);
  1707. }
  1708. RPS_sampler_elem_destroy (s_elem);
  1709. }
  1710. return GNUNET_OK;
  1711. }
  1712. /**
  1713. * This is run after the test finished.
  1714. *
  1715. * Compute all perfect samples.
  1716. */
  1717. int
  1718. profiler_eval (void)
  1719. {
  1720. /* Compute perfect sample for each sampler element */
  1721. if (-1 == GNUNET_DISK_directory_scan ("/tmp/rps/", file_name_cb, NULL))
  1722. {
  1723. GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Scan of directory failed\n");
  1724. }
  1725. return evaluate ();
  1726. }
  1727. /**
  1728. * @brief is b in view of a?
  1729. *
  1730. * @param a
  1731. * @param b
  1732. *
  1733. * @return
  1734. */
  1735. static int
  1736. is_in_view (uint32_t a, uint32_t b)
  1737. {
  1738. uint32_t i;
  1739. for (i = 0; i < rps_peers[a].cur_view_count; i++)
  1740. {
  1741. if (0 == memcmp (rps_peers[b].peer_id,
  1742. &rps_peers[a].cur_view[i],
  1743. sizeof(struct GNUNET_PeerIdentity)))
  1744. {
  1745. return GNUNET_YES;
  1746. }
  1747. }
  1748. return GNUNET_NO;
  1749. }
  1750. static uint32_t
  1751. get_idx_of_pid (const struct GNUNET_PeerIdentity *pid)
  1752. {
  1753. uint32_t i;
  1754. for (i = 0; i < num_peers; i++)
  1755. {
  1756. if (0 == memcmp (pid,
  1757. rps_peers[i].peer_id,
  1758. sizeof(struct GNUNET_PeerIdentity)))
  1759. {
  1760. return i;
  1761. }
  1762. }
  1763. // return 0; /* Should not happen - make compiler happy */
  1764. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  1765. "No known _PeerIdentity %s!\n",
  1766. GNUNET_i2s_full (pid));
  1767. GNUNET_assert (0);
  1768. }
  1769. /**
  1770. * @brief Counts number of peers in view of a that have b in their view
  1771. *
  1772. * @param a
  1773. * @param uint32_tb
  1774. *
  1775. * @return
  1776. */
  1777. static uint32_t
  1778. count_containing_views (uint32_t a, uint32_t b)
  1779. {
  1780. uint32_t i;
  1781. uint32_t peer_idx;
  1782. uint32_t count = 0;
  1783. for (i = 0; i < rps_peers[a].cur_view_count; i++)
  1784. {
  1785. peer_idx = get_idx_of_pid (&rps_peers[a].cur_view[i]);
  1786. if (GNUNET_YES == is_in_view (peer_idx, b))
  1787. {
  1788. count++;
  1789. }
  1790. }
  1791. return count;
  1792. }
  1793. /**
  1794. * @brief Computes the probability for each other peer to be selected by the
  1795. * sampling process based on the views of all peers
  1796. *
  1797. * @param peer_idx index of the peer that is about to sample
  1798. */
  1799. static void
  1800. compute_probabilities (uint32_t peer_idx)
  1801. {
  1802. // double probs[num_peers] = { 0 };
  1803. double probs[num_peers];
  1804. size_t probs_as_str_size = (num_peers * 10 + 1) * sizeof(char);
  1805. char *probs_as_str = GNUNET_malloc (probs_as_str_size);
  1806. char *probs_as_str_cpy;
  1807. uint32_t i;
  1808. double prob_push;
  1809. double prob_pull;
  1810. uint32_t view_size;
  1811. uint32_t cont_views;
  1812. uint32_t number_of_being_in_pull_events;
  1813. int tmp;
  1814. uint32_t count_non_zero_prob = 0;
  1815. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1816. "Computing probabilities for peer %" PRIu32 "\n", peer_idx);
  1817. /* Firstly without knowledge of old views */
  1818. for (i = 0; i < num_peers; i++)
  1819. {
  1820. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1821. "\tfor peer %" PRIu32 ":\n", i);
  1822. view_size = rps_peers[i].cur_view_count;
  1823. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1824. "\t\tview_size: %" PRIu32 "\n", view_size);
  1825. /* For peer i the probability of being sampled is
  1826. * evenly distributed among all possibly observed peers. */
  1827. /* We could have observed a peer in three cases:
  1828. * 1. peer sent a push
  1829. * 2. peer was contained in a pull reply
  1830. * 3. peer was in history (sampler) - ignored for now */
  1831. /* 1. Probability of having received a push from peer i */
  1832. if ((GNUNET_YES == is_in_view (i, peer_idx)) &&
  1833. (1 <= (0.45 * view_size)))
  1834. {
  1835. prob_push = 1.0 * binom (0.45 * view_size, 1)
  1836. /
  1837. binom (view_size, 0.45 * view_size);
  1838. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1839. "\t\t%" PRIu32 " is in %" PRIu32 "'s view, prob: %f\n",
  1840. peer_idx,
  1841. i,
  1842. prob_push);
  1843. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1844. "\t\tposs choices from view: %" PRIu32 ", containing i: %"
  1845. PRIu32 "\n",
  1846. binom (view_size, 0.45 * view_size),
  1847. binom (0.45 * view_size, 1));
  1848. }
  1849. else
  1850. {
  1851. prob_push = 0;
  1852. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1853. "\t\t%" PRIu32 " is not in %" PRIu32 "'s view, prob: 0\n",
  1854. peer_idx,
  1855. i);
  1856. }
  1857. /* 2. Probability of peer i being contained in pulls */
  1858. view_size = rps_peers[peer_idx].cur_view_count;
  1859. cont_views = count_containing_views (peer_idx, i);
  1860. number_of_being_in_pull_events =
  1861. (binom (view_size, 0.45 * view_size)
  1862. - binom (view_size - cont_views, 0.45 * view_size));
  1863. if (0 != number_of_being_in_pull_events)
  1864. {
  1865. prob_pull = number_of_being_in_pull_events
  1866. /
  1867. (1.0 * binom (view_size, 0.45 * view_size));
  1868. }
  1869. else
  1870. {
  1871. prob_pull = 0;
  1872. }
  1873. probs[i] = prob_push + prob_pull - (prob_push * prob_pull);
  1874. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1875. "\t\t%" PRIu32 " has %" PRIu32 " of %" PRIu32
  1876. " peers in its view who know %" PRIu32 " prob: %f\n",
  1877. peer_idx,
  1878. cont_views,
  1879. view_size,
  1880. i,
  1881. prob_pull);
  1882. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1883. "\t\tnumber of possible pull combinations: %" PRIu32 "\n",
  1884. binom (view_size, 0.45 * view_size));
  1885. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1886. "\t\tnumber of possible pull combinations without %" PRIu32
  1887. ": %" PRIu32 "\n",
  1888. i,
  1889. binom (view_size - cont_views, 0.45 * view_size));
  1890. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  1891. "\t\tnumber of possible pull combinations with %" PRIu32
  1892. ": %" PRIu32 "\n",
  1893. i,
  1894. number_of_being_in_pull_events);
  1895. if (0 != probs[i])
  1896. count_non_zero_prob++;
  1897. }
  1898. /* normalize */
  1899. if (0 != count_non_zero_prob)
  1900. {
  1901. for (i = 0; i < num_peers; i++)
  1902. {
  1903. probs[i] = probs[i] * (1.0 / count_non_zero_prob);
  1904. }
  1905. }
  1906. else
  1907. {
  1908. for (i = 0; i < num_peers; i++)
  1909. {
  1910. probs[i] = 0;
  1911. }
  1912. }
  1913. /* str repr */
  1914. for (i = 0; i < num_peers; i++)
  1915. {
  1916. probs_as_str_cpy = GNUNET_strndup (probs_as_str, probs_as_str_size);
  1917. tmp = GNUNET_snprintf (probs_as_str,
  1918. probs_as_str_size,
  1919. "%s %7.6f", probs_as_str_cpy, probs[i]);
  1920. GNUNET_free (probs_as_str_cpy);
  1921. GNUNET_assert (0 <= tmp);
  1922. }
  1923. to_file_w_len (rps_peers[peer_idx].file_name_probs,
  1924. probs_as_str_size,
  1925. probs_as_str);
  1926. GNUNET_free (probs_as_str);
  1927. }
  1928. /**
  1929. * @brief This counts the number of peers in which views a given peer occurs.
  1930. *
  1931. * It also stores this value in the rps peer.
  1932. *
  1933. * @param peer_idx the index of the peer to count the representation
  1934. *
  1935. * @return the number of occurrences
  1936. */
  1937. static uint32_t
  1938. count_peer_in_views_2 (uint32_t peer_idx)
  1939. {
  1940. uint32_t i, j;
  1941. uint32_t count = 0;
  1942. for (i = 0; i < num_peers; i++) /* Peer in which view is counted */
  1943. {
  1944. for (j = 0; j < rps_peers[i].cur_view_count; j++) /* entry in view */
  1945. {
  1946. if (0 == memcmp (rps_peers[peer_idx].peer_id,
  1947. &rps_peers[i].cur_view[j],
  1948. sizeof(struct GNUNET_PeerIdentity)))
  1949. {
  1950. count++;
  1951. break;
  1952. }
  1953. }
  1954. }
  1955. rps_peers[peer_idx].count_in_views = count;
  1956. return count;
  1957. }
  1958. static uint32_t
  1959. cumulated_view_sizes ()
  1960. {
  1961. uint32_t i;
  1962. view_sizes = 0;
  1963. for (i = 0; i < num_peers; i++) /* Peer in which view is counted */
  1964. {
  1965. view_sizes += rps_peers[i].cur_view_count;
  1966. }
  1967. return view_sizes;
  1968. }
  1969. static void
  1970. count_peer_in_views (uint32_t *count_peers)
  1971. {
  1972. uint32_t i, j;
  1973. for (i = 0; i < num_peers; i++) /* Peer in which view is counted */
  1974. {
  1975. for (j = 0; j < rps_peers[i].cur_view_count; j++) /* entry in view */
  1976. {
  1977. if (0 == memcmp (rps_peers[i].peer_id,
  1978. &rps_peers[i].cur_view[j],
  1979. sizeof(struct GNUNET_PeerIdentity)))
  1980. {
  1981. count_peers[i]++;
  1982. }
  1983. }
  1984. }
  1985. }
  1986. void
  1987. compute_diversity ()
  1988. {
  1989. uint32_t i;
  1990. /* ith entry represents the numer of occurrences in other peer's views */
  1991. uint32_t *count_peers = GNUNET_new_array (num_peers, uint32_t);
  1992. uint32_t views_total_size;
  1993. double expected;
  1994. /* deviation from expected number of peers */
  1995. double *deviation = GNUNET_new_array (num_peers, double);
  1996. views_total_size = 0;
  1997. expected = 0;
  1998. /* For each peer count its representation in other peer's views*/
  1999. for (i = 0; i < num_peers; i++) /* Peer to count */
  2000. {
  2001. views_total_size += rps_peers[i].cur_view_count;
  2002. count_peer_in_views (count_peers);
  2003. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2004. "Counted representation of %" PRIu32 "th peer [%s]: %" PRIu32
  2005. "\n",
  2006. i,
  2007. GNUNET_i2s (rps_peers[i].peer_id),
  2008. count_peers[i]);
  2009. }
  2010. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2011. "size of all views combined: %" PRIu32 "\n",
  2012. views_total_size);
  2013. expected = ((double) 1 / num_peers) * views_total_size;
  2014. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2015. "Expected number of occurrences of each peer in all views: %f\n",
  2016. expected);
  2017. for (i = 0; i < num_peers; i++) /* Peer to count */
  2018. {
  2019. deviation[i] = expected - count_peers[i];
  2020. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2021. "Deviation from expectation: %f\n", deviation[i]);
  2022. }
  2023. GNUNET_free (count_peers);
  2024. GNUNET_free (deviation);
  2025. }
  2026. void
  2027. print_view_sizes ()
  2028. {
  2029. uint32_t i;
  2030. for (i = 0; i < num_peers; i++) /* Peer to count */
  2031. {
  2032. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2033. "View size of %" PRIu32 ". [%s] is %" PRIu32 "\n",
  2034. i,
  2035. GNUNET_i2s (rps_peers[i].peer_id),
  2036. rps_peers[i].cur_view_count);
  2037. }
  2038. }
  /**
   * @brief Re-evaluate the global view statistics.
   *
   * First the diversity of the views is computed, then the view sizes of
   * all peers are printed; both steps only produce log output.
   */
  void
  all_views_updated_cb ()
  {
    /* Diversity first, view sizes afterwards */
    compute_diversity ();
    print_view_sizes ();
  }
  2045. void
  2046. view_update_cb (void *cls,
  2047. uint64_t view_size,
  2048. const struct GNUNET_PeerIdentity *peers)
  2049. {
  2050. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2051. "View was updated (%" PRIu64 ")\n", view_size);
  2052. struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
  2053. to_file ("/tmp/rps/view_sizes.txt",
  2054. "%" PRIu64 " %" PRIu32 "",
  2055. rps_peer->index,
  2056. view_size);
  2057. for (int i = 0; i < view_size; i++)
  2058. {
  2059. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2060. "\t%s\n", GNUNET_i2s (&peers[i]));
  2061. }
  2062. GNUNET_array_grow (rps_peer->cur_view,
  2063. rps_peer->cur_view_count,
  2064. view_size);
  2065. // *rps_peer->cur_view = *peers;
  2066. GNUNET_memcpy (rps_peer->cur_view,
  2067. peers,
  2068. view_size * sizeof(struct GNUNET_PeerIdentity));
  2069. to_file ("/tmp/rps/count_in_views.txt",
  2070. "%" PRIu64 " %" PRIu32 "",
  2071. rps_peer->index,
  2072. count_peer_in_views_2 (rps_peer->index));
  2073. cumulated_view_sizes ();
  2074. if (0 != view_size)
  2075. {
  2076. to_file ("/tmp/rps/repr.txt",
  2077. "%" PRIu64 /* index */
  2078. " %" PRIu32 /* occurrence in views */
  2079. " %" PRIu32 /* view sizes */
  2080. " %f" /* fraction of repr in views */
  2081. " %f" /* average view size */
  2082. " %f" /* prob of occurrence in view slot */
  2083. " %f" "", /* exp frac of repr in views */
  2084. rps_peer->index,
  2085. count_peer_in_views_2 (rps_peer->index),
  2086. view_sizes,
  2087. count_peer_in_views_2 (rps_peer->index) / (view_size * 1.0), /* fraction of representation in views */
  2088. view_sizes / (view_size * 1.0), /* average view size */
  2089. 1.0 / view_size, /* prob of occurrence in view slot */
  2090. (1.0 / view_size) * (view_sizes / view_size) /* expected fraction of repr in views */
  2091. );
  2092. }
  2093. compute_probabilities (rps_peer->index);
  2094. all_views_updated_cb ();
  2095. }
  2096. static void
  2097. pre_profiler (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
  2098. {
  2099. rps_peer->file_name_probs =
  2100. store_prefix_file_name (rps_peer->peer_id, "probs");
  2101. GNUNET_RPS_view_request (h, 0, view_update_cb, rps_peer);
  2102. }
  2103. void
  2104. write_final_stats (void)
  2105. {
  2106. uint32_t i;
  2107. for (i = 0; i < num_peers; i++)
  2108. {
  2109. to_file ("/tmp/rps/final_stats.dat",
  2110. "%" PRIu32 " " /* index */
  2111. "%s %" /* id */
  2112. PRIu64 " %" /* rounds */
  2113. PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64
  2114. " %" /* blocking */
  2115. PRIu64 " %" PRIu64 " %" PRIu64 " %" /* issued */
  2116. PRIu64 " %" PRIu64 " %" PRIu64 " %" /* sent */
  2117. PRIu64 " %" PRIu64 " %" PRIu64 /* recv */,
  2118. i,
  2119. GNUNET_i2s (rps_peers[i].peer_id),
  2120. rps_peers[i].num_rounds,
  2121. rps_peers[i].num_blocks,
  2122. rps_peers[i].num_blocks_many_push,
  2123. rps_peers[i].num_blocks_no_push,
  2124. rps_peers[i].num_blocks_no_pull,
  2125. rps_peers[i].num_blocks_many_push_no_pull,
  2126. rps_peers[i].num_blocks_no_push_no_pull,
  2127. rps_peers[i].num_issued_push,
  2128. rps_peers[i].num_issued_pull_req,
  2129. rps_peers[i].num_issued_pull_rep,
  2130. rps_peers[i].num_sent_push,
  2131. rps_peers[i].num_sent_pull_req,
  2132. rps_peers[i].num_sent_pull_rep,
  2133. rps_peers[i].num_recv_push,
  2134. rps_peers[i].num_recv_pull_req,
  2135. rps_peers[i].num_recv_pull_rep);
  2136. }
  2137. }
  2138. /**
  2139. * Continuation called by #GNUNET_STATISTICS_get() functions.
  2140. *
  2141. * Remembers that this specific statistics value was received for this peer.
  2142. * Checks whether all peers received their statistics yet.
  2143. * Issues the shutdown.
  2144. *
  2145. * @param cls closure
  2146. * @param success #GNUNET_OK if statistics were
  2147. * successfully obtained, #GNUNET_SYSERR if not.
  2148. */
  2149. void
  2150. post_test_shutdown_ready_cb (void *cls,
  2151. int success)
  2152. {
  2153. struct STATcls *stat_cls = (struct STATcls *) cls;
  2154. struct RPSPeer *rps_peer = stat_cls->rps_peer;
  2155. if (GNUNET_OK == success)
  2156. {
  2157. /* set flag that we we got the value */
  2158. rps_peer->stat_collected_flags |= stat_cls->stat_type;
  2159. }
  2160. else
  2161. {
  2162. GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
  2163. "Peer %u did not receive statistics value\n",
  2164. rps_peer->index);
  2165. GNUNET_free (stat_cls);
  2166. GNUNET_break (0);
  2167. }
  2168. if ((NULL != rps_peer->stat_op) &&
  2169. (GNUNET_YES == check_statistics_collect_completed_single_peer (
  2170. rps_peer)) )
  2171. {
  2172. GNUNET_TESTBED_operation_done (rps_peer->stat_op);
  2173. }
  2174. write_final_stats ();
  2175. if (GNUNET_YES == check_statistics_collect_completed ())
  2176. {
  2177. // write_final_stats ();
  2178. GNUNET_free (stat_cls);
  2179. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2180. "Shutting down\n");
  2181. GNUNET_SCHEDULER_shutdown ();
  2182. }
  2183. else
  2184. {
  2185. GNUNET_free (stat_cls);
  2186. }
  2187. }
  2188. /**
  2189. * @brief Converts string representation to the corresponding #STAT_TYPE enum.
  2190. *
  2191. * @param stat_str string representation of statistics specifier
  2192. *
  2193. * @return corresponding enum
  2194. */
  2195. enum STAT_TYPE
  2196. stat_str_2_type (const char *stat_str)
  2197. {
  2198. if (0 == strncmp ("# rounds blocked - no pull replies", stat_str, strlen (
  2199. "# rounds blocked - no pull replies")))
  2200. {
  2201. return STAT_TYPE_BLOCKS_NO_PULL;
  2202. }
  2203. else if (0 == strncmp ("# rounds blocked - too many pushes, no pull replies",
  2204. stat_str, strlen (
  2205. "# rounds blocked - too many pushes, no pull replies")))
  2206. {
  2207. return STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL;
  2208. }
  2209. else if (0 == strncmp ("# rounds blocked - too many pushes", stat_str,
  2210. strlen ("# rounds blocked - too many pushes")))
  2211. {
  2212. return STAT_TYPE_BLOCKS_MANY_PUSH;
  2213. }
  2214. else if (0 == strncmp ("# rounds blocked - no pushes, no pull replies",
  2215. stat_str, strlen (
  2216. "# rounds blocked - no pushes, no pull replies")))
  2217. {
  2218. return STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL;
  2219. }
  2220. else if (0 == strncmp ("# rounds blocked - no pushes", stat_str, strlen (
  2221. "# rounds blocked - no pushes")))
  2222. {
  2223. return STAT_TYPE_BLOCKS_NO_PUSH;
  2224. }
  2225. else if (0 == strncmp ("# rounds blocked", stat_str, strlen (
  2226. "# rounds blocked")))
  2227. {
  2228. return STAT_TYPE_BLOCKS;
  2229. }
  2230. else if (0 == strncmp ("# rounds", stat_str, strlen ("# rounds")))
  2231. {
  2232. return STAT_TYPE_ROUNDS;
  2233. }
  2234. else if (0 == strncmp ("# push send issued", stat_str, strlen (
  2235. "# push send issued")))
  2236. {
  2237. return STAT_TYPE_ISSUED_PUSH_SEND;
  2238. }
  2239. else if (0 == strncmp ("# pull request send issued", stat_str, strlen (
  2240. "# pull request send issued")))
  2241. {
  2242. return STAT_TYPE_ISSUED_PULL_REQ;
  2243. }
  2244. else if (0 == strncmp ("# pull reply send issued", stat_str, strlen (
  2245. "# pull reply send issued")))
  2246. {
  2247. return STAT_TYPE_ISSUED_PULL_REP;
  2248. }
  2249. else if (0 == strncmp ("# pushes sent", stat_str, strlen ("# pushes sent")))
  2250. {
  2251. return STAT_TYPE_SENT_PUSH_SEND;
  2252. }
  2253. else if (0 == strncmp ("# pull requests sent", stat_str, strlen (
  2254. "# pull requests sent")))
  2255. {
  2256. return STAT_TYPE_SENT_PULL_REQ;
  2257. }
  2258. else if (0 == strncmp ("# pull replys sent", stat_str, strlen (
  2259. "# pull replys sent")))
  2260. {
  2261. return STAT_TYPE_SENT_PULL_REP;
  2262. }
  2263. else if (0 == strncmp ("# push message received", stat_str, strlen (
  2264. "# push message received")))
  2265. {
  2266. return STAT_TYPE_RECV_PUSH_SEND;
  2267. }
  2268. else if (0 == strncmp ("# pull request message received", stat_str, strlen (
  2269. "# pull request message received")))
  2270. {
  2271. return STAT_TYPE_RECV_PULL_REQ;
  2272. }
  2273. else if (0 == strncmp ("# pull reply messages received", stat_str, strlen (
  2274. "# pull reply messages received")))
  2275. {
  2276. return STAT_TYPE_RECV_PULL_REP;
  2277. }
  2278. return STAT_TYPE_MAX;
  2279. }
  2280. /**
  2281. * @brief Converts #STAT_TYPE enum to the equivalent string representation that
  2282. * is stored with the statistics service.
  2283. *
  2284. * @param stat_type #STAT_TYPE enum
  2285. *
  2286. * @return string representation that matches statistics value
  2287. */
  2288. char*
  2289. stat_type_2_str (enum STAT_TYPE stat_type)
  2290. {
  2291. switch (stat_type)
  2292. {
  2293. case STAT_TYPE_ROUNDS:
  2294. return "# rounds";
  2295. case STAT_TYPE_BLOCKS:
  2296. return "# rounds blocked";
  2297. case STAT_TYPE_BLOCKS_MANY_PUSH:
  2298. return "# rounds blocked - too many pushes";
  2299. case STAT_TYPE_BLOCKS_NO_PUSH:
  2300. return "# rounds blocked - no pushes";
  2301. case STAT_TYPE_BLOCKS_NO_PULL:
  2302. return "# rounds blocked - no pull replies";
  2303. case STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL:
  2304. return "# rounds blocked - too many pushes, no pull replies";
  2305. case STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL:
  2306. return "# rounds blocked - no pushes, no pull replies";
  2307. case STAT_TYPE_ISSUED_PUSH_SEND:
  2308. return "# push send issued";
  2309. case STAT_TYPE_ISSUED_PULL_REQ:
  2310. return "# pull request send issued";
  2311. case STAT_TYPE_ISSUED_PULL_REP:
  2312. return "# pull reply send issued";
  2313. case STAT_TYPE_SENT_PUSH_SEND:
  2314. return "# pushes sent";
  2315. case STAT_TYPE_SENT_PULL_REQ:
  2316. return "# pull requests sent";
  2317. case STAT_TYPE_SENT_PULL_REP:
  2318. return "# pull replys sent";
  2319. case STAT_TYPE_RECV_PUSH_SEND:
  2320. return "# push message received";
  2321. case STAT_TYPE_RECV_PULL_REQ:
  2322. return "# pull request message received";
  2323. case STAT_TYPE_RECV_PULL_REP:
  2324. return "# pull reply messages received";
  2325. case STAT_TYPE_MAX:
  2326. default:
  2327. return "ERROR";
  2328. ;
  2329. }
  2330. }
  2331. /**
  2332. * Callback function to process statistic values.
  2333. *
  2334. * @param cls closure
  2335. * @param subsystem name of subsystem that created the statistic
  2336. * @param name the name of the datum
  2337. * @param value the current value
  2338. * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if not
  2339. * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
  2340. */
  2341. int
  2342. stat_iterator (void *cls,
  2343. const char *subsystem,
  2344. const char *name,
  2345. uint64_t value,
  2346. int is_persistent)
  2347. {
  2348. (void) subsystem;
  2349. (void) is_persistent;
  2350. const struct STATcls *stat_cls = (const struct STATcls *) cls;
  2351. struct RPSPeer *rps_peer = (struct RPSPeer *) stat_cls->rps_peer;
  2352. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got stat value: %s - %" PRIu64 "\n",
  2353. // stat_type_2_str (stat_cls->stat_type),
  2354. name,
  2355. value);
  2356. to_file (rps_peer->file_name_stats,
  2357. "%s: %" PRIu64 "\n",
  2358. name,
  2359. value);
  2360. switch (stat_str_2_type (name))
  2361. {
  2362. case STAT_TYPE_ROUNDS:
  2363. rps_peer->num_rounds = value;
  2364. break;
  2365. case STAT_TYPE_BLOCKS:
  2366. rps_peer->num_blocks = value;
  2367. break;
  2368. case STAT_TYPE_BLOCKS_MANY_PUSH:
  2369. rps_peer->num_blocks_many_push = value;
  2370. break;
  2371. case STAT_TYPE_BLOCKS_NO_PUSH:
  2372. rps_peer->num_blocks_no_push = value;
  2373. break;
  2374. case STAT_TYPE_BLOCKS_NO_PULL:
  2375. rps_peer->num_blocks_no_pull = value;
  2376. break;
  2377. case STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL:
  2378. rps_peer->num_blocks_many_push_no_pull = value;
  2379. break;
  2380. case STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL:
  2381. rps_peer->num_blocks_no_push_no_pull = value;
  2382. break;
  2383. case STAT_TYPE_ISSUED_PUSH_SEND:
  2384. rps_peer->num_issued_push = value;
  2385. break;
  2386. case STAT_TYPE_ISSUED_PULL_REQ:
  2387. rps_peer->num_issued_pull_req = value;
  2388. break;
  2389. case STAT_TYPE_ISSUED_PULL_REP:
  2390. rps_peer->num_issued_pull_rep = value;
  2391. break;
  2392. case STAT_TYPE_SENT_PUSH_SEND:
  2393. rps_peer->num_sent_push = value;
  2394. break;
  2395. case STAT_TYPE_SENT_PULL_REQ:
  2396. rps_peer->num_sent_pull_req = value;
  2397. break;
  2398. case STAT_TYPE_SENT_PULL_REP:
  2399. rps_peer->num_sent_pull_rep = value;
  2400. break;
  2401. case STAT_TYPE_RECV_PUSH_SEND:
  2402. rps_peer->num_recv_push = value;
  2403. break;
  2404. case STAT_TYPE_RECV_PULL_REQ:
  2405. rps_peer->num_recv_pull_req = value;
  2406. break;
  2407. case STAT_TYPE_RECV_PULL_REP:
  2408. rps_peer->num_recv_pull_rep = value;
  2409. break;
  2410. case STAT_TYPE_MAX:
  2411. default:
  2412. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  2413. "Unknown statistics string: %s\n",
  2414. name);
  2415. break;
  2416. }
  2417. return GNUNET_OK;
  2418. }
  2419. void
  2420. post_profiler (struct RPSPeer *rps_peer)
  2421. {
  2422. if (COLLECT_STATISTICS != cur_test_run.have_collect_statistics)
  2423. {
  2424. return;
  2425. }
  2426. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2427. "Going to request statistic values with mask 0x%" PRIx32 "\n",
  2428. cur_test_run.stat_collect_flags);
  2429. struct STATcls *stat_cls;
  2430. uint32_t stat_type;
  2431. for (stat_type = STAT_TYPE_ROUNDS;
  2432. stat_type < STAT_TYPE_MAX;
  2433. stat_type = stat_type << 1)
  2434. {
  2435. if (stat_type & cur_test_run.stat_collect_flags)
  2436. {
  2437. stat_cls = GNUNET_malloc (sizeof(struct STATcls));
  2438. stat_cls->rps_peer = rps_peer;
  2439. stat_cls->stat_type = stat_type;
  2440. rps_peer->file_name_stats =
  2441. store_prefix_file_name (rps_peer->peer_id, "stats");
  2442. GNUNET_STATISTICS_get (rps_peer->stats_h,
  2443. "rps",
  2444. stat_type_2_str (stat_type),
  2445. post_test_shutdown_ready_cb,
  2446. stat_iterator,
  2447. (struct STATcls *) stat_cls);
  2448. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2449. "Requested statistics for %s (peer %" PRIu32 ")\n",
  2450. stat_type_2_str (stat_type),
  2451. rps_peer->index);
  2452. }
  2453. }
  2454. }
  2455. /***********************************************************************
  2456. * /Definition of tests
  2457. ***********************************************************************/
  2458. /**
  2459. * Actual "main" function for the testcase.
  2460. *
  2461. * @param cls closure
  2462. * @param h the run handle
  2463. * @param n_peers number of peers in 'peers'
  2464. * @param peers handle to peers run in the testbed
  2465. * @param links_succeeded the number of overlay link connection attempts that
  2466. * succeeded
  2467. * @param links_failed the number of overlay link connection attempts that
  2468. * failed
  2469. */
  2470. static void
  2471. run (void *cls,
  2472. struct GNUNET_TESTBED_RunHandle *h,
  2473. unsigned int n_peers,
  2474. struct GNUNET_TESTBED_Peer **peers,
  2475. unsigned int links_succeeded,
  2476. unsigned int links_failed)
  2477. {
  2478. (void) cls;
  2479. (void) h;
  2480. (void) links_failed;
  2481. unsigned int i;
  2482. struct OpListEntry *entry;
  2483. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "RUN was called\n");
  2484. /* Check whether we timed out */
  2485. if ((n_peers != num_peers) ||
  2486. (NULL == peers) ||
  2487. (0 == links_succeeded) )
  2488. {
  2489. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2490. "Going down due to args (eg. timeout)\n");
  2491. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tn_peers: %u\n", n_peers);
  2492. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tnum_peers: %" PRIu32 "\n",
  2493. num_peers);
  2494. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tpeers: %p\n", peers);
  2495. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "\tlinks_succeeded: %u\n",
  2496. links_succeeded);
  2497. GNUNET_SCHEDULER_shutdown ();
  2498. return;
  2499. }
  2500. /* Initialize peers */
  2501. testbed_peers = peers;
  2502. num_peers_online = 0;
  2503. for (i = 0; i < num_peers; i++)
  2504. {
  2505. entry = make_oplist_entry ();
  2506. entry->index = i;
  2507. rps_peers[i].index = i;
  2508. if (NULL != cur_test_run.init_peer)
  2509. cur_test_run.init_peer (&rps_peers[i]);
  2510. if (NO_COLLECT_VIEW == cur_test_run.have_collect_view)
  2511. {
  2512. rps_peers->cur_view_count = 0;
  2513. rps_peers->cur_view = NULL;
  2514. }
  2515. entry->op = GNUNET_TESTBED_peer_get_information (peers[i],
  2516. GNUNET_TESTBED_PIT_IDENTITY,
  2517. &info_cb,
  2518. entry);
  2519. }
  2520. /* Bring peers up */
  2521. GNUNET_assert (num_peers == n_peers);
  2522. for (i = 0; i < n_peers; i++)
  2523. {
  2524. rps_peers[i].index = i;
  2525. rps_peers[i].op =
  2526. GNUNET_TESTBED_service_connect (&rps_peers[i],
  2527. peers[i],
  2528. "rps",
  2529. &rps_connect_complete_cb,
  2530. &rps_peers[i],
  2531. &rps_connect_adapter,
  2532. &rps_disconnect_adapter,
  2533. &rps_peers[i]);
  2534. /* Connect all peers to statistics service */
  2535. if (COLLECT_STATISTICS == cur_test_run.have_collect_statistics)
  2536. {
  2537. rps_peers[i].stat_op =
  2538. GNUNET_TESTBED_service_connect (NULL,
  2539. peers[i],
  2540. "statistics",
  2541. stat_complete_cb,
  2542. &rps_peers[i],
  2543. &stat_connect_adapter,
  2544. &stat_disconnect_adapter,
  2545. &rps_peers[i]);
  2546. }
  2547. }
  2548. if (NULL != churn_task)
  2549. GNUNET_SCHEDULER_cancel (churn_task);
  2550. post_test_task = GNUNET_SCHEDULER_add_delayed (timeout, &post_test_op, NULL);
  2551. timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
  2552. (timeout_s * 1.2) + 0.1 * num_peers);
  2553. shutdown_task = GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_op, NULL);
  2554. }
  2555. /**
  2556. * Entry point for the testcase, sets up the testbed.
  2557. *
  2558. * @param argc unused
  2559. * @param argv unused
  2560. * @return 0 on success
  2561. */
  2562. int
  2563. main (int argc, char *argv[])
  2564. {
  2565. int ret_value;
  2566. (void) argc;
  2567. /* Defaults for tests */
  2568. num_peers = 5;
  2569. cur_test_run.name = "test-rps-default";
  2570. cur_test_run.init_peer = default_init_peer;
  2571. cur_test_run.pre_test = NULL;
  2572. cur_test_run.reply_handle = default_reply_handle;
  2573. cur_test_run.eval_cb = default_eval_cb;
  2574. cur_test_run.post_test = NULL;
  2575. cur_test_run.have_churn = HAVE_CHURN;
  2576. cur_test_run.have_collect_statistics = NO_COLLECT_STATISTICS;
  2577. cur_test_run.stat_collect_flags = 0;
  2578. cur_test_run.have_collect_view = NO_COLLECT_VIEW;
  2579. churn_task = NULL;
  2580. timeout_s = 30;
  2581. if (strstr (argv[0], "malicious") != NULL)
  2582. {
  2583. cur_test_run.pre_test = mal_pre;
  2584. cur_test_run.main_test = mal_cb;
  2585. cur_test_run.init_peer = mal_init_peer;
  2586. timeout_s = 40;
  2587. if (strstr (argv[0], "_1") != NULL)
  2588. {
  2589. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 1\n");
  2590. cur_test_run.name = "test-rps-malicious_1";
  2591. mal_type = 1;
  2592. }
  2593. else if (strstr (argv[0], "_2") != NULL)
  2594. {
  2595. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 2\n");
  2596. cur_test_run.name = "test-rps-malicious_2";
  2597. mal_type = 2;
  2598. }
  2599. else if (strstr (argv[0], "_3") != NULL)
  2600. {
  2601. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test malicious peer type 3\n");
  2602. cur_test_run.name = "test-rps-malicious_3";
  2603. mal_type = 3;
  2604. }
  2605. }
  2606. else if (strstr (argv[0], "_single_req") != NULL)
  2607. {
  2608. GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test single request\n");
  2609. cur_test_run.name = "test-rps-single-req";
  2610. cur_test_run.main_test = single_req_cb;
  2611. cur_test_run.have_churn = HAVE_NO_CHURN;
  2612. }
  2613. else if (strstr (argv[0], "_delayed_reqs") != NULL)
  2614. {
  2615. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test delayed requests\n");
  2616. cur_test_run.name = "test-rps-delayed-reqs";
  2617. cur_test_run.main_test = delay_req_cb;
  2618. cur_test_run.have_churn = HAVE_NO_CHURN;
  2619. }
  2620. else if (strstr (argv[0], "_seed_big") != NULL)
  2621. {
  2622. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2623. "Test seeding (num_peers > GNUNET_MAX_MESSAGE_SIZE)\n");
  2624. num_peers = 1;
  2625. cur_test_run.name = "test-rps-seed-big";
  2626. cur_test_run.main_test = seed_big_cb;
  2627. cur_test_run.eval_cb = no_eval;
  2628. cur_test_run.have_churn = HAVE_NO_CHURN;
  2629. timeout_s = 10;
  2630. }
  2631. else if (strstr (argv[0], "_single_peer_seed") != NULL)
  2632. {
  2633. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2634. "Test seeding and requesting on a single peer\n");
  2635. cur_test_run.name = "test-rps-single-peer-seed";
  2636. cur_test_run.main_test = single_peer_seed_cb;
  2637. cur_test_run.have_churn = HAVE_NO_CHURN;
  2638. }
  2639. else if (strstr (argv[0], "_seed_request") != NULL)
  2640. {
  2641. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  2642. "Test seeding and requesting on multiple peers\n");
  2643. cur_test_run.name = "test-rps-seed-request";
  2644. cur_test_run.main_test = seed_req_cb;
  2645. cur_test_run.have_churn = HAVE_NO_CHURN;
  2646. }
  2647. else if (strstr (argv[0], "_seed") != NULL)
  2648. {
  2649. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding\n");
  2650. cur_test_run.name = "test-rps-seed";
  2651. cur_test_run.main_test = seed_cb;
  2652. cur_test_run.eval_cb = no_eval;
  2653. cur_test_run.have_churn = HAVE_NO_CHURN;
  2654. }
  2655. else if (strstr (argv[0], "_req_cancel") != NULL)
  2656. {
  2657. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test cancelling a request\n");
  2658. cur_test_run.name = "test-rps-req-cancel";
  2659. num_peers = 1;
  2660. cur_test_run.main_test = req_cancel_cb;
  2661. cur_test_run.eval_cb = no_eval;
  2662. cur_test_run.have_churn = HAVE_NO_CHURN;
  2663. timeout_s = 10;
  2664. }
  2665. else if (strstr (argv[0], "_churn") != NULL)
  2666. {
  2667. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test churn\n");
  2668. cur_test_run.name = "test-rps-churn";
  2669. num_peers = 5;
  2670. cur_test_run.init_peer = default_init_peer;
  2671. cur_test_run.main_test = churn_test_cb;
  2672. cur_test_run.reply_handle = default_reply_handle;
  2673. cur_test_run.eval_cb = default_eval_cb;
  2674. cur_test_run.have_churn = HAVE_NO_CHURN;
  2675. cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT;
  2676. timeout_s = 40;
  2677. }
  2678. else if (strstr (argv[0], "_sub") != NULL)
  2679. {
  2680. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test subs\n");
  2681. cur_test_run.name = "test-rps-sub";
  2682. num_peers = 5;
  2683. // cur_test_run.init_peer = &default_init_peer;
  2684. cur_test_run.pre_test = &sub_pre;
  2685. cur_test_run.main_test = &single_req_cb;
  2686. // cur_test_run.reply_handle = default_reply_handle;
  2687. cur_test_run.post_test = &sub_post;
  2688. // cur_test_run.eval_cb = default_eval_cb;
  2689. cur_test_run.have_churn = HAVE_NO_CHURN;
  2690. cur_test_run.have_quick_quit = HAVE_QUICK_QUIT;
  2691. }
  2692. else if (strstr (argv[0], "profiler") != NULL)
  2693. {
  2694. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "This is the profiler\n");
  2695. cur_test_run.name = "test-rps-profiler";
  2696. num_peers = 16;
  2697. mal_type = 3;
  2698. cur_test_run.init_peer = profiler_init_peer;
  2699. // cur_test_run.pre_test = mal_pre;
  2700. cur_test_run.pre_test = pre_profiler;
  2701. cur_test_run.main_test = profiler_cb;
  2702. cur_test_run.reply_handle = profiler_reply_handle;
  2703. cur_test_run.eval_cb = profiler_eval;
  2704. cur_test_run.post_test = post_profiler;
  2705. cur_test_run.request_interval = 2;
  2706. cur_test_run.num_requests = 5;
  2707. // cur_test_run.have_churn = HAVE_CHURN;
  2708. cur_test_run.have_churn = HAVE_NO_CHURN;
  2709. cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT;
  2710. cur_test_run.have_collect_statistics = COLLECT_STATISTICS;
  2711. cur_test_run.stat_collect_flags = STAT_TYPE_ROUNDS
  2712. | STAT_TYPE_BLOCKS
  2713. | STAT_TYPE_BLOCKS_MANY_PUSH
  2714. | STAT_TYPE_BLOCKS_NO_PUSH
  2715. | STAT_TYPE_BLOCKS_NO_PULL
  2716. | STAT_TYPE_BLOCKS_MANY_PUSH_NO_PULL
  2717. | STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL
  2718. | STAT_TYPE_ISSUED_PUSH_SEND
  2719. | STAT_TYPE_ISSUED_PULL_REQ
  2720. | STAT_TYPE_ISSUED_PULL_REP
  2721. | STAT_TYPE_SENT_PUSH_SEND
  2722. | STAT_TYPE_SENT_PULL_REQ
  2723. | STAT_TYPE_SENT_PULL_REP
  2724. | STAT_TYPE_RECV_PUSH_SEND
  2725. | STAT_TYPE_RECV_PULL_REQ
  2726. | STAT_TYPE_RECV_PULL_REP;
  2727. cur_test_run.have_collect_view = COLLECT_VIEW;
  2728. timeout_s = 150;
  2729. /* 'Clean' directory */
  2730. (void) GNUNET_DISK_directory_remove ("/tmp/rps/");
  2731. GNUNET_DISK_directory_create ("/tmp/rps/");
  2732. }
  2733. timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, timeout_s);
  2734. rps_peers = GNUNET_new_array (num_peers, struct RPSPeer);
  2735. peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO);
  2736. rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
  2737. if ((2 == mal_type) ||
  2738. (3 == mal_type))
  2739. target_peer = &rps_peer_ids[num_peers - 2];
  2740. if (profiler_eval == cur_test_run.eval_cb)
  2741. eval_peer = &rps_peers[num_peers - 1]; /* FIXME: eval_peer could be a
  2742. malicious peer if not careful
  2743. with the malicious portion */
  2744. ok = 1;
  2745. ret_value = GNUNET_TESTBED_test_run (cur_test_run.name,
  2746. "test_rps.conf",
  2747. num_peers,
  2748. 0, NULL, NULL,
  2749. &run, NULL);
  2750. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  2751. "_test_run returned.\n");
  2752. if (GNUNET_OK != ret_value)
  2753. {
  2754. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  2755. "Test did not run successfully!\n");
  2756. }
  2757. ret_value = cur_test_run.eval_cb ();
  2758. if (NO_COLLECT_VIEW == cur_test_run.have_collect_view)
  2759. {
  2760. GNUNET_array_grow (rps_peers->cur_view,
  2761. rps_peers->cur_view_count,
  2762. 0);
  2763. }
  2764. GNUNET_free (rps_peers);
  2765. GNUNET_free (rps_peer_ids);
  2766. GNUNET_CONTAINER_multipeermap_destroy (peer_map);
  2767. return ret_value;
  2768. }
  2769. /* end of test_rps.c */