gnunet-service-dht_clients.c 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518
  1. /*
  2. This file is part of GNUnet.
  3. (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
  4. GNUnet is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published
  6. by the Free Software Foundation; either version 3, or (at your
  7. option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with GNUnet; see the file COPYING. If not, write to the
  14. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  15. Boston, MA 02111-1307, USA.
  16. */
  17. /**
  18. * @file dht/gnunet-service-dht_clients.c
  19. * @brief GNUnet DHT service's client management code
  20. * @author Christian Grothoff
  21. * @author Nathan Evans
  22. */
  23. #include "platform.h"
  24. #include "gnunet_constants.h"
  25. #include "gnunet_protocols.h"
  26. #include "gnunet_statistics_service.h"
  27. #include "gnunet-service-dht.h"
  28. #include "gnunet-service-dht_clients.h"
  29. #include "gnunet-service-dht_datacache.h"
  30. #include "gnunet-service-dht_neighbours.h"
  31. #include "dht.h"
  /**
   * Log a message of the given @a kind under the "dht-traffic" component.
   */
  #define LOG_TRAFFIC(kind, ...) GNUNET_log_from (kind, "dht-traffic", __VA_ARGS__)

  /**
   * Log a message of the given @a kind under the "dht-clients" component.
   */
  #define LOG(kind, ...) GNUNET_log_from (kind, "dht-clients", __VA_ARGS__)
  /**
   * Entry in a per-client doubly-linked queue of messages that still
   * have to be sent to that client.
   */
  struct PendingMessage
  {
    /**
     * Next entry in the queue.
     */
    struct PendingMessage *next;

    /**
     * Previous entry in the queue.
     */
    struct PendingMessage *prev;

    /**
     * The message to transmit.  The message bytes live in the same
     * allocation, directly behind this struct (i.e. at `&pm[1]`).
     */
    const struct GNUNET_MessageHeader *msg;
  };
  /**
   * Per-client state: the server handle used to talk to the client,
   * the transmission request currently outstanding (if any) and the
   * queue of messages waiting to be sent.
   */
  struct ClientList
  {
    /**
     * Next client in the list of active clients.
     */
    struct ClientList *next;

    /**
     * Previous client in the list of active clients.
     */
    struct ClientList *prev;

    /**
     * Server handle identifying this client.
     */
    struct GNUNET_SERVER_Client *client_handle;

    /**
     * Currently outstanding transmission request, or NULL if there
     * is none.
     */
    struct GNUNET_SERVER_TransmitHandle *transmit_handle;

    /**
     * First message queued for this client.
     */
    struct PendingMessage *pending_head;

    /**
     * Last message queued for this client.
     */
    struct PendingMessage *pending_tail;
  };
  90. /**
  91. * Entry in the local forwarding map for a client's GET request.
  92. */
  93. struct ClientQueryRecord
  94. {
  95. /**
  96. * The key this request was about
  97. */
  98. struct GNUNET_HashCode key;
  99. /**
  100. * Client responsible for the request.
  101. */
  102. struct ClientList *client;
  103. /**
  104. * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
  105. */
  106. const void *xquery;
  107. /**
  108. * Replies we have already seen for this request.
  109. */
  110. struct GNUNET_HashCode *seen_replies;
  111. /**
  112. * Pointer to this nodes heap location in the retry-heap (for fast removal)
  113. */
  114. struct GNUNET_CONTAINER_HeapNode *hnode;
  115. /**
  116. * What's the delay between re-try operations that we currently use for this
  117. * request?
  118. */
  119. struct GNUNET_TIME_Relative retry_frequency;
  120. /**
  121. * What's the next time we should re-try this request?
  122. */
  123. struct GNUNET_TIME_Absolute retry_time;
  124. /**
  125. * The unique identifier of this request
  126. */
  127. uint64_t unique_id;
  128. /**
  129. * Number of bytes in xquery.
  130. */
  131. size_t xquery_size;
  132. /**
  133. * Number of entries in 'seen_replies'.
  134. */
  135. unsigned int seen_replies_count;
  136. /**
  137. * Desired replication level
  138. */
  139. uint32_t replication;
  140. /**
  141. * Any message options for this request
  142. */
  143. uint32_t msg_options;
  144. /**
  145. * The type for the data for the GET request.
  146. */
  147. enum GNUNET_BLOCK_Type type;
  148. };
  149. /**
  150. * Struct containing paremeters of monitoring requests.
  151. */
  152. struct ClientMonitorRecord
  153. {
  154. /**
  155. * Next element in DLL.
  156. */
  157. struct ClientMonitorRecord *next;
  158. /**
  159. * Previous element in DLL.
  160. */
  161. struct ClientMonitorRecord *prev;
  162. /**
  163. * Type of blocks that are of interest
  164. */
  165. enum GNUNET_BLOCK_Type type;
  166. /**
  167. * Key of data of interest, NULL for all.
  168. */
  169. struct GNUNET_HashCode *key;
  170. /**
  171. * Flag whether to notify about GET messages.
  172. */
  173. int16_t get;
  174. /**
  175. * Flag whether to notify about GET_REPONSE messages.
  176. */
  177. int16_t get_resp;
  178. /**
  179. * Flag whether to notify about PUT messages.
  180. */
  181. uint16_t put;
  182. /**
  183. * Client to notify of these requests.
  184. */
  185. struct ClientList *client;
  186. };
  187. /**
  188. * List of active clients.
  189. */
  190. static struct ClientList *client_head;
  191. /**
  192. * List of active clients.
  193. */
  194. static struct ClientList *client_tail;
  195. /**
  196. * List of active monitoring requests.
  197. */
  198. static struct ClientMonitorRecord *monitor_head;
  199. /**
  200. * List of active monitoring requests.
  201. */
  202. static struct ClientMonitorRecord *monitor_tail;
  203. /**
  204. * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries.
  205. */
  206. static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
  207. /**
  208. * Heap with all of our client's request, sorted by retry time (earliest on top).
  209. */
  210. static struct GNUNET_CONTAINER_Heap *retry_heap;
  211. /**
  212. * Task that re-transmits requests (using retry_heap).
  213. */
  214. static GNUNET_SCHEDULER_TaskIdentifier retry_task;
  215. /**
  216. * Task run to check for messages that need to be sent to a client.
  217. *
  218. * @param client a ClientList, containing the client and any messages to be sent to it
  219. */
  220. static void
  221. process_pending_messages (struct ClientList *client);
  222. /**
  223. * Add a PendingMessage to the clients list of messages to be sent
  224. *
  225. * @param client the active client to send the message to
  226. * @param pending_message the actual message to send
  227. */
  228. static void
  229. add_pending_message (struct ClientList *client,
  230. struct PendingMessage *pending_message)
  231. {
  232. GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
  233. pending_message);
  234. process_pending_messages (client);
  235. }
  236. /**
  237. * Find a client if it exists, add it otherwise.
  238. *
  239. * @param client the server handle to the client
  240. *
  241. * @return the client if found, a new client otherwise
  242. */
  243. static struct ClientList *
  244. find_active_client (struct GNUNET_SERVER_Client *client)
  245. {
  246. struct ClientList *pos = client_head;
  247. struct ClientList *ret;
  248. while (pos != NULL)
  249. {
  250. if (pos->client_handle == client)
  251. return pos;
  252. pos = pos->next;
  253. }
  254. ret = GNUNET_new (struct ClientList);
  255. ret->client_handle = client;
  256. GNUNET_CONTAINER_DLL_insert (client_head, client_tail, ret);
  257. return ret;
  258. }
  259. /**
  260. * Iterator over hash map entries that frees all entries
  261. * associated with the given client.
  262. *
  263. * @param cls client to search for in source routes
  264. * @param key current key code (ignored)
  265. * @param value value in the hash map, a ClientQueryRecord
  266. * @return #GNUNET_YES (we should continue to iterate)
  267. */
  268. static int
  269. remove_client_records (void *cls, const struct GNUNET_HashCode * key, void *value)
  270. {
  271. struct ClientList *client = cls;
  272. struct ClientQueryRecord *record = value;
  273. if (record->client != client)
  274. return GNUNET_YES;
  275. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  276. "Removing client %p's record for key %s\n", client,
  277. GNUNET_h2s (key));
  278. GNUNET_assert (GNUNET_YES ==
  279. GNUNET_CONTAINER_multihashmap_remove (forward_map, key,
  280. record));
  281. if (NULL != record->hnode)
  282. GNUNET_CONTAINER_heap_remove_node (record->hnode);
  283. GNUNET_array_grow (record->seen_replies, record->seen_replies_count, 0);
  284. GNUNET_free (record);
  285. return GNUNET_YES;
  286. }
  287. /**
  288. * Functions with this signature are called whenever a client
  289. * is disconnected on the network level.
  290. *
  291. * @param cls closure (NULL for dht)
  292. * @param client identification of the client; NULL
  293. * for the last call when the server is destroyed
  294. */
  295. static void
  296. handle_client_disconnect (void *cls,
  297. struct GNUNET_SERVER_Client *client)
  298. {
  299. struct ClientList *pos;
  300. struct PendingMessage *reply;
  301. struct ClientMonitorRecord *monitor;
  302. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  303. "Local client %p disconnects\n",
  304. client);
  305. pos = find_active_client (client);
  306. GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos);
  307. if (pos->transmit_handle != NULL)
  308. GNUNET_SERVER_notify_transmit_ready_cancel (pos->transmit_handle);
  309. while (NULL != (reply = pos->pending_head))
  310. {
  311. GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, reply);
  312. GNUNET_free (reply);
  313. }
  314. monitor = monitor_head;
  315. while (NULL != monitor)
  316. {
  317. if (monitor->client == pos)
  318. {
  319. struct ClientMonitorRecord *next;
  320. GNUNET_free_non_null (monitor->key);
  321. next = monitor->next;
  322. GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, monitor);
  323. GNUNET_free (monitor);
  324. monitor = next;
  325. }
  326. else
  327. monitor = monitor->next;
  328. }
  329. GNUNET_CONTAINER_multihashmap_iterate (forward_map, &remove_client_records,
  330. pos);
  331. GNUNET_free (pos);
  332. }
  333. /**
  334. * Route the given request via the DHT. This includes updating
  335. * the bloom filter and retransmission times, building the P2P
  336. * message and initiating the routing operation.
  337. */
  338. static void
  339. transmit_request (struct ClientQueryRecord *cqr)
  340. {
  341. int32_t reply_bf_mutator;
  342. struct GNUNET_CONTAINER_BloomFilter *reply_bf;
  343. struct GNUNET_CONTAINER_BloomFilter *peer_bf;
  344. GNUNET_STATISTICS_update (GDS_stats,
  345. gettext_noop
  346. ("# GET requests from clients injected"), 1,
  347. GNUNET_NO);
  348. reply_bf_mutator =
  349. (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
  350. UINT32_MAX);
  351. reply_bf =
  352. GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator, cqr->seen_replies,
  353. cqr->seen_replies_count);
  354. peer_bf =
  355. GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
  356. GNUNET_CONSTANTS_BLOOMFILTER_K);
  357. LOG (GNUNET_ERROR_TYPE_DEBUG,
  358. "Initiating GET for %s, replication %u, already have %u replies\n",
  359. GNUNET_h2s (&cqr->key),
  360. cqr->replication,
  361. cqr->seen_replies_count);
  362. GDS_NEIGHBOURS_handle_get (cqr->type, cqr->msg_options, cqr->replication,
  363. 0 /* hop count */ ,
  364. &cqr->key, cqr->xquery, cqr->xquery_size, reply_bf,
  365. reply_bf_mutator, peer_bf);
  366. GNUNET_CONTAINER_bloomfilter_free (reply_bf);
  367. GNUNET_CONTAINER_bloomfilter_free (peer_bf);
  368. /* exponential back-off for retries.
  369. * max GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
  370. cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
  371. cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
  372. }
  373. /**
  374. * Task that looks at the 'retry_heap' and transmits all of the requests
  375. * on the heap that are ready for transmission. Then re-schedules
  376. * itself (unless the heap is empty).
  377. *
  378. * @param cls unused
  379. * @param tc scheduler context
  380. */
  381. static void
  382. transmit_next_request_task (void *cls,
  383. const struct GNUNET_SCHEDULER_TaskContext *tc)
  384. {
  385. struct ClientQueryRecord *cqr;
  386. struct GNUNET_TIME_Relative delay;
  387. retry_task = GNUNET_SCHEDULER_NO_TASK;
  388. if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
  389. return;
  390. while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
  391. {
  392. cqr->hnode = NULL;
  393. delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
  394. if (delay.rel_value_us > 0)
  395. {
  396. cqr->hnode =
  397. GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
  398. cqr->retry_time.abs_value_us);
  399. retry_task =
  400. GNUNET_SCHEDULER_add_delayed (delay, &transmit_next_request_task,
  401. NULL);
  402. return;
  403. }
  404. transmit_request (cqr);
  405. cqr->hnode =
  406. GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
  407. cqr->retry_time.abs_value_us);
  408. }
  409. }
  410. /**
  411. * Handler for PUT messages.
  412. *
  413. * @param cls closure for the service
  414. * @param client the client we received this message from
  415. * @param message the actual message received
  416. */
  417. static void
  418. handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
  419. const struct GNUNET_MessageHeader *message)
  420. {
  421. const struct GNUNET_DHT_ClientPutMessage *dht_msg;
  422. struct GNUNET_CONTAINER_BloomFilter *peer_bf;
  423. uint16_t size;
  424. struct PendingMessage *pm;
  425. struct GNUNET_DHT_ClientPutConfirmationMessage *conf;
  426. size = ntohs (message->size);
  427. if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
  428. {
  429. GNUNET_break (0);
  430. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  431. return;
  432. }
  433. GNUNET_STATISTICS_update (GDS_stats,
  434. gettext_noop
  435. ("# PUT requests received from clients"), 1,
  436. GNUNET_NO);
  437. dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
  438. LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N CLIENT-PUT %s\n",
  439. GNUNET_h2s_full (&dht_msg->key));
  440. /* give to local clients */
  441. LOG (GNUNET_ERROR_TYPE_DEBUG,
  442. "Handling local PUT of %u-bytes for query %s\n",
  443. size - sizeof (struct GNUNET_DHT_ClientPutMessage),
  444. GNUNET_h2s (&dht_msg->key));
  445. GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
  446. &dht_msg->key, 0, NULL, 0, NULL,
  447. ntohl (dht_msg->type),
  448. size - sizeof (struct GNUNET_DHT_ClientPutMessage),
  449. &dht_msg[1]);
  450. /* store locally */
  451. GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
  452. &dht_msg->key, 0, NULL, ntohl (dht_msg->type),
  453. size - sizeof (struct GNUNET_DHT_ClientPutMessage),
  454. &dht_msg[1]);
  455. /* route to other peers */
  456. peer_bf =
  457. GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
  458. GNUNET_CONSTANTS_BLOOMFILTER_K);
  459. GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type), ntohl (dht_msg->options),
  460. ntohl (dht_msg->desired_replication_level),
  461. GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
  462. 0 /* hop count */ ,
  463. peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1],
  464. size -
  465. sizeof (struct GNUNET_DHT_ClientPutMessage));
  466. GDS_CLIENTS_process_put (ntohl (dht_msg->options),
  467. ntohl (dht_msg->type),
  468. 0,
  469. ntohl (dht_msg->desired_replication_level),
  470. 1,
  471. GDS_NEIGHBOURS_get_id(),
  472. GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
  473. &dht_msg->key,
  474. &dht_msg[1],
  475. size - sizeof (struct GNUNET_DHT_ClientPutMessage));
  476. GNUNET_CONTAINER_bloomfilter_free (peer_bf);
  477. pm = GNUNET_malloc (sizeof (struct PendingMessage) +
  478. sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage));
  479. conf = (struct GNUNET_DHT_ClientPutConfirmationMessage *) &pm[1];
  480. conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage));
  481. conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
  482. conf->reserved = htonl (0);
  483. conf->unique_id = dht_msg->unique_id;
  484. pm->msg = &conf->header;
  485. add_pending_message (find_active_client (client), pm);
  486. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  487. }
  488. /**
  489. * Handler for DHT GET messages from the client.
  490. *
  491. * @param cls closure for the service
  492. * @param client the client we received this message from
  493. * @param message the actual message received
  494. */
  495. static void
  496. handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
  497. const struct GNUNET_MessageHeader *message)
  498. {
  499. const struct GNUNET_DHT_ClientGetMessage *get;
  500. struct ClientQueryRecord *cqr;
  501. size_t xquery_size;
  502. const char *xquery;
  503. uint16_t size;
  504. size = ntohs (message->size);
  505. if (size < sizeof (struct GNUNET_DHT_ClientGetMessage))
  506. {
  507. GNUNET_break (0);
  508. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  509. return;
  510. }
  511. xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
  512. get = (const struct GNUNET_DHT_ClientGetMessage *) message;
  513. xquery = (const char *) &get[1];
  514. GNUNET_STATISTICS_update (GDS_stats,
  515. gettext_noop
  516. ("# GET requests received from clients"), 1,
  517. GNUNET_NO);
  518. LOG (GNUNET_ERROR_TYPE_DEBUG,
  519. "Received GET request for %s from local client %p, xq: %.*s\n",
  520. GNUNET_h2s (&get->key), client, xquery_size, xquery);
  521. LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N CLIENT-GET %s\n",
  522. GNUNET_h2s_full (&get->key));
  523. cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
  524. cqr->key = get->key;
  525. cqr->client = find_active_client (client);
  526. cqr->xquery = (void *) &cqr[1];
  527. memcpy (&cqr[1], xquery, xquery_size);
  528. cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0);
  529. cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
  530. cqr->retry_time = GNUNET_TIME_absolute_get ();
  531. cqr->unique_id = get->unique_id;
  532. cqr->xquery_size = xquery_size;
  533. cqr->replication = ntohl (get->desired_replication_level);
  534. cqr->msg_options = ntohl (get->options);
  535. cqr->type = ntohl (get->type);
  536. // FIXME use cqr->key, set multihashmap create to GNUNET_YES
  537. GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr,
  538. GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
  539. GDS_CLIENTS_process_get (ntohl (get->options),
  540. ntohl (get->type),
  541. 0,
  542. ntohl (get->desired_replication_level),
  543. 1,
  544. GDS_NEIGHBOURS_get_id(),
  545. &get->key);
  546. /* start remote requests */
  547. if (GNUNET_SCHEDULER_NO_TASK != retry_task)
  548. GNUNET_SCHEDULER_cancel (retry_task);
  549. retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
  550. /* perform local lookup */
  551. GDS_DATACACHE_handle_get (&get->key, cqr->type, cqr->xquery, xquery_size,
  552. NULL, 0);
  553. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  554. }
  /**
   * Closure for #find_by_unique_id().
   */
  struct FindByUniqueIdContext
  {
    /**
     * Set to the matching record, if one is found.
     */
    struct ClientQueryRecord *cqr;

    /**
     * Unique ID of the request to look for.
     */
    uint64_t unique_id;
  };
  566. /**
  567. * Function called for each existing DHT record for the given
  568. * query. Checks if it matches the UID given in the closure
  569. * and if so returns the entry as a result.
  570. *
  571. * @param cls the search context
  572. * @param key query for the lookup (not used)
  573. * @param value the 'struct ClientQueryRecord'
  574. * @return GNUNET_YES to continue iteration (result not yet found)
  575. */
  576. static int
  577. find_by_unique_id (void *cls,
  578. const struct GNUNET_HashCode *key,
  579. void *value)
  580. {
  581. struct FindByUniqueIdContext *fui_ctx = cls;
  582. struct ClientQueryRecord *cqr = value;
  583. if (cqr->unique_id != fui_ctx->unique_id)
  584. return GNUNET_YES;
  585. fui_ctx->cqr = cqr;
  586. return GNUNET_NO;
  587. }
  588. /**
  589. * Handler for "GET result seen" messages from the client.
  590. *
  591. * @param cls closure for the service
  592. * @param client the client we received this message from
  593. * @param message the actual message received
  594. */
  595. static void
  596. handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client,
  597. const struct GNUNET_MessageHeader *message)
  598. {
  599. const struct GNUNET_DHT_ClientGetResultSeenMessage *seen;
  600. uint16_t size;
  601. unsigned int hash_count;
  602. unsigned int old_count;
  603. const struct GNUNET_HashCode *hc;
  604. struct FindByUniqueIdContext fui_ctx;
  605. struct ClientQueryRecord *cqr;
  606. size = ntohs (message->size);
  607. if (size < sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage))
  608. {
  609. GNUNET_break (0);
  610. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  611. return;
  612. }
  613. seen = (const struct GNUNET_DHT_ClientGetResultSeenMessage *) message;
  614. hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
  615. if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode))
  616. {
  617. GNUNET_break (0);
  618. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  619. return;
  620. }
  621. hc = (const struct GNUNET_HashCode*) &seen[1];
  622. fui_ctx.unique_id = seen->unique_id;
  623. fui_ctx.cqr = NULL;
  624. GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
  625. &seen->key,
  626. &find_by_unique_id,
  627. &fui_ctx);
  628. if (NULL == (cqr = fui_ctx.cqr))
  629. {
  630. GNUNET_break (0);
  631. GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
  632. return;
  633. }
  634. /* finally, update 'seen' list */
  635. old_count = cqr->seen_replies_count;
  636. GNUNET_array_grow (cqr->seen_replies,
  637. cqr->seen_replies_count,
  638. cqr->seen_replies_count + hash_count);
  639. memcpy (&cqr->seen_replies[old_count],
  640. hc,
  641. sizeof (struct GNUNET_HashCode) * hash_count);
  642. }
  /**
   * Closure for #remove_by_unique_id().
   */
  struct RemoveByUniqueIdContext
  {
    /**
     * Client that asked for the removal.
     */
    struct ClientList *client;

    /**
     * Unique ID of the request that is to be removed.
     */
    uint64_t unique_id;
  };
  657. /**
  658. * Iterator over hash map entries that frees all entries
  659. * that match the given client and unique ID.
  660. *
  661. * @param cls unique ID and client to search for in source routes
  662. * @param key current key code
  663. * @param value value in the hash map, a ClientQueryRecord
  664. * @return GNUNET_YES (we should continue to iterate)
  665. */
  666. static int
  667. remove_by_unique_id (void *cls, const struct GNUNET_HashCode * key, void *value)
  668. {
  669. const struct RemoveByUniqueIdContext *ctx = cls;
  670. struct ClientQueryRecord *record = value;
  671. if (record->unique_id != ctx->unique_id)
  672. return GNUNET_YES;
  673. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  674. "Removing client %p's record for key %s (by unique id)\n",
  675. ctx->client->client_handle, GNUNET_h2s (key));
  676. return remove_client_records (ctx->client, key, record);
  677. }
  678. /**
  679. * Handler for any generic DHT stop messages, calls the appropriate handler
  680. * depending on message type (if processed locally)
  681. *
  682. * @param cls closure for the service
  683. * @param client the client we received this message from
  684. * @param message the actual message received
  685. *
  686. */
  687. static void
  688. handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
  689. const struct GNUNET_MessageHeader *message)
  690. {
  691. const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
  692. (const struct GNUNET_DHT_ClientGetStopMessage *) message;
  693. struct RemoveByUniqueIdContext ctx;
  694. GNUNET_STATISTICS_update (GDS_stats,
  695. gettext_noop
  696. ("# GET STOP requests received from clients"), 1,
  697. GNUNET_NO);
  698. LOG (GNUNET_ERROR_TYPE_DEBUG,
  699. "Received GET STOP request for %s from local client %p\n",
  700. client, GNUNET_h2s (&dht_stop_msg->key));
  701. ctx.client = find_active_client (client);
  702. ctx.unique_id = dht_stop_msg->unique_id;
  703. GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->key,
  704. &remove_by_unique_id, &ctx);
  705. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  706. }
  707. /**
  708. * Handler for monitor start messages
  709. *
  710. * @param cls closure for the service
  711. * @param client the client we received this message from
  712. * @param message the actual message received
  713. *
  714. */
  715. static void
  716. handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client,
  717. const struct GNUNET_MessageHeader *message)
  718. {
  719. struct ClientMonitorRecord *r;
  720. const struct GNUNET_DHT_MonitorStartStopMessage *msg;
  721. msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message;
  722. r = GNUNET_new (struct ClientMonitorRecord);
  723. r->client = find_active_client(client);
  724. r->type = ntohl(msg->type);
  725. r->get = ntohs(msg->get);
  726. r->get_resp = ntohs(msg->get_resp);
  727. r->put = ntohs(msg->put);
  728. if (0 == ntohs(msg->filter_key))
  729. r->key = NULL;
  730. else
  731. {
  732. r->key = GNUNET_new (struct GNUNET_HashCode);
  733. memcpy (r->key, &msg->key, sizeof (struct GNUNET_HashCode));
  734. }
  735. GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r);
  736. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  737. }
  738. /**
  739. * Handler for monitor stop messages
  740. *
  741. * @param cls closure for the service
  742. * @param client the client we received this message from
  743. * @param message the actual message received
  744. *
  745. */
  746. static void
  747. handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *client,
  748. const struct GNUNET_MessageHeader *message)
  749. {
  750. struct ClientMonitorRecord *r;
  751. const struct GNUNET_DHT_MonitorStartStopMessage *msg;
  752. int keys_match;
  753. msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message;
  754. r = monitor_head;
  755. while (NULL != r)
  756. {
  757. if (NULL == r->key)
  758. keys_match = (0 == ntohs(msg->filter_key));
  759. else
  760. {
  761. keys_match = (0 != ntohs(msg->filter_key)
  762. && !memcmp(r->key, &msg->key, sizeof(struct GNUNET_HashCode)));
  763. }
  764. if (find_active_client(client) == r->client
  765. && ntohl(msg->type) == r->type
  766. && r->get == msg->get
  767. && r->get_resp == msg->get_resp
  768. && r->put == msg->put
  769. && keys_match
  770. )
  771. {
  772. GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, r);
  773. GNUNET_free_non_null (r->key);
  774. GNUNET_free (r);
  775. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  776. return; /* Delete only ONE entry */
  777. }
  778. r = r->next;
  779. }
  780. GNUNET_SERVER_receive_done (client, GNUNET_OK);
  781. }
  782. /**
  783. * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
  784. * request. A ClientList is passed as closure, take the head of the list
  785. * and copy it into buf, which has the result of sending the message to the
  786. * client.
  787. *
  788. * @param cls closure to this call
  789. * @param size maximum number of bytes available to send
  790. * @param buf where to copy the actual message to
  791. *
  792. * @return the number of bytes actually copied, 0 indicates failure
  793. */
  794. static size_t
  795. send_reply_to_client (void *cls, size_t size, void *buf)
  796. {
  797. struct ClientList *client = cls;
  798. char *cbuf = buf;
  799. struct PendingMessage *reply;
  800. size_t off;
  801. size_t msize;
  802. client->transmit_handle = NULL;
  803. if (buf == NULL)
  804. {
  805. /* client disconnected */
  806. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  807. "Client %p disconnected, pending messages will be discarded\n",
  808. client->client_handle);
  809. return 0;
  810. }
  811. off = 0;
  812. while ((NULL != (reply = client->pending_head)) &&
  813. (size >= off + (msize = ntohs (reply->msg->size))))
  814. {
  815. GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
  816. reply);
  817. memcpy (&cbuf[off], reply->msg, msize);
  818. GNUNET_free (reply);
  819. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client %p\n",
  820. msize, client->client_handle);
  821. off += msize;
  822. }
  823. process_pending_messages (client);
  824. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client %p\n",
  825. (unsigned int) off, (unsigned int) size, client->client_handle);
  826. return off;
  827. }
  828. /**
  829. * Task run to check for messages that need to be sent to a client.
  830. *
  831. * @param client a ClientList, containing the client and any messages to be sent to it
  832. */
  833. static void
  834. process_pending_messages (struct ClientList *client)
  835. {
  836. if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
  837. {
  838. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  839. "Not asking for transmission to %p now: %s\n",
  840. client->client_handle,
  841. client->pending_head ==
  842. NULL ? "no more messages" : "request already pending");
  843. return;
  844. }
  845. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  846. "Asking for transmission of %u bytes to client %p\n",
  847. ntohs (client->pending_head->msg->size), client->client_handle);
  848. client->transmit_handle =
  849. GNUNET_SERVER_notify_transmit_ready (client->client_handle,
  850. ntohs (client->pending_head->
  851. msg->size),
  852. GNUNET_TIME_UNIT_FOREVER_REL,
  853. &send_reply_to_client, client);
  854. }
  855. /**
  856. * Closure for 'forward_reply'
  857. */
  858. struct ForwardReplyContext
  859. {
  860. /**
  861. * Actual message to send to matching clients.
  862. */
  863. struct PendingMessage *pm;
  864. /**
  865. * Embedded payload.
  866. */
  867. const void *data;
  868. /**
  869. * Type of the data.
  870. */
  871. enum GNUNET_BLOCK_Type type;
  872. /**
  873. * Number of bytes in data.
  874. */
  875. size_t data_size;
  876. /**
  877. * Do we need to copy 'pm' because it was already used?
  878. */
  879. int do_copy;
  880. };
  881. /**
  882. * Iterator over hash map entries that send a given reply to
  883. * each of the matching clients. With some tricky recycling
  884. * of the buffer.
  885. *
  886. * @param cls the 'struct ForwardReplyContext'
  887. * @param key current key
  888. * @param value value in the hash map, a ClientQueryRecord
  889. * @return GNUNET_YES (we should continue to iterate),
  890. * if the result is mal-formed, GNUNET_NO
  891. */
  892. static int
  893. forward_reply (void *cls, const struct GNUNET_HashCode * key, void *value)
  894. {
  895. struct ForwardReplyContext *frc = cls;
  896. struct ClientQueryRecord *record = value;
  897. struct PendingMessage *pm;
  898. struct GNUNET_DHT_ClientResultMessage *reply;
  899. enum GNUNET_BLOCK_EvaluationResult eval;
  900. int do_free;
  901. struct GNUNET_HashCode ch;
  902. unsigned int i;
  903. LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
  904. "R5N CLIENT-RESULT %s\n",
  905. GNUNET_h2s_full (key));
  906. if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type))
  907. {
  908. LOG (GNUNET_ERROR_TYPE_DEBUG,
  909. "Record type missmatch, not passing request for key %s to local client\n",
  910. GNUNET_h2s (key));
  911. GNUNET_STATISTICS_update (GDS_stats,
  912. gettext_noop
  913. ("# Key match, type mismatches in REPLY to CLIENT"),
  914. 1, GNUNET_NO);
  915. return GNUNET_YES; /* type mismatch */
  916. }
  917. GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch);
  918. for (i = 0; i < record->seen_replies_count; i++)
  919. if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (struct GNUNET_HashCode)))
  920. {
  921. LOG (GNUNET_ERROR_TYPE_DEBUG,
  922. "Duplicate reply, not passing request for key %s to local client\n",
  923. GNUNET_h2s (key));
  924. GNUNET_STATISTICS_update (GDS_stats,
  925. gettext_noop
  926. ("# Duplicate REPLIES to CLIENT request dropped"),
  927. 1, GNUNET_NO);
  928. return GNUNET_YES; /* duplicate */
  929. }
  930. eval =
  931. GNUNET_BLOCK_evaluate (GDS_block_context, record->type, key, NULL, 0,
  932. record->xquery, record->xquery_size, frc->data,
  933. frc->data_size);
  934. LOG (GNUNET_ERROR_TYPE_DEBUG,
  935. "Evaluation result is %d for key %s for local client's query\n",
  936. (int) eval, GNUNET_h2s (key));
  937. switch (eval)
  938. {
  939. case GNUNET_BLOCK_EVALUATION_OK_LAST:
  940. do_free = GNUNET_YES;
  941. break;
  942. case GNUNET_BLOCK_EVALUATION_OK_MORE:
  943. GNUNET_array_append (record->seen_replies, record->seen_replies_count, ch);
  944. do_free = GNUNET_NO;
  945. break;
  946. case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
  947. /* should be impossible to encounter here */
  948. GNUNET_break (0);
  949. return GNUNET_YES;
  950. case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
  951. GNUNET_break_op (0);
  952. return GNUNET_NO;
  953. case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
  954. GNUNET_break (0);
  955. return GNUNET_NO;
  956. case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
  957. GNUNET_break (0);
  958. return GNUNET_NO;
  959. case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
  960. return GNUNET_YES;
  961. case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
  962. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  963. _("Unsupported block type (%u) in request!\n"), record->type);
  964. return GNUNET_NO;
  965. default:
  966. GNUNET_break (0);
  967. return GNUNET_NO;
  968. }
  969. if (GNUNET_NO == frc->do_copy)
  970. {
  971. /* first time, we can use the original data */
  972. pm = frc->pm;
  973. frc->do_copy = GNUNET_YES;
  974. }
  975. else
  976. {
  977. /* two clients waiting for same reply, must copy for queueing */
  978. pm = GNUNET_malloc (sizeof (struct PendingMessage) +
  979. ntohs (frc->pm->msg->size));
  980. memcpy (pm, frc->pm,
  981. sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
  982. pm->next = pm->prev = NULL;
  983. pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
  984. }
  985. GNUNET_STATISTICS_update (GDS_stats,
  986. gettext_noop ("# RESULTS queued for clients"), 1,
  987. GNUNET_NO);
  988. reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1];
  989. reply->unique_id = record->unique_id;
  990. LOG (GNUNET_ERROR_TYPE_DEBUG,
  991. "Queueing reply to query %s for client %p\n",
  992. GNUNET_h2s (key),
  993. record->client->client_handle);
  994. add_pending_message (record->client, pm);
  995. if (GNUNET_YES == do_free)
  996. remove_client_records (record->client, key, record);
  997. return GNUNET_YES;
  998. }
  999. /**
  1000. * Handle a reply we've received from another peer. If the reply
  1001. * matches any of our pending queries, forward it to the respective
  1002. * client(s).
  1003. *
  1004. * @param expiration when will the reply expire
  1005. * @param key the query this reply is for
  1006. * @param get_path_length number of peers in @a get_path
  1007. * @param get_path path the reply took on get
  1008. * @param put_path_length number of peers in @a put_path
  1009. * @param put_path path the reply took on put
  1010. * @param type type of the reply
  1011. * @param data_size number of bytes in @a data
  1012. * @param data application payload data
  1013. */
  1014. void
  1015. GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
  1016. const struct GNUNET_HashCode *key,
  1017. unsigned int get_path_length,
  1018. const struct GNUNET_PeerIdentity *get_path,
  1019. unsigned int put_path_length,
  1020. const struct GNUNET_PeerIdentity *put_path,
  1021. enum GNUNET_BLOCK_Type type, size_t data_size,
  1022. const void *data)
  1023. {
  1024. struct ForwardReplyContext frc;
  1025. struct PendingMessage *pm;
  1026. struct GNUNET_DHT_ClientResultMessage *reply;
  1027. struct GNUNET_PeerIdentity *paths;
  1028. size_t msize;
  1029. LOG (GNUNET_ERROR_TYPE_DEBUG,
  1030. "reply for key %s\n",
  1031. GNUNET_h2s (key));
  1032. if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map, key))
  1033. {
  1034. GNUNET_STATISTICS_update (GDS_stats,
  1035. gettext_noop
  1036. ("# REPLIES ignored for CLIENTS (no match)"), 1,
  1037. GNUNET_NO);
  1038. return; /* no matching request, fast exit! */
  1039. }
  1040. msize =
  1041. sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size +
  1042. (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
  1043. if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
  1044. {
  1045. GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
  1046. _("Could not pass reply to client, message too big!\n"));
  1047. return;
  1048. }
  1049. pm = GNUNET_malloc (msize + sizeof (struct PendingMessage));
  1050. reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1];
  1051. pm->msg = &reply->header;
  1052. reply->header.size = htons ((uint16_t) msize);
  1053. reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
  1054. reply->type = htonl (type);
  1055. reply->get_path_length = htonl (get_path_length);
  1056. reply->put_path_length = htonl (put_path_length);
  1057. reply->unique_id = 0; /* filled in later */
  1058. reply->expiration = GNUNET_TIME_absolute_hton (expiration);
  1059. reply->key = *key;
  1060. paths = (struct GNUNET_PeerIdentity *) &reply[1];
  1061. memcpy (paths, put_path,
  1062. sizeof (struct GNUNET_PeerIdentity) * put_path_length);
  1063. memcpy (&paths[put_path_length], get_path,
  1064. sizeof (struct GNUNET_PeerIdentity) * get_path_length);
  1065. memcpy (&paths[get_path_length + put_path_length], data, data_size);
  1066. frc.do_copy = GNUNET_NO;
  1067. frc.pm = pm;
  1068. frc.data = data;
  1069. frc.data_size = data_size;
  1070. frc.type = type;
  1071. GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, key, &forward_reply,
  1072. &frc);
  1073. if (GNUNET_NO == frc.do_copy)
  1074. {
  1075. /* did not match any of the requests, free! */
  1076. GNUNET_STATISTICS_update (GDS_stats,
  1077. gettext_noop
  1078. ("# REPLIES ignored for CLIENTS (no match)"), 1,
  1079. GNUNET_NO);
  1080. GNUNET_free (pm);
  1081. }
  1082. }
  1083. /**
  1084. * Check if some client is monitoring GET messages and notify
  1085. * them in that case.
  1086. *
  1087. * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
  1088. * @param type The type of data in the request.
  1089. * @param hop_count Hop count so far.
  1090. * @param path_length number of entries in path (or 0 if not recorded).
  1091. * @param path peers on the GET path (or NULL if not recorded).
  1092. * @param desired_replication_level Desired replication level.
  1093. * @param key Key of the requested data.
  1094. */
  1095. void
  1096. GDS_CLIENTS_process_get (uint32_t options,
  1097. enum GNUNET_BLOCK_Type type,
  1098. uint32_t hop_count,
  1099. uint32_t desired_replication_level,
  1100. unsigned int path_length,
  1101. const struct GNUNET_PeerIdentity *path,
  1102. const struct GNUNET_HashCode * key)
  1103. {
  1104. struct ClientMonitorRecord *m;
  1105. struct ClientList **cl;
  1106. unsigned int cl_size;
  1107. cl = NULL;
  1108. cl_size = 0;
  1109. for (m = monitor_head; NULL != m; m = m->next)
  1110. {
  1111. if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
  1112. (NULL == m->key ||
  1113. memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
  1114. {
  1115. struct PendingMessage *pm;
  1116. struct GNUNET_DHT_MonitorGetMessage *mmsg;
  1117. struct GNUNET_PeerIdentity *msg_path;
  1118. size_t msize;
  1119. unsigned int i;
  1120. /* Don't send duplicates */
  1121. for (i = 0; i < cl_size; i++)
  1122. if (cl[i] == m->client)
  1123. break;
  1124. if (i < cl_size)
  1125. continue;
  1126. GNUNET_array_append (cl, cl_size, m->client);
  1127. msize = path_length * sizeof (struct GNUNET_PeerIdentity);
  1128. msize += sizeof (struct GNUNET_DHT_MonitorGetMessage);
  1129. msize += sizeof (struct PendingMessage);
  1130. pm = GNUNET_malloc (msize);
  1131. mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1];
  1132. pm->msg = &mmsg->header;
  1133. mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
  1134. mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
  1135. mmsg->options = htonl(options);
  1136. mmsg->type = htonl(type);
  1137. mmsg->hop_count = htonl(hop_count);
  1138. mmsg->desired_replication_level = htonl(desired_replication_level);
  1139. mmsg->get_path_length = htonl(path_length);
  1140. memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode));
  1141. msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
  1142. if (path_length > 0)
  1143. memcpy (msg_path, path,
  1144. path_length * sizeof (struct GNUNET_PeerIdentity));
  1145. add_pending_message (m->client, pm);
  1146. }
  1147. }
  1148. GNUNET_free_non_null (cl);
  1149. }
  1150. /**
  1151. * Check if some client is monitoring GET RESP messages and notify
  1152. * them in that case.
  1153. *
  1154. * @param type The type of data in the result.
  1155. * @param get_path Peers on GET path (or NULL if not recorded).
  1156. * @param get_path_length number of entries in get_path.
  1157. * @param put_path peers on the PUT path (or NULL if not recorded).
  1158. * @param put_path_length number of entries in get_path.
  1159. * @param exp Expiration time of the data.
  1160. * @param key Key of the data.
  1161. * @param data Pointer to the result data.
  1162. * @param size Number of bytes in data.
  1163. */
  1164. void
  1165. GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
  1166. const struct GNUNET_PeerIdentity *get_path,
  1167. unsigned int get_path_length,
  1168. const struct GNUNET_PeerIdentity *put_path,
  1169. unsigned int put_path_length,
  1170. struct GNUNET_TIME_Absolute exp,
  1171. const struct GNUNET_HashCode * key,
  1172. const void *data,
  1173. size_t size)
  1174. {
  1175. struct ClientMonitorRecord *m;
  1176. struct ClientList **cl;
  1177. unsigned int cl_size;
  1178. cl = NULL;
  1179. cl_size = 0;
  1180. for (m = monitor_head; NULL != m; m = m->next)
  1181. {
  1182. if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
  1183. (NULL == m->key ||
  1184. memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
  1185. {
  1186. struct PendingMessage *pm;
  1187. struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
  1188. struct GNUNET_PeerIdentity *path;
  1189. size_t msize;
  1190. unsigned int i;
  1191. /* Don't send duplicates */
  1192. for (i = 0; i < cl_size; i++)
  1193. if (cl[i] == m->client)
  1194. break;
  1195. if (i < cl_size)
  1196. continue;
  1197. GNUNET_array_append (cl, cl_size, m->client);
  1198. msize = size;
  1199. msize += (get_path_length + put_path_length)
  1200. * sizeof (struct GNUNET_PeerIdentity);
  1201. msize += sizeof (struct GNUNET_DHT_MonitorGetRespMessage);
  1202. msize += sizeof (struct PendingMessage);
  1203. pm = GNUNET_malloc (msize);
  1204. mmsg = (struct GNUNET_DHT_MonitorGetRespMessage *) &pm[1];
  1205. pm->msg = (struct GNUNET_MessageHeader *) mmsg;
  1206. mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
  1207. mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
  1208. mmsg->type = htonl(type);
  1209. mmsg->put_path_length = htonl(put_path_length);
  1210. mmsg->get_path_length = htonl(get_path_length);
  1211. path = (struct GNUNET_PeerIdentity *) &mmsg[1];
  1212. if (put_path_length > 0)
  1213. {
  1214. memcpy (path, put_path,
  1215. put_path_length * sizeof (struct GNUNET_PeerIdentity));
  1216. path = &path[put_path_length];
  1217. }
  1218. if (get_path_length > 0)
  1219. memcpy (path, get_path,
  1220. get_path_length * sizeof (struct GNUNET_PeerIdentity));
  1221. mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
  1222. memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode));
  1223. if (size > 0)
  1224. memcpy (&path[get_path_length], data, size);
  1225. add_pending_message (m->client, pm);
  1226. }
  1227. }
  1228. GNUNET_free_non_null (cl);
  1229. }
  1230. /**
  1231. * Check if some client is monitoring PUT messages and notify
  1232. * them in that case.
  1233. *
  1234. * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
  1235. * @param type The type of data in the request.
  1236. * @param hop_count Hop count so far.
  1237. * @param path_length number of entries in path (or 0 if not recorded).
  1238. * @param path peers on the PUT path (or NULL if not recorded).
  1239. * @param desired_replication_level Desired replication level.
  1240. * @param exp Expiration time of the data.
  1241. * @param key Key under which data is to be stored.
  1242. * @param data Pointer to the data carried.
  1243. * @param size Number of bytes in data.
  1244. */
  1245. void
  1246. GDS_CLIENTS_process_put (uint32_t options,
  1247. enum GNUNET_BLOCK_Type type,
  1248. uint32_t hop_count,
  1249. uint32_t desired_replication_level,
  1250. unsigned int path_length,
  1251. const struct GNUNET_PeerIdentity *path,
  1252. struct GNUNET_TIME_Absolute exp,
  1253. const struct GNUNET_HashCode * key,
  1254. const void *data,
  1255. size_t size)
  1256. {
  1257. struct ClientMonitorRecord *m;
  1258. struct ClientList **cl;
  1259. unsigned int cl_size;
  1260. cl = NULL;
  1261. cl_size = 0;
  1262. for (m = monitor_head; NULL != m; m = m->next)
  1263. {
  1264. if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
  1265. (NULL == m->key ||
  1266. memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
  1267. {
  1268. struct PendingMessage *pm;
  1269. struct GNUNET_DHT_MonitorPutMessage *mmsg;
  1270. struct GNUNET_PeerIdentity *msg_path;
  1271. size_t msize;
  1272. unsigned int i;
  1273. /* Don't send duplicates */
  1274. for (i = 0; i < cl_size; i++)
  1275. if (cl[i] == m->client)
  1276. break;
  1277. if (i < cl_size)
  1278. continue;
  1279. GNUNET_array_append (cl, cl_size, m->client);
  1280. msize = size;
  1281. msize += path_length * sizeof (struct GNUNET_PeerIdentity);
  1282. msize += sizeof (struct GNUNET_DHT_MonitorPutMessage);
  1283. msize += sizeof (struct PendingMessage);
  1284. pm = GNUNET_malloc (msize);
  1285. mmsg = (struct GNUNET_DHT_MonitorPutMessage *) &pm[1];
  1286. pm->msg = (struct GNUNET_MessageHeader *) mmsg;
  1287. mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
  1288. mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
  1289. mmsg->options = htonl(options);
  1290. mmsg->type = htonl(type);
  1291. mmsg->hop_count = htonl(hop_count);
  1292. mmsg->desired_replication_level = htonl(desired_replication_level);
  1293. mmsg->put_path_length = htonl(path_length);
  1294. msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
  1295. if (path_length > 0)
  1296. {
  1297. memcpy (msg_path, path,
  1298. path_length * sizeof (struct GNUNET_PeerIdentity));
  1299. }
  1300. mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
  1301. memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode));
  1302. if (size > 0)
  1303. memcpy (&msg_path[path_length], data, size);
  1304. add_pending_message (m->client, pm);
  1305. }
  1306. }
  1307. GNUNET_free_non_null (cl);
  1308. }
  1309. /**
  1310. * Initialize client subsystem.
  1311. *
  1312. * @param server the initialized server
  1313. */
  1314. void
  1315. GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
  1316. {
  1317. static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
  1318. {&handle_dht_local_put, NULL,
  1319. GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0},
  1320. {&handle_dht_local_get, NULL,
  1321. GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0},
  1322. {&handle_dht_local_get_stop, NULL,
  1323. GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP,
  1324. sizeof (struct GNUNET_DHT_ClientGetStopMessage)},
  1325. {&handle_dht_local_monitor, NULL,
  1326. GNUNET_MESSAGE_TYPE_DHT_MONITOR_START,
  1327. sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
  1328. {&handle_dht_local_monitor_stop, NULL,
  1329. GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP,
  1330. sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
  1331. {&handle_dht_local_get_result_seen, NULL,
  1332. GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, 0},
  1333. {NULL, NULL, 0, 0}
  1334. };
  1335. forward_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO);
  1336. retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
  1337. GNUNET_SERVER_add_handlers (server, plugin_handlers);
  1338. GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
  1339. }
  1340. /**
  1341. * Shutdown client subsystem.
  1342. */
  1343. void
  1344. GDS_CLIENTS_done ()
  1345. {
  1346. GNUNET_assert (client_head == NULL);
  1347. GNUNET_assert (client_tail == NULL);
  1348. if (GNUNET_SCHEDULER_NO_TASK != retry_task)
  1349. {
  1350. GNUNET_SCHEDULER_cancel (retry_task);
  1351. retry_task = GNUNET_SCHEDULER_NO_TASK;
  1352. }
  1353. if (NULL != retry_heap)
  1354. {
  1355. GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
  1356. GNUNET_CONTAINER_heap_destroy (retry_heap);
  1357. retry_heap = NULL;
  1358. }
  1359. if (NULL != forward_map)
  1360. {
  1361. GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
  1362. GNUNET_CONTAINER_multihashmap_destroy (forward_map);
  1363. forward_map = NULL;
  1364. }
  1365. }
  1366. /* end of gnunet-service-dht_clients.c */