plugin_datastore_postgres.c 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979
  1. /*
  2. This file is part of GNUnet
  3. Copyright (C) 2009-2017 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file datastore/plugin_datastore_postgres.c
  18. * @brief postgres-based datastore backend
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet_datastore_plugin.h"
  23. #include "gnunet_pq_lib.h"
  /**
   * How long (in ms) a database operation may remain "busy" before it
   * is considered to have failed for good.
   *
   * Smaller values keep the plugin responsive to incoming requests
   * (PUTs in particular); larger values raise the odds that an
   * operation eventually succeeds (a SELECT during iteration can take
   * several seconds even with LIMIT=1).
   *
   * One second is the chosen compromise: latencies stay tolerable for
   * users while operations still succeed with reasonable probability.
   *
   * NOTE(review): nothing in the visible part of this file references
   * this macro.
   */
  #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
  35. /**
  36. * Context for all functions in this plugin.
  37. */
  38. struct Plugin
  39. {
  40. /**
  41. * Our execution environment.
  42. */
  43. struct GNUNET_DATASTORE_PluginEnvironment *env;
  44. /**
  45. * Native Postgres database handle.
  46. */
  47. PGconn *dbh;
  48. };
  49. /**
  50. * @brief Get a database handle
  51. *
  52. * @param plugin global context
  53. * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
  54. */
  55. static int
  56. init_connection (struct Plugin *plugin)
  57. {
  58. struct GNUNET_PQ_ExecuteStatement es[] = {
  59. /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
  60. * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
  61. * we do math or inequality tests, so we can't handle the entire range of uint32_t.
  62. * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
  63. * PostgreSQL also recommends against using WITH OIDS.
  64. */
  65. GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS gn090 ("
  66. " repl INTEGER NOT NULL DEFAULT 0,"
  67. " type INTEGER NOT NULL DEFAULT 0,"
  68. " prio INTEGER NOT NULL DEFAULT 0,"
  69. " anonLevel INTEGER NOT NULL DEFAULT 0,"
  70. " expire BIGINT NOT NULL DEFAULT 0,"
  71. " rvalue BIGINT NOT NULL DEFAULT 0,"
  72. " hash BYTEA NOT NULL DEFAULT '',"
  73. " vhash BYTEA NOT NULL DEFAULT '',"
  74. " value BYTEA NOT NULL DEFAULT '')"
  75. "WITH OIDS"),
  76. GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)"),
  77. GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)"),
  78. GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)"),
  79. GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)"),
  80. GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)"),
  81. GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)"),
  82. GNUNET_PQ_make_try_execute ("CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"),
  83. GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"),
  84. GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN"),
  85. GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN"),
  86. GNUNET_PQ_EXECUTE_STATEMENT_END
  87. };
  88. #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
  89. struct GNUNET_PQ_PreparedStatement ps[] = {
  90. GNUNET_PQ_make_prepare ("get",
  91. "SELECT " RESULT_COLUMNS " FROM gn090"
  92. " WHERE oid >= $1::bigint AND"
  93. " (rvalue >= $2 OR 0 = $3::smallint) AND"
  94. " (hash = $4 OR 0 = $5::smallint) AND"
  95. " (type = $6 OR 0 = $7::smallint)"
  96. " ORDER BY oid ASC LIMIT 1",
  97. 7),
  98. GNUNET_PQ_make_prepare ("put",
  99. "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
  100. "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
  101. 9),
  102. GNUNET_PQ_make_prepare ("update",
  103. "UPDATE gn090"
  104. " SET prio = prio + $1,"
  105. " repl = repl + $2,"
  106. " expire = GREATEST(expire, $3)"
  107. " WHERE hash = $4 AND vhash = $5",
  108. 5),
  109. GNUNET_PQ_make_prepare ("decrepl",
  110. "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
  111. "WHERE oid = $1",
  112. 1),
  113. GNUNET_PQ_make_prepare ("select_non_anonymous",
  114. "SELECT " RESULT_COLUMNS " FROM gn090 "
  115. "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
  116. "ORDER BY oid ASC LIMIT 1",
  117. 2),
  118. GNUNET_PQ_make_prepare ("select_expiration_order",
  119. "(SELECT " RESULT_COLUMNS " FROM gn090 "
  120. "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
  121. "UNION "
  122. "(SELECT " RESULT_COLUMNS " FROM gn090 "
  123. "ORDER BY prio ASC LIMIT 1) "
  124. "ORDER BY expire ASC LIMIT 1",
  125. 1),
  126. GNUNET_PQ_make_prepare ("select_replication_order",
  127. "SELECT " RESULT_COLUMNS " FROM gn090 "
  128. "ORDER BY repl DESC,RANDOM() LIMIT 1",
  129. 0),
  130. GNUNET_PQ_make_prepare ("delrow",
  131. "DELETE FROM gn090 "
  132. "WHERE oid=$1",
  133. 1),
  134. GNUNET_PQ_make_prepare ("remove",
  135. "DELETE FROM gn090"
  136. " WHERE hash = $1 AND"
  137. " value = $2",
  138. 2),
  139. GNUNET_PQ_make_prepare ("get_keys",
  140. "SELECT hash FROM gn090",
  141. 0),
  142. GNUNET_PQ_make_prepare ("estimate_size",
  143. "SELECT CASE WHEN NOT EXISTS"
  144. " (SELECT 1 FROM gn090)"
  145. " THEN 0"
  146. " ELSE (SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090)"
  147. "END AS total",
  148. 0),
  149. GNUNET_PQ_PREPARED_STATEMENT_END
  150. };
  151. #undef RESULT_COLUMNS
  152. plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
  153. "datastore-postgres");
  154. if (NULL == plugin->dbh)
  155. return GNUNET_SYSERR;
  156. if ( (GNUNET_OK !=
  157. GNUNET_PQ_exec_statements (plugin->dbh,
  158. es)) ||
  159. (GNUNET_OK !=
  160. GNUNET_PQ_prepare_statements (plugin->dbh,
  161. ps)) )
  162. {
  163. PQfinish (plugin->dbh);
  164. plugin->dbh = NULL;
  165. return GNUNET_SYSERR;
  166. }
  167. return GNUNET_OK;
  168. }
  169. /**
  170. * Get an estimate of how much space the database is
  171. * currently using.
  172. *
  173. * @param cls our `struct Plugin *`
  174. * @return number of bytes used on disk
  175. */
  176. static void
  177. postgres_plugin_estimate_size (void *cls,
  178. unsigned long long *estimate)
  179. {
  180. struct Plugin *plugin = cls;
  181. uint64_t total;
  182. struct GNUNET_PQ_QueryParam params[] = {
  183. GNUNET_PQ_query_param_end
  184. };
  185. struct GNUNET_PQ_ResultSpec rs[] = {
  186. GNUNET_PQ_result_spec_uint64 ("total",
  187. &total),
  188. GNUNET_PQ_result_spec_end
  189. };
  190. enum GNUNET_DB_QueryStatus ret;
  191. if (NULL == estimate)
  192. return;
  193. ret = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
  194. "estimate_size",
  195. params,
  196. rs);
  197. if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != ret)
  198. {
  199. *estimate = 0LL;
  200. return;
  201. }
  202. *estimate = total;
  203. }
  204. /**
  205. * Store an item in the datastore.
  206. *
  207. * @param cls closure with the `struct Plugin`
  208. * @param key key for the item
  209. * @param absent true if the key was not found in the bloom filter
  210. * @param size number of bytes in data
  211. * @param data content stored
  212. * @param type type of the content
  213. * @param priority priority of the content
  214. * @param anonymity anonymity-level for the content
  215. * @param replication replication-level for the content
  216. * @param expiration expiration time for the content
  217. * @param cont continuation called with success or failure status
  218. * @param cont_cls continuation closure
  219. */
  220. static void
  221. postgres_plugin_put (void *cls,
  222. const struct GNUNET_HashCode *key,
  223. bool absent,
  224. uint32_t size,
  225. const void *data,
  226. enum GNUNET_BLOCK_Type type,
  227. uint32_t priority,
  228. uint32_t anonymity,
  229. uint32_t replication,
  230. struct GNUNET_TIME_Absolute expiration,
  231. PluginPutCont cont,
  232. void *cont_cls)
  233. {
  234. struct Plugin *plugin = cls;
  235. struct GNUNET_HashCode vhash;
  236. enum GNUNET_DB_QueryStatus ret;
  237. GNUNET_CRYPTO_hash (data,
  238. size,
  239. &vhash);
  240. if (! absent)
  241. {
  242. struct GNUNET_PQ_QueryParam params[] = {
  243. GNUNET_PQ_query_param_uint32 (&priority),
  244. GNUNET_PQ_query_param_uint32 (&replication),
  245. GNUNET_PQ_query_param_absolute_time (&expiration),
  246. GNUNET_PQ_query_param_auto_from_type (key),
  247. GNUNET_PQ_query_param_auto_from_type (&vhash),
  248. GNUNET_PQ_query_param_end
  249. };
  250. ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
  251. "update",
  252. params);
  253. if (0 > ret)
  254. {
  255. cont (cont_cls,
  256. key,
  257. size,
  258. GNUNET_SYSERR,
  259. _("Postgress exec failure"));
  260. return;
  261. }
  262. bool affected = (0 != ret);
  263. if (affected)
  264. {
  265. cont (cont_cls,
  266. key,
  267. size,
  268. GNUNET_NO,
  269. NULL);
  270. return;
  271. }
  272. }
  273. {
  274. uint32_t utype = (uint32_t) type;
  275. uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
  276. UINT64_MAX);
  277. struct GNUNET_PQ_QueryParam params[] = {
  278. GNUNET_PQ_query_param_uint32 (&replication),
  279. GNUNET_PQ_query_param_uint32 (&utype),
  280. GNUNET_PQ_query_param_uint32 (&priority),
  281. GNUNET_PQ_query_param_uint32 (&anonymity),
  282. GNUNET_PQ_query_param_absolute_time (&expiration),
  283. GNUNET_PQ_query_param_uint64 (&rvalue),
  284. GNUNET_PQ_query_param_auto_from_type (key),
  285. GNUNET_PQ_query_param_auto_from_type (&vhash),
  286. GNUNET_PQ_query_param_fixed_size (data, size),
  287. GNUNET_PQ_query_param_end
  288. };
  289. ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
  290. "put",
  291. params);
  292. if (0 > ret)
  293. {
  294. cont (cont_cls,
  295. key,
  296. size,
  297. GNUNET_SYSERR,
  298. "Postgress exec failure");
  299. return;
  300. }
  301. }
  302. plugin->env->duc (plugin->env->cls,
  303. size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
  304. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
  305. "datastore-postgres",
  306. "Stored %u bytes in database\n",
  307. (unsigned int) size);
  308. cont (cont_cls,
  309. key,
  310. size,
  311. GNUNET_OK,
  312. NULL);
  313. }
  314. /**
  315. * Closure for #process_result.
  316. */
  317. struct ProcessResultContext
  318. {
  319. /**
  320. * The plugin handle.
  321. */
  322. struct Plugin *plugin;
  323. /**
  324. * Function to call on each result.
  325. */
  326. PluginDatumProcessor proc;
  327. /**
  328. * Closure for @e proc.
  329. */
  330. void *proc_cls;
  331. };
  332. /**
  333. * Function invoked to process the result and call the processor of @a
  334. * cls.
  335. *
  336. * @param cls our `struct ProcessResultContext`
  337. * @param res result from exec
  338. * @param num_results number of results in @a res
  339. */
  340. static void
  341. process_result (void *cls,
  342. PGresult *res,
  343. unsigned int num_results)
  344. {
  345. struct ProcessResultContext *prc = cls;
  346. struct Plugin *plugin = prc->plugin;
  347. if (0 == num_results)
  348. {
  349. /* no result */
  350. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
  351. "datastore-postgres",
  352. "Ending iteration (no more results)\n");
  353. prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
  354. GNUNET_TIME_UNIT_ZERO_ABS, 0);
  355. return;
  356. }
  357. if (1 != num_results)
  358. {
  359. GNUNET_break (0);
  360. prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
  361. GNUNET_TIME_UNIT_ZERO_ABS, 0);
  362. return;
  363. }
  364. /* Technically we don't need the loop here, but nicer in case
  365. we ever relax the condition above. */
  366. for (unsigned int i=0;i<num_results;i++)
  367. {
  368. int iret;
  369. uint32_t rowid;
  370. uint32_t utype;
  371. uint32_t anonymity;
  372. uint32_t replication;
  373. uint32_t priority;
  374. size_t size;
  375. void *data;
  376. struct GNUNET_TIME_Absolute expiration_time;
  377. struct GNUNET_HashCode key;
  378. struct GNUNET_PQ_ResultSpec rs[] = {
  379. GNUNET_PQ_result_spec_uint32 ("repl", &replication),
  380. GNUNET_PQ_result_spec_uint32 ("type", &utype),
  381. GNUNET_PQ_result_spec_uint32 ("prio", &priority),
  382. GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
  383. GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
  384. GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
  385. GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
  386. GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
  387. GNUNET_PQ_result_spec_end
  388. };
  389. if (GNUNET_OK !=
  390. GNUNET_PQ_extract_result (res,
  391. rs,
  392. i))
  393. {
  394. GNUNET_break (0);
  395. prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
  396. GNUNET_TIME_UNIT_ZERO_ABS, 0);
  397. return;
  398. }
  399. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
  400. "datastore-postgres",
  401. "Found result of size %u bytes and type %u in database\n",
  402. (unsigned int) size,
  403. (unsigned int) utype);
  404. iret = prc->proc (prc->proc_cls,
  405. &key,
  406. size,
  407. data,
  408. (enum GNUNET_BLOCK_Type) utype,
  409. priority,
  410. anonymity,
  411. replication,
  412. expiration_time,
  413. rowid);
  414. if (iret == GNUNET_NO)
  415. {
  416. struct GNUNET_PQ_QueryParam param[] = {
  417. GNUNET_PQ_query_param_uint32 (&rowid),
  418. GNUNET_PQ_query_param_end
  419. };
  420. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  421. "Processor asked for item %u to be removed.\n",
  422. (unsigned int) rowid);
  423. if (0 <
  424. GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
  425. "delrow",
  426. param))
  427. {
  428. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
  429. "datastore-postgres",
  430. "Deleting %u bytes from database\n",
  431. (unsigned int) size);
  432. plugin->env->duc (plugin->env->cls,
  433. - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
  434. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
  435. "datastore-postgres",
  436. "Deleted %u bytes from database\n",
  437. (unsigned int) size);
  438. }
  439. }
  440. GNUNET_PQ_cleanup_result (rs);
  441. } /* for (i) */
  442. }
  443. /**
  444. * Get one of the results for a particular key in the datastore.
  445. *
  446. * @param cls closure with the `struct Plugin`
  447. * @param next_uid return the result with lowest uid >= next_uid
  448. * @param random if true, return a random result instead of using next_uid
  449. * @param key maybe NULL (to match all entries)
  450. * @param type entries of which type are relevant?
  451. * Use 0 for any type.
  452. * @param proc function to call on the matching value;
  453. * will be called with NULL if nothing matches
  454. * @param proc_cls closure for @a proc
  455. */
  456. static void
  457. postgres_plugin_get_key (void *cls,
  458. uint64_t next_uid,
  459. bool random,
  460. const struct GNUNET_HashCode *key,
  461. enum GNUNET_BLOCK_Type type,
  462. PluginDatumProcessor proc,
  463. void *proc_cls)
  464. {
  465. struct Plugin *plugin = cls;
  466. uint32_t utype = type;
  467. uint16_t use_rvalue = random;
  468. uint16_t use_key = NULL != key;
  469. uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
  470. uint64_t rvalue;
  471. struct GNUNET_PQ_QueryParam params[] = {
  472. GNUNET_PQ_query_param_uint64 (&next_uid),
  473. GNUNET_PQ_query_param_uint64 (&rvalue),
  474. GNUNET_PQ_query_param_uint16 (&use_rvalue),
  475. GNUNET_PQ_query_param_auto_from_type (key),
  476. GNUNET_PQ_query_param_uint16 (&use_key),
  477. GNUNET_PQ_query_param_uint32 (&utype),
  478. GNUNET_PQ_query_param_uint16 (&use_type),
  479. GNUNET_PQ_query_param_end
  480. };
  481. struct ProcessResultContext prc;
  482. enum GNUNET_DB_QueryStatus res;
  483. if (random)
  484. {
  485. rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
  486. UINT64_MAX);
  487. next_uid = 0;
  488. }
  489. else
  490. {
  491. rvalue = 0;
  492. }
  493. prc.plugin = plugin;
  494. prc.proc = proc;
  495. prc.proc_cls = proc_cls;
  496. res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
  497. "get",
  498. params,
  499. &process_result,
  500. &prc);
  501. if (0 > res)
  502. proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
  503. GNUNET_TIME_UNIT_ZERO_ABS, 0);
  504. }
  505. /**
  506. * Select a subset of the items in the datastore and call
  507. * the given iterator for each of them.
  508. *
  509. * @param cls our `struct Plugin *`
  510. * @param next_uid return the result with lowest uid >= next_uid
  511. * @param type entries of which type should be considered?
  512. * Must not be zero (ANY).
  513. * @param proc function to call on the matching value;
  514. * will be called with NULL if no value matches
  515. * @param proc_cls closure for @a proc
  516. */
  517. static void
  518. postgres_plugin_get_zero_anonymity (void *cls,
  519. uint64_t next_uid,
  520. enum GNUNET_BLOCK_Type type,
  521. PluginDatumProcessor proc,
  522. void *proc_cls)
  523. {
  524. struct Plugin *plugin = cls;
  525. uint32_t utype = type;
  526. struct GNUNET_PQ_QueryParam params[] = {
  527. GNUNET_PQ_query_param_uint32 (&utype),
  528. GNUNET_PQ_query_param_uint64 (&next_uid),
  529. GNUNET_PQ_query_param_end
  530. };
  531. struct ProcessResultContext prc;
  532. enum GNUNET_DB_QueryStatus res;
  533. prc.plugin = plugin;
  534. prc.proc = proc;
  535. prc.proc_cls = proc_cls;
  536. res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
  537. "select_non_anonymous",
  538. params,
  539. &process_result,
  540. &prc);
  541. if (0 > res)
  542. proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
  543. GNUNET_TIME_UNIT_ZERO_ABS, 0);
  544. }
  545. /**
  546. * Context for #repl_iter() function.
  547. */
  548. struct ReplCtx
  549. {
  550. /**
  551. * Plugin handle.
  552. */
  553. struct Plugin *plugin;
  554. /**
  555. * Function to call for the result (or the NULL).
  556. */
  557. PluginDatumProcessor proc;
  558. /**
  559. * Closure for @e proc.
  560. */
  561. void *proc_cls;
  562. };
  563. /**
  564. * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
  565. * Decrements the replication counter and calls the original
  566. * iterator.
  567. *
  568. * @param cls closure with the `struct ReplCtx *`
  569. * @param key key for the content
  570. * @param size number of bytes in @a data
  571. * @param data content stored
  572. * @param type type of the content
  573. * @param priority priority of the content
  574. * @param anonymity anonymity-level for the content
  575. * @param replication replication-level for the content
  576. * @param expiration expiration time for the content
  577. * @param uid unique identifier for the datum;
  578. * maybe 0 if no unique identifier is available
  579. * @return #GNUNET_SYSERR to abort the iteration,
  580. * #GNUNET_OK to continue
  581. * (continue on call to "next", of course),
  582. * #GNUNET_NO to delete the item and continue (if supported)
  583. */
  584. static int
  585. repl_proc (void *cls,
  586. const struct GNUNET_HashCode *key,
  587. uint32_t size,
  588. const void *data,
  589. enum GNUNET_BLOCK_Type type,
  590. uint32_t priority,
  591. uint32_t anonymity,
  592. uint32_t replication,
  593. struct GNUNET_TIME_Absolute expiration,
  594. uint64_t uid)
  595. {
  596. struct ReplCtx *rc = cls;
  597. struct Plugin *plugin = rc->plugin;
  598. int ret;
  599. uint32_t oid = (uint32_t) uid;
  600. struct GNUNET_PQ_QueryParam params[] = {
  601. GNUNET_PQ_query_param_uint32 (&oid),
  602. GNUNET_PQ_query_param_end
  603. };
  604. enum GNUNET_DB_QueryStatus qret;
  605. ret = rc->proc (rc->proc_cls,
  606. key,
  607. size,
  608. data,
  609. type,
  610. priority,
  611. anonymity,
  612. replication,
  613. expiration,
  614. uid);
  615. if (NULL == key)
  616. return ret;
  617. qret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
  618. "decrepl",
  619. params);
  620. if (0 > qret)
  621. return GNUNET_SYSERR;
  622. return ret;
  623. }
  624. /**
  625. * Get a random item for replication. Returns a single, not expired,
  626. * random item from those with the highest replication counters. The
  627. * item's replication counter is decremented by one IF it was positive
  628. * before. Call @a proc with all values ZERO or NULL if the datastore
  629. * is empty.
  630. *
  631. * @param cls closure with the `struct Plugin`
  632. * @param proc function to call the value (once only).
  633. * @param proc_cls closure for @a proc
  634. */
  635. static void
  636. postgres_plugin_get_replication (void *cls,
  637. PluginDatumProcessor proc,
  638. void *proc_cls)
  639. {
  640. struct Plugin *plugin = cls;
  641. struct GNUNET_PQ_QueryParam params[] = {
  642. GNUNET_PQ_query_param_end
  643. };
  644. struct ReplCtx rc;
  645. struct ProcessResultContext prc;
  646. enum GNUNET_DB_QueryStatus res;
  647. rc.plugin = plugin;
  648. rc.proc = proc;
  649. rc.proc_cls = proc_cls;
  650. prc.plugin = plugin;
  651. prc.proc = &repl_proc;
  652. prc.proc_cls = &rc;
  653. res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
  654. "select_replication_order",
  655. params,
  656. &process_result,
  657. &prc);
  658. if (0 > res)
  659. proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
  660. GNUNET_TIME_UNIT_ZERO_ABS, 0);
  661. }
  662. /**
  663. * Get a random item for expiration. Call @a proc with all values
  664. * ZERO or NULL if the datastore is empty.
  665. *
  666. * @param cls closure with the `struct Plugin`
  667. * @param proc function to call the value (once only).
  668. * @param proc_cls closure for @a proc
  669. */
  670. static void
  671. postgres_plugin_get_expiration (void *cls,
  672. PluginDatumProcessor proc,
  673. void *proc_cls)
  674. {
  675. struct Plugin *plugin = cls;
  676. struct GNUNET_TIME_Absolute now;
  677. struct GNUNET_PQ_QueryParam params[] = {
  678. GNUNET_PQ_query_param_absolute_time (&now),
  679. GNUNET_PQ_query_param_end
  680. };
  681. struct ProcessResultContext prc;
  682. now = GNUNET_TIME_absolute_get ();
  683. prc.plugin = plugin;
  684. prc.proc = proc;
  685. prc.proc_cls = proc_cls;
  686. (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
  687. "select_expiration_order",
  688. params,
  689. &process_result,
  690. &prc);
  691. }
  692. /**
  693. * Closure for #process_keys.
  694. */
  695. struct ProcessKeysContext
  696. {
  697. /**
  698. * Function to call for each key.
  699. */
  700. PluginKeyProcessor proc;
  701. /**
  702. * Closure for @e proc.
  703. */
  704. void *proc_cls;
  705. };
  706. /**
  707. * Function to be called with the results of a SELECT statement
  708. * that has returned @a num_results results.
  709. *
  710. * @param cls closure with a `struct ProcessKeysContext`
  711. * @param result the postgres result
  712. * @param num_result the number of results in @a result
  713. */
  714. static void
  715. process_keys (void *cls,
  716. PGresult *result,
  717. unsigned int num_results)
  718. {
  719. struct ProcessKeysContext *pkc = cls;
  720. for (unsigned i=0;i<num_results;i++)
  721. {
  722. struct GNUNET_HashCode key;
  723. struct GNUNET_PQ_ResultSpec rs[] = {
  724. GNUNET_PQ_result_spec_auto_from_type ("hash",
  725. &key),
  726. GNUNET_PQ_result_spec_end
  727. };
  728. if (GNUNET_OK !=
  729. GNUNET_PQ_extract_result (result,
  730. rs,
  731. i))
  732. {
  733. GNUNET_break (0);
  734. continue;
  735. }
  736. pkc->proc (pkc->proc_cls,
  737. &key,
  738. 1);
  739. GNUNET_PQ_cleanup_result (rs);
  740. }
  741. }
  742. /**
  743. * Get all of the keys in the datastore.
  744. *
  745. * @param cls closure with the `struct Plugin *`
  746. * @param proc function to call on each key
  747. * @param proc_cls closure for @a proc
  748. */
  749. static void
  750. postgres_plugin_get_keys (void *cls,
  751. PluginKeyProcessor proc,
  752. void *proc_cls)
  753. {
  754. struct Plugin *plugin = cls;
  755. struct GNUNET_PQ_QueryParam params[] = {
  756. GNUNET_PQ_query_param_end
  757. };
  758. struct ProcessKeysContext pkc;
  759. pkc.proc = proc;
  760. pkc.proc_cls = proc_cls;
  761. (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
  762. "get_keys",
  763. params,
  764. &process_keys,
  765. &pkc);
  766. proc (proc_cls,
  767. NULL,
  768. 0);
  769. }
  770. /**
  771. * Drop database.
  772. *
  773. * @param cls closure with the `struct Plugin *`
  774. */
  775. static void
  776. postgres_plugin_drop (void *cls)
  777. {
  778. struct Plugin *plugin = cls;
  779. struct GNUNET_PQ_ExecuteStatement es[] = {
  780. GNUNET_PQ_make_execute ("DROP TABLE gn090"),
  781. GNUNET_PQ_EXECUTE_STATEMENT_END
  782. };
  783. if (GNUNET_OK !=
  784. GNUNET_PQ_exec_statements (plugin->dbh,
  785. es))
  786. GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
  787. "postgres",
  788. _("Failed to drop table from database.\n"));
  789. }
  790. /**
  791. * Remove a particular key in the datastore.
  792. *
  793. * @param cls closure
  794. * @param key key for the content
  795. * @param size number of bytes in data
  796. * @param data content stored
  797. * @param cont continuation called with success or failure status
  798. * @param cont_cls continuation closure for @a cont
  799. */
  800. static void
  801. postgres_plugin_remove_key (void *cls,
  802. const struct GNUNET_HashCode *key,
  803. uint32_t size,
  804. const void *data,
  805. PluginRemoveCont cont,
  806. void *cont_cls)
  807. {
  808. struct Plugin *plugin = cls;
  809. enum GNUNET_DB_QueryStatus ret;
  810. struct GNUNET_PQ_QueryParam params[] = {
  811. GNUNET_PQ_query_param_auto_from_type (key),
  812. GNUNET_PQ_query_param_fixed_size (data, size),
  813. GNUNET_PQ_query_param_end
  814. };
  815. ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
  816. "remove",
  817. params);
  818. if (0 > ret)
  819. {
  820. cont (cont_cls,
  821. key,
  822. size,
  823. GNUNET_SYSERR,
  824. _("Postgress exec failure"));
  825. return;
  826. }
  827. if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == ret)
  828. {
  829. cont (cont_cls,
  830. key,
  831. size,
  832. GNUNET_NO,
  833. NULL);
  834. return;
  835. }
  836. plugin->env->duc (plugin->env->cls,
  837. - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
  838. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
  839. "datastore-postgres",
  840. "Deleted %u bytes from database\n",
  841. (unsigned int) size);
  842. cont (cont_cls,
  843. key,
  844. size,
  845. GNUNET_OK,
  846. NULL);
  847. }
  848. /**
  849. * Entry point for the plugin.
  850. *
  851. * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
  852. * @return our `struct Plugin *`
  853. */
  854. void *
  855. libgnunet_plugin_datastore_postgres_init (void *cls)
  856. {
  857. struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
  858. struct GNUNET_DATASTORE_PluginFunctions *api;
  859. struct Plugin *plugin;
  860. plugin = GNUNET_new (struct Plugin);
  861. plugin->env = env;
  862. if (GNUNET_OK != init_connection (plugin))
  863. {
  864. GNUNET_free (plugin);
  865. return NULL;
  866. }
  867. api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
  868. api->cls = plugin;
  869. api->estimate_size = &postgres_plugin_estimate_size;
  870. api->put = &postgres_plugin_put;
  871. api->get_key = &postgres_plugin_get_key;
  872. api->get_replication = &postgres_plugin_get_replication;
  873. api->get_expiration = &postgres_plugin_get_expiration;
  874. api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
  875. api->get_keys = &postgres_plugin_get_keys;
  876. api->drop = &postgres_plugin_drop;
  877. api->remove_key = &postgres_plugin_remove_key;
  878. GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
  879. "datastore-postgres",
  880. _("Postgres database running\n"));
  881. return api;
  882. }
  883. /**
  884. * Exit point from the plugin.
  885. *
  886. * @param cls our `struct Plugin *`
  887. * @return always NULL
  888. */
  889. void *
  890. libgnunet_plugin_datastore_postgres_done (void *cls)
  891. {
  892. struct GNUNET_DATASTORE_PluginFunctions *api = cls;
  893. struct Plugin *plugin = api->cls;
  894. PQfinish (plugin->dbh);
  895. GNUNET_free (plugin);
  896. GNUNET_free (api);
  897. return NULL;
  898. }
  899. /* end of plugin_datastore_postgres.c */