plugin_datastore_postgres.c 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980
  1. /*
  2. This file is part of GNUnet
  3. Copyright (C) 2009-2017 GNUnet e.V.
  4. GNUnet is free software: you can redistribute it and/or modify it
  5. under the terms of the GNU Affero General Public License as published
  6. by the Free Software Foundation, either version 3 of the License,
  7. or (at your option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Affero General Public License for more details.
  12. You should have received a copy of the GNU Affero General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. SPDX-License-Identifier: AGPL3.0-or-later
  15. */
  16. /**
  17. * @file datastore/plugin_datastore_postgres.c
  18. * @brief postgres-based datastore backend
  19. * @author Christian Grothoff
  20. */
  21. #include "platform.h"
  22. #include "gnunet_datastore_plugin.h"
  23. #include "gnunet_pq_lib.h"
  /**
   * Upper bound on how long a database operation may remain "busy"
   * before it is given up on for good.
   *
   * A small value keeps the plugin responsive to requests (PUTs in
   * particular); a large value raises the success rate, because SELECTs
   * issued while iterating can take several seconds despite LIMIT=1.
   *
   * The default of one second is intended to spare users from huge
   * latencies while still letting operations succeed with reasonable
   * probability.
   */
  #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
  /**
   * State shared by all functions of this plugin.
   */
  struct Plugin
  {
    /**
     * Execution environment that was passed to the plugin's init
     * entry point.
     */
    struct GNUNET_DATASTORE_PluginEnvironment *env;

    /**
     * Handle for the Postgres database connection.
     */
    struct GNUNET_PQ_Context *dbh;
  };
  49. /**
  50. * @brief Get a database handle
  51. *
  52. * @param plugin global context
  53. * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
  54. */
  55. static int
  56. init_connection (struct Plugin *plugin)
  57. {
  58. struct GNUNET_PQ_ExecuteStatement es[] = {
  59. /* FIXME: PostgreSQL does not have unsigned integers! This is ok for the type column because
  60. * we only test equality on it and can cast it to/from uint32_t. For repl, prio, and anonLevel
  61. * we do math or inequality tests, so we can't handle the entire range of uint32_t.
  62. * This will also cause problems for expiration times after 294247-01-10-04:00:54 UTC.
  63. */
  64. GNUNET_PQ_make_try_execute (
  65. "CREATE SEQUENCE IF NOT EXISTS gn090_oid_seq"),
  66. GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS gn090 ("
  67. " repl INTEGER NOT NULL DEFAULT 0,"
  68. " type INTEGER NOT NULL DEFAULT 0,"
  69. " prio INTEGER NOT NULL DEFAULT 0,"
  70. " anonLevel INTEGER NOT NULL DEFAULT 0,"
  71. " expire BIGINT NOT NULL DEFAULT 0,"
  72. " rvalue BIGINT NOT NULL DEFAULT 0,"
  73. " hash BYTEA NOT NULL DEFAULT '',"
  74. " vhash BYTEA NOT NULL DEFAULT '',"
  75. " value BYTEA NOT NULL DEFAULT '',"
  76. " oid OID NOT NULL DEFAULT nextval('gn090_oid_seq'))"),
  77. GNUNET_PQ_make_try_execute (
  78. "ALTER SEQUENCE gn090_oid_seq OWNED BY gn090.oid"),
  79. GNUNET_PQ_make_try_execute (
  80. "CREATE INDEX IF NOT EXISTS oid_hash ON gn090 (oid)"),
  81. GNUNET_PQ_make_try_execute (
  82. "CREATE INDEX IF NOT EXISTS idx_hash ON gn090 (hash)"),
  83. GNUNET_PQ_make_try_execute (
  84. "CREATE INDEX IF NOT EXISTS idx_prio ON gn090 (prio)"),
  85. GNUNET_PQ_make_try_execute (
  86. "CREATE INDEX IF NOT EXISTS idx_expire ON gn090 (expire)"),
  87. GNUNET_PQ_make_try_execute (
  88. "CREATE INDEX IF NOT EXISTS idx_prio_anon ON gn090 (prio,anonLevel)"),
  89. GNUNET_PQ_make_try_execute (
  90. "CREATE INDEX IF NOT EXISTS idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)"),
  91. GNUNET_PQ_make_try_execute (
  92. "CREATE INDEX IF NOT EXISTS idx_repl_rvalue ON gn090 (repl,rvalue)"),
  93. GNUNET_PQ_make_try_execute (
  94. "CREATE INDEX IF NOT EXISTS idx_expire_hash ON gn090 (expire,hash)"),
  95. GNUNET_PQ_make_execute (
  96. "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL"),
  97. GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN"),
  98. GNUNET_PQ_make_execute ("ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN"),
  99. GNUNET_PQ_EXECUTE_STATEMENT_END
  100. };
  101. #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
  102. struct GNUNET_PQ_PreparedStatement ps[] = {
  103. GNUNET_PQ_make_prepare ("get",
  104. "SELECT " RESULT_COLUMNS " FROM gn090"
  105. " WHERE oid >= $1::bigint AND"
  106. " (rvalue >= $2 OR 0 = $3::smallint) AND"
  107. " (hash = $4 OR 0 = $5::smallint) AND"
  108. " (type = $6 OR 0 = $7::smallint)"
  109. " ORDER BY oid ASC LIMIT 1",
  110. 7),
  111. GNUNET_PQ_make_prepare ("put",
  112. "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
  113. "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
  114. 9),
  115. GNUNET_PQ_make_prepare ("update",
  116. "UPDATE gn090"
  117. " SET prio = prio + $1,"
  118. " repl = repl + $2,"
  119. " expire = GREATEST(expire, $3)"
  120. " WHERE hash = $4 AND vhash = $5",
  121. 5),
  122. GNUNET_PQ_make_prepare ("decrepl",
  123. "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
  124. "WHERE oid = $1",
  125. 1),
  126. GNUNET_PQ_make_prepare ("select_non_anonymous",
  127. "SELECT " RESULT_COLUMNS " FROM gn090 "
  128. "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
  129. "ORDER BY oid ASC LIMIT 1",
  130. 2),
  131. GNUNET_PQ_make_prepare ("select_expiration_order",
  132. "(SELECT " RESULT_COLUMNS " FROM gn090 "
  133. "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
  134. "UNION "
  135. "(SELECT " RESULT_COLUMNS " FROM gn090 "
  136. "ORDER BY prio ASC LIMIT 1) "
  137. "ORDER BY expire ASC LIMIT 1",
  138. 1),
  139. GNUNET_PQ_make_prepare ("select_replication_order",
  140. "SELECT " RESULT_COLUMNS " FROM gn090 "
  141. "ORDER BY repl DESC,RANDOM() LIMIT 1",
  142. 0),
  143. GNUNET_PQ_make_prepare ("delrow",
  144. "DELETE FROM gn090 "
  145. "WHERE oid=$1",
  146. 1),
  147. GNUNET_PQ_make_prepare ("remove",
  148. "DELETE FROM gn090"
  149. " WHERE hash = $1 AND"
  150. " value = $2",
  151. 2),
  152. GNUNET_PQ_make_prepare ("get_keys",
  153. "SELECT hash FROM gn090",
  154. 0),
  155. GNUNET_PQ_make_prepare ("estimate_size",
  156. "SELECT CASE WHEN NOT EXISTS"
  157. " (SELECT 1 FROM gn090)"
  158. " THEN 0"
  159. " ELSE (SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090)"
  160. "END AS total",
  161. 0),
  162. GNUNET_PQ_PREPARED_STATEMENT_END
  163. };
  164. #undef RESULT_COLUMNS
  165. plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->env->cfg,
  166. "datastore-postgres",
  167. NULL,
  168. es,
  169. ps);
  170. if (NULL == plugin->dbh)
  171. return GNUNET_SYSERR;
  172. return GNUNET_OK;
  173. }
  174. /**
  175. * Get an estimate of how much space the database is
  176. * currently using.
  177. *
  178. * @param cls our `struct Plugin *`
  179. * @return number of bytes used on disk
  180. */
  181. static void
  182. postgres_plugin_estimate_size (void *cls,
  183. unsigned long long *estimate)
  184. {
  185. struct Plugin *plugin = cls;
  186. uint64_t total;
  187. struct GNUNET_PQ_QueryParam params[] = {
  188. GNUNET_PQ_query_param_end
  189. };
  190. struct GNUNET_PQ_ResultSpec rs[] = {
  191. GNUNET_PQ_result_spec_uint64 ("total",
  192. &total),
  193. GNUNET_PQ_result_spec_end
  194. };
  195. enum GNUNET_DB_QueryStatus ret;
  196. if (NULL == estimate)
  197. return;
  198. ret = GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh,
  199. "estimate_size",
  200. params,
  201. rs);
  202. if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != ret)
  203. {
  204. *estimate = 0LL;
  205. return;
  206. }
  207. *estimate = total;
  208. }
  209. /**
  210. * Store an item in the datastore.
  211. *
  212. * @param cls closure with the `struct Plugin`
  213. * @param key key for the item
  214. * @param absent true if the key was not found in the bloom filter
  215. * @param size number of bytes in data
  216. * @param data content stored
  217. * @param type type of the content
  218. * @param priority priority of the content
  219. * @param anonymity anonymity-level for the content
  220. * @param replication replication-level for the content
  221. * @param expiration expiration time for the content
  222. * @param cont continuation called with success or failure status
  223. * @param cont_cls continuation closure
  224. */
  225. static void
  226. postgres_plugin_put (void *cls,
  227. const struct GNUNET_HashCode *key,
  228. bool absent,
  229. uint32_t size,
  230. const void *data,
  231. enum GNUNET_BLOCK_Type type,
  232. uint32_t priority,
  233. uint32_t anonymity,
  234. uint32_t replication,
  235. struct GNUNET_TIME_Absolute expiration,
  236. PluginPutCont cont,
  237. void *cont_cls)
  238. {
  239. struct Plugin *plugin = cls;
  240. struct GNUNET_HashCode vhash;
  241. enum GNUNET_DB_QueryStatus ret;
  242. GNUNET_CRYPTO_hash (data,
  243. size,
  244. &vhash);
  245. if (! absent)
  246. {
  247. struct GNUNET_PQ_QueryParam params[] = {
  248. GNUNET_PQ_query_param_uint32 (&priority),
  249. GNUNET_PQ_query_param_uint32 (&replication),
  250. GNUNET_PQ_query_param_absolute_time (&expiration),
  251. GNUNET_PQ_query_param_auto_from_type (key),
  252. GNUNET_PQ_query_param_auto_from_type (&vhash),
  253. GNUNET_PQ_query_param_end
  254. };
  255. ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
  256. "update",
  257. params);
  258. if (0 > ret)
  259. {
  260. cont (cont_cls,
  261. key,
  262. size,
  263. GNUNET_SYSERR,
  264. _ ("Postgress exec failure"));
  265. return;
  266. }
  267. bool affected = (0 != ret);
  268. if (affected)
  269. {
  270. cont (cont_cls,
  271. key,
  272. size,
  273. GNUNET_NO,
  274. NULL);
  275. return;
  276. }
  277. }
  278. {
  279. uint32_t utype = (uint32_t) type;
  280. uint64_t rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
  281. UINT64_MAX);
  282. struct GNUNET_PQ_QueryParam params[] = {
  283. GNUNET_PQ_query_param_uint32 (&replication),
  284. GNUNET_PQ_query_param_uint32 (&utype),
  285. GNUNET_PQ_query_param_uint32 (&priority),
  286. GNUNET_PQ_query_param_uint32 (&anonymity),
  287. GNUNET_PQ_query_param_absolute_time (&expiration),
  288. GNUNET_PQ_query_param_uint64 (&rvalue),
  289. GNUNET_PQ_query_param_auto_from_type (key),
  290. GNUNET_PQ_query_param_auto_from_type (&vhash),
  291. GNUNET_PQ_query_param_fixed_size (data, size),
  292. GNUNET_PQ_query_param_end
  293. };
  294. ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
  295. "put",
  296. params);
  297. if (0 > ret)
  298. {
  299. cont (cont_cls,
  300. key,
  301. size,
  302. GNUNET_SYSERR,
  303. "Postgress exec failure");
  304. return;
  305. }
  306. }
  307. plugin->env->duc (plugin->env->cls,
  308. size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
  309. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
  310. "datastore-postgres",
  311. "Stored %u bytes in database\n",
  312. (unsigned int) size);
  313. cont (cont_cls,
  314. key,
  315. size,
  316. GNUNET_OK,
  317. NULL);
  318. }
  319. /**
  320. * Closure for #process_result.
  321. */
  322. struct ProcessResultContext
  323. {
  324. /**
  325. * The plugin handle.
  326. */
  327. struct Plugin *plugin;
  328. /**
  329. * Function to call on each result.
  330. */
  331. PluginDatumProcessor proc;
  332. /**
  333. * Closure for @e proc.
  334. */
  335. void *proc_cls;
  336. };
  337. /**
  338. * Function invoked to process the result and call the processor of @a
  339. * cls.
  340. *
  341. * @param cls our `struct ProcessResultContext`
  342. * @param res result from exec
  343. * @param num_results number of results in @a res
  344. */
  345. static void
  346. process_result (void *cls,
  347. PGresult *res,
  348. unsigned int num_results)
  349. {
  350. struct ProcessResultContext *prc = cls;
  351. struct Plugin *plugin = prc->plugin;
  352. if (0 == num_results)
  353. {
  354. /* no result */
  355. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
  356. "datastore-postgres",
  357. "Ending iteration (no more results)\n");
  358. prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
  359. GNUNET_TIME_UNIT_ZERO_ABS, 0);
  360. return;
  361. }
  362. if (1 != num_results)
  363. {
  364. GNUNET_break (0);
  365. prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
  366. GNUNET_TIME_UNIT_ZERO_ABS, 0);
  367. return;
  368. }
  369. /* Technically we don't need the loop here, but nicer in case
  370. we ever relax the condition above. */
  371. for (unsigned int i = 0; i < num_results; i++)
  372. {
  373. int iret;
  374. uint32_t rowid;
  375. uint32_t utype;
  376. uint32_t anonymity;
  377. uint32_t replication;
  378. uint32_t priority;
  379. size_t size;
  380. void *data;
  381. struct GNUNET_TIME_Absolute expiration_time;
  382. struct GNUNET_HashCode key;
  383. struct GNUNET_PQ_ResultSpec rs[] = {
  384. GNUNET_PQ_result_spec_uint32 ("repl", &replication),
  385. GNUNET_PQ_result_spec_uint32 ("type", &utype),
  386. GNUNET_PQ_result_spec_uint32 ("prio", &priority),
  387. GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
  388. GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
  389. GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
  390. GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
  391. GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
  392. GNUNET_PQ_result_spec_end
  393. };
  394. if (GNUNET_OK !=
  395. GNUNET_PQ_extract_result (res,
  396. rs,
  397. i))
  398. {
  399. GNUNET_break (0);
  400. prc->proc (prc->proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
  401. GNUNET_TIME_UNIT_ZERO_ABS, 0);
  402. return;
  403. }
  404. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
  405. "datastore-postgres",
  406. "Found result of size %u bytes and type %u in database\n",
  407. (unsigned int) size,
  408. (unsigned int) utype);
  409. iret = prc->proc (prc->proc_cls,
  410. &key,
  411. size,
  412. data,
  413. (enum GNUNET_BLOCK_Type) utype,
  414. priority,
  415. anonymity,
  416. replication,
  417. expiration_time,
  418. rowid);
  419. if (iret == GNUNET_NO)
  420. {
  421. struct GNUNET_PQ_QueryParam param[] = {
  422. GNUNET_PQ_query_param_uint32 (&rowid),
  423. GNUNET_PQ_query_param_end
  424. };
  425. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  426. "Processor asked for item %u to be removed.\n",
  427. (unsigned int) rowid);
  428. if (0 <
  429. GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
  430. "delrow",
  431. param))
  432. {
  433. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
  434. "datastore-postgres",
  435. "Deleting %u bytes from database\n",
  436. (unsigned int) size);
  437. plugin->env->duc (plugin->env->cls,
  438. -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
  439. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
  440. "datastore-postgres",
  441. "Deleted %u bytes from database\n",
  442. (unsigned int) size);
  443. }
  444. }
  445. GNUNET_PQ_cleanup_result (rs);
  446. } /* for (i) */
  447. }
  448. /**
  449. * Get one of the results for a particular key in the datastore.
  450. *
  451. * @param cls closure with the `struct Plugin`
  452. * @param next_uid return the result with lowest uid >= next_uid
  453. * @param random if true, return a random result instead of using next_uid
  454. * @param key maybe NULL (to match all entries)
  455. * @param type entries of which type are relevant?
  456. * Use 0 for any type.
  457. * @param proc function to call on the matching value;
  458. * will be called with NULL if nothing matches
  459. * @param proc_cls closure for @a proc
  460. */
  461. static void
  462. postgres_plugin_get_key (void *cls,
  463. uint64_t next_uid,
  464. bool random,
  465. const struct GNUNET_HashCode *key,
  466. enum GNUNET_BLOCK_Type type,
  467. PluginDatumProcessor proc,
  468. void *proc_cls)
  469. {
  470. struct Plugin *plugin = cls;
  471. uint32_t utype = type;
  472. uint16_t use_rvalue = random;
  473. uint16_t use_key = NULL != key;
  474. uint16_t use_type = GNUNET_BLOCK_TYPE_ANY != type;
  475. uint64_t rvalue;
  476. struct GNUNET_PQ_QueryParam params[] = {
  477. GNUNET_PQ_query_param_uint64 (&next_uid),
  478. GNUNET_PQ_query_param_uint64 (&rvalue),
  479. GNUNET_PQ_query_param_uint16 (&use_rvalue),
  480. GNUNET_PQ_query_param_auto_from_type (key),
  481. GNUNET_PQ_query_param_uint16 (&use_key),
  482. GNUNET_PQ_query_param_uint32 (&utype),
  483. GNUNET_PQ_query_param_uint16 (&use_type),
  484. GNUNET_PQ_query_param_end
  485. };
  486. struct ProcessResultContext prc;
  487. enum GNUNET_DB_QueryStatus res;
  488. if (random)
  489. {
  490. rvalue = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
  491. UINT64_MAX);
  492. next_uid = 0;
  493. }
  494. else
  495. {
  496. rvalue = 0;
  497. }
  498. prc.plugin = plugin;
  499. prc.proc = proc;
  500. prc.proc_cls = proc_cls;
  501. res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
  502. "get",
  503. params,
  504. &process_result,
  505. &prc);
  506. if (0 > res)
  507. proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
  508. GNUNET_TIME_UNIT_ZERO_ABS, 0);
  509. }
  510. /**
  511. * Select a subset of the items in the datastore and call
  512. * the given iterator for each of them.
  513. *
  514. * @param cls our `struct Plugin *`
  515. * @param next_uid return the result with lowest uid >= next_uid
  516. * @param type entries of which type should be considered?
  517. * Must not be zero (ANY).
  518. * @param proc function to call on the matching value;
  519. * will be called with NULL if no value matches
  520. * @param proc_cls closure for @a proc
  521. */
  522. static void
  523. postgres_plugin_get_zero_anonymity (void *cls,
  524. uint64_t next_uid,
  525. enum GNUNET_BLOCK_Type type,
  526. PluginDatumProcessor proc,
  527. void *proc_cls)
  528. {
  529. struct Plugin *plugin = cls;
  530. uint32_t utype = type;
  531. struct GNUNET_PQ_QueryParam params[] = {
  532. GNUNET_PQ_query_param_uint32 (&utype),
  533. GNUNET_PQ_query_param_uint64 (&next_uid),
  534. GNUNET_PQ_query_param_end
  535. };
  536. struct ProcessResultContext prc;
  537. enum GNUNET_DB_QueryStatus res;
  538. prc.plugin = plugin;
  539. prc.proc = proc;
  540. prc.proc_cls = proc_cls;
  541. res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
  542. "select_non_anonymous",
  543. params,
  544. &process_result,
  545. &prc);
  546. if (0 > res)
  547. proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
  548. GNUNET_TIME_UNIT_ZERO_ABS, 0);
  549. }
  550. /**
  551. * Context for #repl_iter() function.
  552. */
  553. struct ReplCtx
  554. {
  555. /**
  556. * Plugin handle.
  557. */
  558. struct Plugin *plugin;
  559. /**
  560. * Function to call for the result (or the NULL).
  561. */
  562. PluginDatumProcessor proc;
  563. /**
  564. * Closure for @e proc.
  565. */
  566. void *proc_cls;
  567. };
  568. /**
  569. * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
  570. * Decrements the replication counter and calls the original
  571. * iterator.
  572. *
  573. * @param cls closure with the `struct ReplCtx *`
  574. * @param key key for the content
  575. * @param size number of bytes in @a data
  576. * @param data content stored
  577. * @param type type of the content
  578. * @param priority priority of the content
  579. * @param anonymity anonymity-level for the content
  580. * @param replication replication-level for the content
  581. * @param expiration expiration time for the content
  582. * @param uid unique identifier for the datum;
  583. * maybe 0 if no unique identifier is available
  584. * @return #GNUNET_SYSERR to abort the iteration,
  585. * #GNUNET_OK to continue
  586. * (continue on call to "next", of course),
  587. * #GNUNET_NO to delete the item and continue (if supported)
  588. */
  589. static int
  590. repl_proc (void *cls,
  591. const struct GNUNET_HashCode *key,
  592. uint32_t size,
  593. const void *data,
  594. enum GNUNET_BLOCK_Type type,
  595. uint32_t priority,
  596. uint32_t anonymity,
  597. uint32_t replication,
  598. struct GNUNET_TIME_Absolute expiration,
  599. uint64_t uid)
  600. {
  601. struct ReplCtx *rc = cls;
  602. struct Plugin *plugin = rc->plugin;
  603. int ret;
  604. uint32_t oid = (uint32_t) uid;
  605. struct GNUNET_PQ_QueryParam params[] = {
  606. GNUNET_PQ_query_param_uint32 (&oid),
  607. GNUNET_PQ_query_param_end
  608. };
  609. enum GNUNET_DB_QueryStatus qret;
  610. ret = rc->proc (rc->proc_cls,
  611. key,
  612. size,
  613. data,
  614. type,
  615. priority,
  616. anonymity,
  617. replication,
  618. expiration,
  619. uid);
  620. if (NULL == key)
  621. return ret;
  622. qret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
  623. "decrepl",
  624. params);
  625. if (0 > qret)
  626. return GNUNET_SYSERR;
  627. return ret;
  628. }
  629. /**
  630. * Get a random item for replication. Returns a single, not expired,
  631. * random item from those with the highest replication counters. The
  632. * item's replication counter is decremented by one IF it was positive
  633. * before. Call @a proc with all values ZERO or NULL if the datastore
  634. * is empty.
  635. *
  636. * @param cls closure with the `struct Plugin`
  637. * @param proc function to call the value (once only).
  638. * @param proc_cls closure for @a proc
  639. */
  640. static void
  641. postgres_plugin_get_replication (void *cls,
  642. PluginDatumProcessor proc,
  643. void *proc_cls)
  644. {
  645. struct Plugin *plugin = cls;
  646. struct GNUNET_PQ_QueryParam params[] = {
  647. GNUNET_PQ_query_param_end
  648. };
  649. struct ReplCtx rc;
  650. struct ProcessResultContext prc;
  651. enum GNUNET_DB_QueryStatus res;
  652. rc.plugin = plugin;
  653. rc.proc = proc;
  654. rc.proc_cls = proc_cls;
  655. prc.plugin = plugin;
  656. prc.proc = &repl_proc;
  657. prc.proc_cls = &rc;
  658. res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
  659. "select_replication_order",
  660. params,
  661. &process_result,
  662. &prc);
  663. if (0 > res)
  664. proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0,
  665. GNUNET_TIME_UNIT_ZERO_ABS, 0);
  666. }
  667. /**
  668. * Get a random item for expiration. Call @a proc with all values
  669. * ZERO or NULL if the datastore is empty.
  670. *
  671. * @param cls closure with the `struct Plugin`
  672. * @param proc function to call the value (once only).
  673. * @param proc_cls closure for @a proc
  674. */
  675. static void
  676. postgres_plugin_get_expiration (void *cls,
  677. PluginDatumProcessor proc,
  678. void *proc_cls)
  679. {
  680. struct Plugin *plugin = cls;
  681. struct GNUNET_TIME_Absolute now;
  682. struct GNUNET_PQ_QueryParam params[] = {
  683. GNUNET_PQ_query_param_absolute_time (&now),
  684. GNUNET_PQ_query_param_end
  685. };
  686. struct ProcessResultContext prc;
  687. now = GNUNET_TIME_absolute_get ();
  688. prc.plugin = plugin;
  689. prc.proc = proc;
  690. prc.proc_cls = proc_cls;
  691. (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
  692. "select_expiration_order",
  693. params,
  694. &process_result,
  695. &prc);
  696. }
  697. /**
  698. * Closure for #process_keys.
  699. */
  700. struct ProcessKeysContext
  701. {
  702. /**
  703. * Function to call for each key.
  704. */
  705. PluginKeyProcessor proc;
  706. /**
  707. * Closure for @e proc.
  708. */
  709. void *proc_cls;
  710. };
  711. /**
  712. * Function to be called with the results of a SELECT statement
  713. * that has returned @a num_results results.
  714. *
  715. * @param cls closure with a `struct ProcessKeysContext`
  716. * @param result the postgres result
  717. * @param num_result the number of results in @a result
  718. */
  719. static void
  720. process_keys (void *cls,
  721. PGresult *result,
  722. unsigned int num_results)
  723. {
  724. struct ProcessKeysContext *pkc = cls;
  725. for (unsigned i = 0; i < num_results; i++)
  726. {
  727. struct GNUNET_HashCode key;
  728. struct GNUNET_PQ_ResultSpec rs[] = {
  729. GNUNET_PQ_result_spec_auto_from_type ("hash",
  730. &key),
  731. GNUNET_PQ_result_spec_end
  732. };
  733. if (GNUNET_OK !=
  734. GNUNET_PQ_extract_result (result,
  735. rs,
  736. i))
  737. {
  738. GNUNET_break (0);
  739. continue;
  740. }
  741. pkc->proc (pkc->proc_cls,
  742. &key,
  743. 1);
  744. GNUNET_PQ_cleanup_result (rs);
  745. }
  746. }
  747. /**
  748. * Get all of the keys in the datastore.
  749. *
  750. * @param cls closure with the `struct Plugin *`
  751. * @param proc function to call on each key
  752. * @param proc_cls closure for @a proc
  753. */
  754. static void
  755. postgres_plugin_get_keys (void *cls,
  756. PluginKeyProcessor proc,
  757. void *proc_cls)
  758. {
  759. struct Plugin *plugin = cls;
  760. struct GNUNET_PQ_QueryParam params[] = {
  761. GNUNET_PQ_query_param_end
  762. };
  763. struct ProcessKeysContext pkc;
  764. pkc.proc = proc;
  765. pkc.proc_cls = proc_cls;
  766. (void) GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
  767. "get_keys",
  768. params,
  769. &process_keys,
  770. &pkc);
  771. proc (proc_cls,
  772. NULL,
  773. 0);
  774. }
  775. /**
  776. * Drop database.
  777. *
  778. * @param cls closure with the `struct Plugin *`
  779. */
  780. static void
  781. postgres_plugin_drop (void *cls)
  782. {
  783. struct Plugin *plugin = cls;
  784. struct GNUNET_PQ_ExecuteStatement es[] = {
  785. GNUNET_PQ_make_execute ("DROP TABLE gn090"),
  786. GNUNET_PQ_EXECUTE_STATEMENT_END
  787. };
  788. if (GNUNET_OK !=
  789. GNUNET_PQ_exec_statements (plugin->dbh,
  790. es))
  791. GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
  792. "postgres",
  793. _ ("Failed to drop table from database.\n"));
  794. }
  795. /**
  796. * Remove a particular key in the datastore.
  797. *
  798. * @param cls closure
  799. * @param key key for the content
  800. * @param size number of bytes in data
  801. * @param data content stored
  802. * @param cont continuation called with success or failure status
  803. * @param cont_cls continuation closure for @a cont
  804. */
  805. static void
  806. postgres_plugin_remove_key (void *cls,
  807. const struct GNUNET_HashCode *key,
  808. uint32_t size,
  809. const void *data,
  810. PluginRemoveCont cont,
  811. void *cont_cls)
  812. {
  813. struct Plugin *plugin = cls;
  814. enum GNUNET_DB_QueryStatus ret;
  815. struct GNUNET_PQ_QueryParam params[] = {
  816. GNUNET_PQ_query_param_auto_from_type (key),
  817. GNUNET_PQ_query_param_fixed_size (data, size),
  818. GNUNET_PQ_query_param_end
  819. };
  820. ret = GNUNET_PQ_eval_prepared_non_select (plugin->dbh,
  821. "remove",
  822. params);
  823. if (0 > ret)
  824. {
  825. cont (cont_cls,
  826. key,
  827. size,
  828. GNUNET_SYSERR,
  829. _ ("Postgress exec failure"));
  830. return;
  831. }
  832. if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == ret)
  833. {
  834. cont (cont_cls,
  835. key,
  836. size,
  837. GNUNET_NO,
  838. NULL);
  839. return;
  840. }
  841. plugin->env->duc (plugin->env->cls,
  842. -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
  843. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
  844. "datastore-postgres",
  845. "Deleted %u bytes from database\n",
  846. (unsigned int) size);
  847. cont (cont_cls,
  848. key,
  849. size,
  850. GNUNET_OK,
  851. NULL);
  852. }
  853. /**
  854. * Entry point for the plugin.
  855. *
  856. * @param cls the `struct GNUNET_DATASTORE_PluginEnvironment*`
  857. * @return our `struct Plugin *`
  858. */
  859. void *
  860. libgnunet_plugin_datastore_postgres_init (void *cls)
  861. {
  862. struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
  863. struct GNUNET_DATASTORE_PluginFunctions *api;
  864. struct Plugin *plugin;
  865. plugin = GNUNET_new (struct Plugin);
  866. plugin->env = env;
  867. if (GNUNET_OK != init_connection (plugin))
  868. {
  869. GNUNET_free (plugin);
  870. return NULL;
  871. }
  872. api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
  873. api->cls = plugin;
  874. api->estimate_size = &postgres_plugin_estimate_size;
  875. api->put = &postgres_plugin_put;
  876. api->get_key = &postgres_plugin_get_key;
  877. api->get_replication = &postgres_plugin_get_replication;
  878. api->get_expiration = &postgres_plugin_get_expiration;
  879. api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
  880. api->get_keys = &postgres_plugin_get_keys;
  881. api->drop = &postgres_plugin_drop;
  882. api->remove_key = &postgres_plugin_remove_key;
  883. GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
  884. "datastore-postgres",
  885. _ ("Postgres database running\n"));
  886. return api;
  887. }
  888. /**
  889. * Exit point from the plugin.
  890. *
  891. * @param cls our `struct Plugin *`
  892. * @return always NULL
  893. */
  894. void *
  895. libgnunet_plugin_datastore_postgres_done (void *cls)
  896. {
  897. struct GNUNET_DATASTORE_PluginFunctions *api = cls;
  898. struct Plugin *plugin = api->cls;
  899. GNUNET_PQ_disconnect (plugin->dbh);
  900. GNUNET_free (plugin);
  901. GNUNET_free (api);
  902. return NULL;
  903. }
  904. /* end of plugin_datastore_postgres.c */