plugin_datastore_postgres.c 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890
  1. /*
  2. This file is part of GNUnet
  3. (C) 2009-2013 Christian Grothoff (and other contributing authors)
  4. GNUnet is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published
  6. by the Free Software Foundation; either version 3, or (at your
  7. option) any later version.
  8. GNUnet is distributed in the hope that it will be useful, but
  9. WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with GNUnet; see the file COPYING. If not, write to the
  14. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  15. Boston, MA 02111-1307, USA.
  16. */
  17. /**
  18. * @file datastore/plugin_datastore_postgres.c
  19. * @brief postgres-based datastore backend
  20. * @author Christian Grothoff
  21. */
  22. #include "platform.h"
  23. #include "gnunet_datastore_plugin.h"
  24. #include "gnunet_postgres_lib.h"
  /**
   * How long a database operation may remain "busy" before it is
   * considered to have failed for good.
   *
   * A small value keeps the plugin responsive to requests (PUTs in
   * particular); a large value raises the chance that an operation
   * eventually succeeds (SELECTs in iterate can take several seconds
   * despite LIMIT=1).
   *
   * The default of one second is meant to avoid huge user-visible
   * latencies while still letting operations succeed with reasonable
   * probability.
   */
  #define BUSY_TIMEOUT GNUNET_TIME_UNIT_SECONDS
  36. /**
  37. * Context for all functions in this plugin.
  38. */
  39. struct Plugin
  40. {
  41. /**
  42. * Our execution environment.
  43. */
  44. struct GNUNET_DATASTORE_PluginEnvironment *env;
  45. /**
  46. * Native Postgres database handle.
  47. */
  48. PGconn *dbh;
  49. };
  50. /**
  51. * @brief Get a database handle
  52. *
  53. * @param plugin global context
  54. * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
  55. */
  56. static int
  57. init_connection (struct Plugin *plugin)
  58. {
  59. PGresult *ret;
  60. plugin->dbh = GNUNET_POSTGRES_connect (plugin->env->cfg, "datastore-postgres");
  61. if (NULL == plugin->dbh)
  62. return GNUNET_SYSERR;
  63. ret =
  64. PQexec (plugin->dbh,
  65. "CREATE TABLE gn090 (" " repl INTEGER NOT NULL DEFAULT 0,"
  66. " type INTEGER NOT NULL DEFAULT 0,"
  67. " prio INTEGER NOT NULL DEFAULT 0,"
  68. " anonLevel INTEGER NOT NULL DEFAULT 0,"
  69. " expire BIGINT NOT NULL DEFAULT 0,"
  70. " rvalue BIGINT NOT NULL DEFAULT 0,"
  71. " hash BYTEA NOT NULL DEFAULT '',"
  72. " vhash BYTEA NOT NULL DEFAULT '',"
  73. " value BYTEA NOT NULL DEFAULT '')" "WITH OIDS");
  74. if ( (NULL == ret) ||
  75. ((PQresultStatus (ret) != PGRES_COMMAND_OK) &&
  76. (0 != strcmp ("42P07", /* duplicate table */
  77. PQresultErrorField
  78. (ret,
  79. PG_DIAG_SQLSTATE)))))
  80. {
  81. (void) GNUNET_POSTGRES_check_result (plugin->dbh,
  82. ret,
  83. PGRES_COMMAND_OK,
  84. "CREATE TABLE",
  85. "gn090");
  86. PQfinish (plugin->dbh);
  87. plugin->dbh = NULL;
  88. return GNUNET_SYSERR;
  89. }
  90. if (PQresultStatus (ret) == PGRES_COMMAND_OK)
  91. {
  92. if ((GNUNET_OK !=
  93. GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash ON gn090 (hash)")) ||
  94. (GNUNET_OK !=
  95. GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)")) ||
  96. (GNUNET_OK !=
  97. GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_prio ON gn090 (prio)")) ||
  98. (GNUNET_OK !=
  99. GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire ON gn090 (expire)")) ||
  100. (GNUNET_OK !=
  101. GNUNET_POSTGRES_exec (plugin->dbh,
  102. "CREATE INDEX idx_prio_anon ON gn090 (prio,anonLevel)")) ||
  103. (GNUNET_OK !=
  104. GNUNET_POSTGRES_exec (plugin->dbh,
  105. "CREATE INDEX idx_prio_hash_anon ON gn090 (prio,hash,anonLevel)")) ||
  106. (GNUNET_OK !=
  107. GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_repl_rvalue ON gn090 (repl,rvalue)")) ||
  108. (GNUNET_OK !=
  109. GNUNET_POSTGRES_exec (plugin->dbh, "CREATE INDEX idx_expire_hash ON gn090 (expire,hash)")))
  110. {
  111. PQclear (ret);
  112. PQfinish (plugin->dbh);
  113. plugin->dbh = NULL;
  114. return GNUNET_SYSERR;
  115. }
  116. }
  117. PQclear (ret);
  118. ret =
  119. PQexec (plugin->dbh,
  120. "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
  121. if (GNUNET_OK !=
  122. GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
  123. {
  124. PQfinish (plugin->dbh);
  125. plugin->dbh = NULL;
  126. return GNUNET_SYSERR;
  127. }
  128. PQclear (ret);
  129. ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER hash SET STORAGE PLAIN");
  130. if (GNUNET_OK !=
  131. GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
  132. {
  133. PQfinish (plugin->dbh);
  134. plugin->dbh = NULL;
  135. return GNUNET_SYSERR;
  136. }
  137. PQclear (ret);
  138. ret = PQexec (plugin->dbh, "ALTER TABLE gn090 ALTER vhash SET STORAGE PLAIN");
  139. if (GNUNET_OK !=
  140. GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "ALTER TABLE", "gn090"))
  141. {
  142. PQfinish (plugin->dbh);
  143. plugin->dbh = NULL;
  144. return GNUNET_SYSERR;
  145. }
  146. PQclear (ret);
  147. if ((GNUNET_OK !=
  148. GNUNET_POSTGRES_prepare (plugin->dbh, "getvt",
  149. "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
  150. "WHERE hash=$1 AND vhash=$2 AND type=$3 "
  151. "ORDER BY oid ASC LIMIT 1 OFFSET $4", 4)) ||
  152. (GNUNET_OK !=
  153. GNUNET_POSTGRES_prepare (plugin->dbh, "gett",
  154. "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
  155. "WHERE hash=$1 AND type=$2 "
  156. "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
  157. (GNUNET_OK !=
  158. GNUNET_POSTGRES_prepare (plugin->dbh, "getv",
  159. "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
  160. "WHERE hash=$1 AND vhash=$2 "
  161. "ORDER BY oid ASC LIMIT 1 OFFSET $3", 3)) ||
  162. (GNUNET_OK !=
  163. GNUNET_POSTGRES_prepare (plugin->dbh, "get",
  164. "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
  165. "WHERE hash=$1 " "ORDER BY oid ASC LIMIT 1 OFFSET $2", 2)) ||
  166. (GNUNET_OK !=
  167. GNUNET_POSTGRES_prepare (plugin->dbh, "put",
  168. "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, rvalue, hash, vhash, value) "
  169. "VALUES ($1, $2, $3, $4, $5, RANDOM(), $6, $7, $8)", 9)) ||
  170. (GNUNET_OK !=
  171. GNUNET_POSTGRES_prepare (plugin->dbh, "update",
  172. "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END "
  173. "WHERE oid = $3", 3)) ||
  174. (GNUNET_OK !=
  175. GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
  176. "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
  177. "WHERE oid = $1", 1)) ||
  178. (GNUNET_OK !=
  179. GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
  180. "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
  181. "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
  182. 1)) ||
  183. (GNUNET_OK !=
  184. GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
  185. "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
  186. "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION "
  187. "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
  188. "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1",
  189. 1)) ||
  190. (GNUNET_OK !=
  191. GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
  192. "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
  193. "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
  194. (GNUNET_OK !=
  195. GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
  196. (GNUNET_OK !=
  197. GNUNET_POSTGRES_prepare (plugin->dbh, "get_keys", "SELECT hash FROM gn090", 0)))
  198. {
  199. PQfinish (plugin->dbh);
  200. plugin->dbh = NULL;
  201. return GNUNET_SYSERR;
  202. }
  203. return GNUNET_OK;
  204. }
  205. /**
  206. * Get an estimate of how much space the database is
  207. * currently using.
  208. *
  209. * @param cls our `struct Plugin *`
  210. * @return number of bytes used on disk
  211. */
  212. static unsigned long long
  213. postgres_plugin_estimate_size (void *cls)
  214. {
  215. struct Plugin *plugin = cls;
  216. unsigned long long total;
  217. PGresult *ret;
  218. ret =
  219. PQexecParams (plugin->dbh,
  220. "SELECT SUM(LENGTH(value))+256*COUNT(*) FROM gn090", 0,
  221. NULL, NULL, NULL, NULL, 1);
  222. if (GNUNET_OK !=
  223. GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", "get_size"))
  224. {
  225. return 0;
  226. }
  227. if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) )
  228. {
  229. GNUNET_break (0);
  230. PQclear (ret);
  231. return 0;
  232. }
  233. if (PQgetlength (ret, 0, 0) != sizeof (unsigned long long))
  234. {
  235. GNUNET_break (0 == PQgetlength (ret, 0, 0));
  236. PQclear (ret);
  237. return 0;
  238. }
  239. total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
  240. PQclear (ret);
  241. return total;
  242. }
  243. /**
  244. * Store an item in the datastore.
  245. *
  246. * @param cls closure with the `struct Plugin`
  247. * @param key key for the item
  248. * @param size number of bytes in data
  249. * @param data content stored
  250. * @param type type of the content
  251. * @param priority priority of the content
  252. * @param anonymity anonymity-level for the content
  253. * @param replication replication-level for the content
  254. * @param expiration expiration time for the content
  255. * @param msg set to error message
  256. * @return #GNUNET_OK on success
  257. */
  258. static int
  259. postgres_plugin_put (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
  260. const void *data, enum GNUNET_BLOCK_Type type,
  261. uint32_t priority, uint32_t anonymity,
  262. uint32_t replication,
  263. struct GNUNET_TIME_Absolute expiration, char **msg)
  264. {
  265. struct Plugin *plugin = cls;
  266. struct GNUNET_HashCode vhash;
  267. PGresult *ret;
  268. uint32_t btype = htonl (type);
  269. uint32_t bprio = htonl (priority);
  270. uint32_t banon = htonl (anonymity);
  271. uint32_t brepl = htonl (replication);
  272. uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value_us__;
  273. const char *paramValues[] = {
  274. (const char *) &brepl,
  275. (const char *) &btype,
  276. (const char *) &bprio,
  277. (const char *) &banon,
  278. (const char *) &bexpi,
  279. (const char *) key,
  280. (const char *) &vhash,
  281. (const char *) data
  282. };
  283. int paramLengths[] = {
  284. sizeof (brepl),
  285. sizeof (btype),
  286. sizeof (bprio),
  287. sizeof (banon),
  288. sizeof (bexpi),
  289. sizeof (struct GNUNET_HashCode),
  290. sizeof (struct GNUNET_HashCode),
  291. size
  292. };
  293. const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
  294. GNUNET_CRYPTO_hash (data, size, &vhash);
  295. ret =
  296. PQexecPrepared (plugin->dbh, "put", 8, paramValues, paramLengths,
  297. paramFormats, 1);
  298. if (GNUNET_OK !=
  299. GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "put"))
  300. return GNUNET_SYSERR;
  301. PQclear (ret);
  302. plugin->env->duc (plugin->env->cls, size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
  303. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
  304. "Stored %u bytes in database\n", (unsigned int) size);
  305. return GNUNET_OK;
  306. }
  307. /**
  308. * Function invoked to process the result and call
  309. * the processor.
  310. *
  311. * @param plugin global plugin data
  312. * @param proc function to call the value (once only).
  313. * @param proc_cls closure for proc
  314. * @param res result from exec
  315. * @param filename filename for error messages
  316. * @param line line number for error messages
  317. */
  318. static void
  319. process_result (struct Plugin *plugin, PluginDatumProcessor proc,
  320. void *proc_cls, PGresult * res,
  321. const char *filename, int line)
  322. {
  323. int iret;
  324. enum GNUNET_BLOCK_Type type;
  325. uint32_t anonymity;
  326. uint32_t priority;
  327. uint32_t size;
  328. unsigned int rowid;
  329. struct GNUNET_TIME_Absolute expiration_time;
  330. struct GNUNET_HashCode key;
  331. if (GNUNET_OK !=
  332. GNUNET_POSTGRES_check_result_ (plugin->dbh, res, PGRES_TUPLES_OK, "PQexecPrepared", "select",
  333. filename, line))
  334. {
  335. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
  336. "Ending iteration (postgres error)\n");
  337. proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
  338. return;
  339. }
  340. if (0 == PQntuples (res))
  341. {
  342. /* no result */
  343. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
  344. "Ending iteration (no more results)\n");
  345. proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
  346. PQclear (res);
  347. return;
  348. }
  349. if ((1 != PQntuples (res)) || (7 != PQnfields (res)) ||
  350. (sizeof (uint32_t) != PQfsize (res, 0)) ||
  351. (sizeof (uint32_t) != PQfsize (res, 6)))
  352. {
  353. GNUNET_break (0);
  354. proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
  355. PQclear (res);
  356. return;
  357. }
  358. rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
  359. if ((sizeof (uint32_t) != PQfsize (res, 0)) ||
  360. (sizeof (uint32_t) != PQfsize (res, 1)) ||
  361. (sizeof (uint32_t) != PQfsize (res, 2)) ||
  362. (sizeof (uint64_t) != PQfsize (res, 3)) ||
  363. (sizeof (struct GNUNET_HashCode) != PQgetlength (res, 0, 4)))
  364. {
  365. GNUNET_break (0);
  366. PQclear (res);
  367. GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid);
  368. proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
  369. return;
  370. }
  371. type = ntohl (*(uint32_t *) PQgetvalue (res, 0, 0));
  372. priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
  373. anonymity = ntohl (*(uint32_t *) PQgetvalue (res, 0, 2));
  374. expiration_time.abs_value_us =
  375. GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
  376. memcpy (&key, PQgetvalue (res, 0, 4), sizeof (struct GNUNET_HashCode));
  377. size = PQgetlength (res, 0, 5);
  378. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
  379. "Found result of size %u bytes and type %u in database\n",
  380. (unsigned int) size, (unsigned int) type);
  381. iret =
  382. proc (proc_cls, &key, size, PQgetvalue (res, 0, 5),
  383. (enum GNUNET_BLOCK_Type) type, priority, anonymity, expiration_time,
  384. rowid);
  385. PQclear (res);
  386. if (iret == GNUNET_NO)
  387. {
  388. GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
  389. "Processor asked for item %u to be removed.\n", rowid);
  390. if (GNUNET_OK == GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, "delrow", rowid))
  391. {
  392. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
  393. "Deleting %u bytes from database\n",
  394. (unsigned int) size);
  395. plugin->env->duc (plugin->env->cls,
  396. -(size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
  397. GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "datastore-postgres",
  398. "Deleted %u bytes from database\n", (unsigned int) size);
  399. }
  400. }
  401. }
  402. /**
  403. * Iterate over the results for a particular key
  404. * in the datastore.
  405. *
  406. * @param cls closure with the 'struct Plugin'
  407. * @param offset offset of the result (modulo num-results);
  408. * specific ordering does not matter for the offset
  409. * @param key maybe NULL (to match all entries)
  410. * @param vhash hash of the value, maybe NULL (to
  411. * match all values that have the right key).
  412. * Note that for DBlocks there is no difference
  413. * betwen key and vhash, but for other blocks
  414. * there may be!
  415. * @param type entries of which type are relevant?
  416. * Use 0 for any type.
  417. * @param proc function to call on the matching value;
  418. * will be called once with a NULL if no value matches
  419. * @param proc_cls closure for iter
  420. */
  421. static void
  422. postgres_plugin_get_key (void *cls, uint64_t offset,
  423. const struct GNUNET_HashCode * key,
  424. const struct GNUNET_HashCode * vhash,
  425. enum GNUNET_BLOCK_Type type, PluginDatumProcessor proc,
  426. void *proc_cls)
  427. {
  428. struct Plugin *plugin = cls;
  429. const int paramFormats[] = { 1, 1, 1, 1, 1 };
  430. int paramLengths[4];
  431. const char *paramValues[4];
  432. int nparams;
  433. const char *pname;
  434. PGresult *ret;
  435. uint64_t total;
  436. uint64_t blimit_off;
  437. uint32_t btype;
  438. GNUNET_assert (key != NULL);
  439. paramValues[0] = (const char *) key;
  440. paramLengths[0] = sizeof (struct GNUNET_HashCode);
  441. btype = htonl (type);
  442. if (type != 0)
  443. {
  444. if (vhash != NULL)
  445. {
  446. paramValues[1] = (const char *) vhash;
  447. paramLengths[1] = sizeof (struct GNUNET_HashCode);
  448. paramValues[2] = (const char *) &btype;
  449. paramLengths[2] = sizeof (btype);
  450. paramValues[3] = (const char *) &blimit_off;
  451. paramLengths[3] = sizeof (blimit_off);
  452. nparams = 4;
  453. pname = "getvt";
  454. ret =
  455. PQexecParams (plugin->dbh,
  456. "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
  457. 3, NULL, paramValues, paramLengths, paramFormats, 1);
  458. }
  459. else
  460. {
  461. paramValues[1] = (const char *) &btype;
  462. paramLengths[1] = sizeof (btype);
  463. paramValues[2] = (const char *) &blimit_off;
  464. paramLengths[2] = sizeof (blimit_off);
  465. nparams = 3;
  466. pname = "gett";
  467. ret =
  468. PQexecParams (plugin->dbh,
  469. "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
  470. 2, NULL, paramValues, paramLengths, paramFormats, 1);
  471. }
  472. }
  473. else
  474. {
  475. if (vhash != NULL)
  476. {
  477. paramValues[1] = (const char *) vhash;
  478. paramLengths[1] = sizeof (struct GNUNET_HashCode);
  479. paramValues[2] = (const char *) &blimit_off;
  480. paramLengths[2] = sizeof (blimit_off);
  481. nparams = 3;
  482. pname = "getv";
  483. ret =
  484. PQexecParams (plugin->dbh,
  485. "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
  486. 2, NULL, paramValues, paramLengths, paramFormats, 1);
  487. }
  488. else
  489. {
  490. paramValues[1] = (const char *) &blimit_off;
  491. paramLengths[1] = sizeof (blimit_off);
  492. nparams = 2;
  493. pname = "get";
  494. ret =
  495. PQexecParams (plugin->dbh, "SELECT count(*) FROM gn090 WHERE hash=$1",
  496. 1, NULL, paramValues, paramLengths, paramFormats, 1);
  497. }
  498. }
  499. if (GNUNET_OK !=
  500. GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_TUPLES_OK, "PQexecParams", pname))
  501. {
  502. proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
  503. return;
  504. }
  505. if ((PQntuples (ret) != 1) || (PQnfields (ret) != 1) ||
  506. (PQgetlength (ret, 0, 0) != sizeof (unsigned long long)))
  507. {
  508. GNUNET_break (0);
  509. PQclear (ret);
  510. proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
  511. return;
  512. }
  513. total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
  514. PQclear (ret);
  515. if (total == 0)
  516. {
  517. proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
  518. return;
  519. }
  520. blimit_off = GNUNET_htonll (offset % total);
  521. ret =
  522. PQexecPrepared (plugin->dbh, pname, nparams, paramValues, paramLengths,
  523. paramFormats, 1);
  524. process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
  525. }
  526. /**
  527. * Select a subset of the items in the datastore and call
  528. * the given iterator for each of them.
  529. *
  530. * @param cls our "struct Plugin*"
  531. * @param offset offset of the result (modulo num-results);
  532. * specific ordering does not matter for the offset
  533. * @param type entries of which type should be considered?
  534. * Use 0 for any type.
  535. * @param proc function to call on the matching value;
  536. * will be called with a NULL if no value matches
  537. * @param proc_cls closure for proc
  538. */
  539. static void
  540. postgres_plugin_get_zero_anonymity (void *cls, uint64_t offset,
  541. enum GNUNET_BLOCK_Type type,
  542. PluginDatumProcessor proc, void *proc_cls)
  543. {
  544. struct Plugin *plugin = cls;
  545. uint32_t btype;
  546. uint64_t boff;
  547. const int paramFormats[] = { 1, 1 };
  548. int paramLengths[] = { sizeof (btype), sizeof (boff) };
  549. const char *paramValues[] = { (const char *) &btype, (const char *) &boff };
  550. PGresult *ret;
  551. btype = htonl ((uint32_t) type);
  552. boff = GNUNET_htonll (offset);
  553. ret =
  554. PQexecPrepared (plugin->dbh, "select_non_anonymous", 2, paramValues,
  555. paramLengths, paramFormats, 1);
  556. process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
  557. }
  558. /**
  559. * Context for 'repl_iter' function.
  560. */
  561. struct ReplCtx
  562. {
  563. /**
  564. * Plugin handle.
  565. */
  566. struct Plugin *plugin;
  567. /**
  568. * Function to call for the result (or the NULL).
  569. */
  570. PluginDatumProcessor proc;
  571. /**
  572. * Closure for proc.
  573. */
  574. void *proc_cls;
  575. };
  576. /**
  577. * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
  578. * Decrements the replication counter and calls the original
  579. * iterator.
  580. *
  581. * @param cls closure with the 'struct ReplCtx*'
  582. * @param key key for the content
  583. * @param size number of bytes in data
  584. * @param data content stored
  585. * @param type type of the content
  586. * @param priority priority of the content
  587. * @param anonymity anonymity-level for the content
  588. * @param expiration expiration time for the content
  589. * @param uid unique identifier for the datum;
  590. * maybe 0 if no unique identifier is available
  591. *
  592. * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
  593. * (continue on call to "next", of course),
  594. * GNUNET_NO to delete the item and continue (if supported)
  595. */
  596. static int
  597. repl_proc (void *cls, const struct GNUNET_HashCode * key, uint32_t size,
  598. const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority,
  599. uint32_t anonymity, struct GNUNET_TIME_Absolute expiration,
  600. uint64_t uid)
  601. {
  602. struct ReplCtx *rc = cls;
  603. struct Plugin *plugin = rc->plugin;
  604. int ret;
  605. PGresult *qret;
  606. uint32_t boid;
  607. ret =
  608. rc->proc (rc->proc_cls, key, size, data, type, priority, anonymity,
  609. expiration, uid);
  610. if (NULL != key)
  611. {
  612. boid = htonl ((uint32_t) uid);
  613. const char *paramValues[] = {
  614. (const char *) &boid,
  615. };
  616. int paramLengths[] = {
  617. sizeof (boid),
  618. };
  619. const int paramFormats[] = { 1 };
  620. qret =
  621. PQexecPrepared (plugin->dbh, "decrepl", 1, paramValues, paramLengths,
  622. paramFormats, 1);
  623. if (GNUNET_OK !=
  624. GNUNET_POSTGRES_check_result (plugin->dbh, qret, PGRES_COMMAND_OK, "PQexecPrepared",
  625. "decrepl"))
  626. return GNUNET_SYSERR;
  627. PQclear (qret);
  628. }
  629. return ret;
  630. }
  631. /**
  632. * Get a random item for replication. Returns a single, not expired, random item
  633. * from those with the highest replication counters. The item's
  634. * replication counter is decremented by one IF it was positive before.
  635. * Call 'proc' with all values ZERO or NULL if the datastore is empty.
  636. *
  637. * @param cls closure with the 'struct Plugin'
  638. * @param proc function to call the value (once only).
  639. * @param proc_cls closure for proc
  640. */
  641. static void
  642. postgres_plugin_get_replication (void *cls, PluginDatumProcessor proc,
  643. void *proc_cls)
  644. {
  645. struct Plugin *plugin = cls;
  646. struct ReplCtx rc;
  647. PGresult *ret;
  648. rc.plugin = plugin;
  649. rc.proc = proc;
  650. rc.proc_cls = proc_cls;
  651. ret =
  652. PQexecPrepared (plugin->dbh, "select_replication_order", 0, NULL, NULL,
  653. NULL, 1);
  654. process_result (plugin, &repl_proc, &rc, ret, __FILE__, __LINE__);
  655. }
  656. /**
  657. * Get a random item for expiration.
  658. * Call 'proc' with all values ZERO or NULL if the datastore is empty.
  659. *
  660. * @param cls closure with the 'struct Plugin'
  661. * @param proc function to call the value (once only).
  662. * @param proc_cls closure for proc
  663. */
  664. static void
  665. postgres_plugin_get_expiration (void *cls, PluginDatumProcessor proc,
  666. void *proc_cls)
  667. {
  668. struct Plugin *plugin = cls;
  669. uint64_t btime;
  670. const int paramFormats[] = { 1 };
  671. int paramLengths[] = { sizeof (btime) };
  672. const char *paramValues[] = { (const char *) &btime };
  673. PGresult *ret;
  674. btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value_us);
  675. ret =
  676. PQexecPrepared (plugin->dbh, "select_expiration_order", 1, paramValues,
  677. paramLengths, paramFormats, 1);
  678. process_result (plugin, proc, proc_cls, ret, __FILE__, __LINE__);
  679. }
  680. /**
  681. * Update the priority for a particular key in the datastore. If
  682. * the expiration time in value is different than the time found in
  683. * the datastore, the higher value should be kept. For the
  684. * anonymity level, the lower value is to be used. The specified
  685. * priority should be added to the existing priority, ignoring the
  686. * priority in value.
  687. *
  688. * Note that it is possible for multiple values to match this put.
  689. * In that case, all of the respective values are updated.
  690. *
  691. * @param cls our "struct Plugin*"
  692. * @param uid unique identifier of the datum
  693. * @param delta by how much should the priority
  694. * change? If priority + delta < 0 the
  695. * priority should be set to 0 (never go
  696. * negative).
  697. * @param expire new expiration time should be the
  698. * MAX of any existing expiration time and
  699. * this value
  700. * @param msg set to error message
  701. * @return GNUNET_OK on success
  702. */
  703. static int
  704. postgres_plugin_update (void *cls, uint64_t uid, int delta,
  705. struct GNUNET_TIME_Absolute expire, char **msg)
  706. {
  707. struct Plugin *plugin = cls;
  708. PGresult *ret;
  709. int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
  710. uint32_t boid = htonl ((uint32_t) uid);
  711. uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value_us__;
  712. const char *paramValues[] = {
  713. (const char *) &bdelta,
  714. (const char *) &bexpire,
  715. (const char *) &boid,
  716. };
  717. int paramLengths[] = {
  718. sizeof (bdelta),
  719. sizeof (bexpire),
  720. sizeof (boid),
  721. };
  722. const int paramFormats[] = { 1, 1, 1 };
  723. ret =
  724. PQexecPrepared (plugin->dbh, "update", 3, paramValues, paramLengths,
  725. paramFormats, 1);
  726. if (GNUNET_OK !=
  727. GNUNET_POSTGRES_check_result (plugin->dbh, ret, PGRES_COMMAND_OK, "PQexecPrepared", "update"))
  728. return GNUNET_SYSERR;
  729. PQclear (ret);
  730. return GNUNET_OK;
  731. }
  732. /**
  733. * Get all of the keys in the datastore.
  734. *
  735. * @param cls closure with the 'struct Plugin'
  736. * @param proc function to call on each key
  737. * @param proc_cls closure for proc
  738. */
  739. static void
  740. postgres_plugin_get_keys (void *cls,
  741. PluginKeyProcessor proc,
  742. void *proc_cls)
  743. {
  744. struct Plugin *plugin = cls;
  745. int ret;
  746. int i;
  747. struct GNUNET_HashCode key;
  748. PGresult * res;
  749. res = PQexecPrepared (plugin->dbh, "get_keys", 0, NULL, NULL, NULL, 1);
  750. ret = PQntuples (res);
  751. for (i=0;i<ret;i++)
  752. {
  753. if (sizeof (struct GNUNET_HashCode) != PQgetlength (res, i, 0))
  754. {
  755. memcpy (&key, PQgetvalue (res, i, 0), sizeof (struct GNUNET_HashCode));
  756. proc (proc_cls, &key, 1);
  757. }
  758. }
  759. PQclear (res);
  760. }
  761. /**
  762. * Drop database.
  763. *
  764. * @param cls closure with the 'struct Plugin'
  765. */
  766. static void
  767. postgres_plugin_drop (void *cls)
  768. {
  769. struct Plugin *plugin = cls;
  770. if (GNUNET_OK != GNUNET_POSTGRES_exec (plugin->dbh, "DROP TABLE gn090"))
  771. GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "postgres", _("Failed to drop table from database.\n"));
  772. }
  773. /**
  774. * Entry point for the plugin.
  775. *
  776. * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
  777. * @return our "struct Plugin*"
  778. */
  779. void *
  780. libgnunet_plugin_datastore_postgres_init (void *cls)
  781. {
  782. struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
  783. struct GNUNET_DATASTORE_PluginFunctions *api;
  784. struct Plugin *plugin;
  785. plugin = GNUNET_new (struct Plugin);
  786. plugin->env = env;
  787. if (GNUNET_OK != init_connection (plugin))
  788. {
  789. GNUNET_free (plugin);
  790. return NULL;
  791. }
  792. api = GNUNET_new (struct GNUNET_DATASTORE_PluginFunctions);
  793. api->cls = plugin;
  794. api->estimate_size = &postgres_plugin_estimate_size;
  795. api->put = &postgres_plugin_put;
  796. api->update = &postgres_plugin_update;
  797. api->get_key = &postgres_plugin_get_key;
  798. api->get_replication = &postgres_plugin_get_replication;
  799. api->get_expiration = &postgres_plugin_get_expiration;
  800. api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
  801. api->get_keys = &postgres_plugin_get_keys;
  802. api->drop = &postgres_plugin_drop;
  803. GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "datastore-postgres",
  804. _("Postgres database running\n"));
  805. return api;
  806. }
  807. /**
  808. * Exit point from the plugin.
  809. * @param cls our "struct Plugin*"
  810. * @return always NULL
  811. */
  812. void *
  813. libgnunet_plugin_datastore_postgres_done (void *cls)
  814. {
  815. struct GNUNET_DATASTORE_PluginFunctions *api = cls;
  816. struct Plugin *plugin = api->cls;
  817. PQfinish (plugin->dbh);
  818. GNUNET_free (plugin);
  819. GNUNET_free (api);
  820. return NULL;
  821. }
  822. /* end of plugin_datastore_postgres.c */