database.py 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2017-2018 New Vector Ltd
  4. # Copyright 2019 The Matrix.org Foundation C.I.C.
  5. #
  6. # Licensed under the Apache License, Version 2.0 (the "License");
  7. # you may not use this file except in compliance with the License.
  8. # You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. import logging
  18. import time
  19. from sys import intern
  20. from time import monotonic as monotonic_time
  21. from typing import (
  22. Any,
  23. Callable,
  24. Dict,
  25. Iterable,
  26. Iterator,
  27. List,
  28. Optional,
  29. Tuple,
  30. TypeVar,
  31. )
  32. from prometheus_client import Histogram
  33. from twisted.enterprise import adbapi
  34. from twisted.internet import defer
  35. from synapse.api.errors import StoreError
  36. from synapse.config.database import DatabaseConnectionConfig
  37. from synapse.logging.context import (
  38. LoggingContext,
  39. LoggingContextOrSentinel,
  40. current_context,
  41. make_deferred_yieldable,
  42. )
  43. from synapse.metrics.background_process_metrics import run_as_background_process
  44. from synapse.storage.background_updates import BackgroundUpdater
  45. from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
  46. from synapse.storage.types import Connection, Cursor
  47. from synapse.types import Collection
logger = logging.getLogger(__name__)

# Python 3 ints are unbounded, so we pick our own ceiling: transaction IDs are
# wrapped modulo this value to stop the counter growing without limit.
MAX_TXN_ID = 2 ** 63 - 1

# Dedicated loggers for individual SQL statements, transaction start/end lines,
# and the periodic "time spent in the database" summaries respectively.
sql_logger = logging.getLogger("synapse.storage.SQL")
transaction_logger = logging.getLogger("synapse.storage.txn")
perf_logger = logging.getLogger("synapse.storage.TIME")

# Histograms of durations in seconds: time spent waiting for a connection,
# time per SQL statement (labelled by the first word of the statement), and
# time per transaction (labelled by the transaction description).
sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec")
sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"])
sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"])

# Unique indexes which have been added in background updates. Maps from table name
# to the name of the background update which added the unique index to that table.
#
# This is used by the upsert logic to figure out which tables are safe to do a proper
# UPSERT on: until the relevant background update has completed, we
# have to emulate an upsert by locking the table.
#
UNIQUE_INDEX_BACKGROUND_UPDATES = {
    "user_ips": "user_ips_device_unique_index",
    "device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx",
    "device_lists_remote_cache": "device_lists_remote_cache_unique_idx",
    "event_search": "event_search_event_id_idx",
}
  70. def make_pool(
  71. reactor, db_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine
  72. ) -> adbapi.ConnectionPool:
  73. """Get the connection pool for the database.
  74. """
  75. return adbapi.ConnectionPool(
  76. db_config.config["name"],
  77. cp_reactor=reactor,
  78. cp_openfun=engine.on_new_connection,
  79. **db_config.config.get("args", {})
  80. )
  81. def make_conn(
  82. db_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine
  83. ) -> Connection:
  84. """Make a new connection to the database and return it.
  85. Returns:
  86. Connection
  87. """
  88. db_params = {
  89. k: v
  90. for k, v in db_config.config.get("args", {}).items()
  91. if not k.startswith("cp_")
  92. }
  93. db_conn = engine.module.connect(**db_params)
  94. engine.on_new_connection(db_conn)
  95. return db_conn
# The type of entry which goes on our after_callbacks and exception_callbacks lists:
# a (callback, positional args, keyword args) triple.
#
# Python 3.5.2 doesn't support Callable with an ellipsis, so we wrap it in quotes so
# that mypy sees the type but the runtime python doesn't.
_CallbackListEntry = Tuple["Callable[..., None]", Iterable[Any], Dict[str, Any]]
  101. class LoggingTransaction:
  102. """An object that almost-transparently proxies for the 'txn' object
  103. passed to the constructor. Adds logging and metrics to the .execute()
  104. method.
  105. Args:
  106. txn: The database transcation object to wrap.
  107. name: The name of this transactions for logging.
  108. database_engine
  109. after_callbacks: A list that callbacks will be appended to
  110. that have been added by `call_after` which should be run on
  111. successful completion of the transaction. None indicates that no
  112. callbacks should be allowed to be scheduled to run.
  113. exception_callbacks: A list that callbacks will be appended
  114. to that have been added by `call_on_exception` which should be run
  115. if transaction ends with an error. None indicates that no callbacks
  116. should be allowed to be scheduled to run.
  117. """
  118. __slots__ = [
  119. "txn",
  120. "name",
  121. "database_engine",
  122. "after_callbacks",
  123. "exception_callbacks",
  124. ]
  125. def __init__(
  126. self,
  127. txn: Cursor,
  128. name: str,
  129. database_engine: BaseDatabaseEngine,
  130. after_callbacks: Optional[List[_CallbackListEntry]] = None,
  131. exception_callbacks: Optional[List[_CallbackListEntry]] = None,
  132. ):
  133. self.txn = txn
  134. self.name = name
  135. self.database_engine = database_engine
  136. self.after_callbacks = after_callbacks
  137. self.exception_callbacks = exception_callbacks
  138. def call_after(self, callback: "Callable[..., None]", *args, **kwargs):
  139. """Call the given callback on the main twisted thread after the
  140. transaction has finished. Used to invalidate the caches on the
  141. correct thread.
  142. """
  143. # if self.after_callbacks is None, that means that whatever constructed the
  144. # LoggingTransaction isn't expecting there to be any callbacks; assert that
  145. # is not the case.
  146. assert self.after_callbacks is not None
  147. self.after_callbacks.append((callback, args, kwargs))
  148. def call_on_exception(self, callback: "Callable[..., None]", *args, **kwargs):
  149. # if self.exception_callbacks is None, that means that whatever constructed the
  150. # LoggingTransaction isn't expecting there to be any callbacks; assert that
  151. # is not the case.
  152. assert self.exception_callbacks is not None
  153. self.exception_callbacks.append((callback, args, kwargs))
  154. def fetchall(self) -> List[Tuple]:
  155. return self.txn.fetchall()
  156. def fetchone(self) -> Tuple:
  157. return self.txn.fetchone()
  158. def __iter__(self) -> Iterator[Tuple]:
  159. return self.txn.__iter__()
  160. @property
  161. def rowcount(self) -> int:
  162. return self.txn.rowcount
  163. @property
  164. def description(self) -> Any:
  165. return self.txn.description
  166. def execute_batch(self, sql, args):
  167. if isinstance(self.database_engine, PostgresEngine):
  168. from psycopg2.extras import execute_batch # type: ignore
  169. self._do_execute(lambda *x: execute_batch(self.txn, *x), sql, args)
  170. else:
  171. for val in args:
  172. self.execute(sql, val)
  173. def execute(self, sql: str, *args: Any):
  174. self._do_execute(self.txn.execute, sql, *args)
  175. def executemany(self, sql: str, *args: Any):
  176. self._do_execute(self.txn.executemany, sql, *args)
  177. def _make_sql_one_line(self, sql: str) -> str:
  178. "Strip newlines out of SQL so that the loggers in the DB are on one line"
  179. return " ".join(line.strip() for line in sql.splitlines() if line.strip())
  180. def _do_execute(self, func, sql, *args):
  181. sql = self._make_sql_one_line(sql)
  182. # TODO(paul): Maybe use 'info' and 'debug' for values?
  183. sql_logger.debug("[SQL] {%s} %s", self.name, sql)
  184. sql = self.database_engine.convert_param_style(sql)
  185. if args:
  186. try:
  187. sql_logger.debug("[SQL values] {%s} %r", self.name, args[0])
  188. except Exception:
  189. # Don't let logging failures stop SQL from working
  190. pass
  191. start = time.time()
  192. try:
  193. return func(sql, *args)
  194. except Exception as e:
  195. logger.debug("[SQL FAIL] {%s} %s", self.name, e)
  196. raise
  197. finally:
  198. secs = time.time() - start
  199. sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs)
  200. sql_query_timer.labels(sql.split()[0]).observe(secs)
  201. def close(self):
  202. self.txn.close()
  203. class PerformanceCounters(object):
  204. def __init__(self):
  205. self.current_counters = {}
  206. self.previous_counters = {}
  207. def update(self, key, duration_secs):
  208. count, cum_time = self.current_counters.get(key, (0, 0))
  209. count += 1
  210. cum_time += duration_secs
  211. self.current_counters[key] = (count, cum_time)
  212. def interval(self, interval_duration_secs, limit=3):
  213. counters = []
  214. for name, (count, cum_time) in self.current_counters.items():
  215. prev_count, prev_time = self.previous_counters.get(name, (0, 0))
  216. counters.append(
  217. (
  218. (cum_time - prev_time) / interval_duration_secs,
  219. count - prev_count,
  220. name,
  221. )
  222. )
  223. self.previous_counters = dict(self.current_counters)
  224. counters.sort(reverse=True)
  225. top_n_counters = ", ".join(
  226. "%s(%d): %.3f%%" % (name, count, 100 * ratio)
  227. for ratio, count, name in counters[:limit]
  228. )
  229. return top_n_counters
  230. class Database(object):
  231. """Wraps a single physical database and connection pool.
  232. A single database may be used by multiple data stores.
  233. """
  234. _TXN_ID = 0
  235. def __init__(
  236. self, hs, database_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine
  237. ):
  238. self.hs = hs
  239. self._clock = hs.get_clock()
  240. self._database_config = database_config
  241. self._db_pool = make_pool(hs.get_reactor(), database_config, engine)
  242. self.updates = BackgroundUpdater(hs, self)
  243. self._previous_txn_total_time = 0.0
  244. self._current_txn_total_time = 0.0
  245. self._previous_loop_ts = 0.0
  246. # TODO(paul): These can eventually be removed once the metrics code
  247. # is running in mainline, and we have some nice monitoring frontends
  248. # to watch it
  249. self._txn_perf_counters = PerformanceCounters()
  250. self.engine = engine
  251. # A set of tables that are not safe to use native upserts in.
  252. self._unsafe_to_upsert_tables = set(UNIQUE_INDEX_BACKGROUND_UPDATES.keys())
  253. # We add the user_directory_search table to the blacklist on SQLite
  254. # because the existing search table does not have an index, making it
  255. # unsafe to use native upserts.
  256. if isinstance(self.engine, Sqlite3Engine):
  257. self._unsafe_to_upsert_tables.add("user_directory_search")
  258. if self.engine.can_native_upsert:
  259. # Check ASAP (and then later, every 1s) to see if we have finished
  260. # background updates of tables that aren't safe to update.
  261. self._clock.call_later(
  262. 0.0,
  263. run_as_background_process,
  264. "upsert_safety_check",
  265. self._check_safe_to_upsert,
  266. )
  267. def is_running(self):
  268. """Is the database pool currently running
  269. """
  270. return self._db_pool.running
  271. @defer.inlineCallbacks
  272. def _check_safe_to_upsert(self):
  273. """
  274. Is it safe to use native UPSERT?
  275. If there are background updates, we will need to wait, as they may be
  276. the addition of indexes that set the UNIQUE constraint that we require.
  277. If the background updates have not completed, wait 15 sec and check again.
  278. """
  279. updates = yield self.simple_select_list(
  280. "background_updates",
  281. keyvalues=None,
  282. retcols=["update_name"],
  283. desc="check_background_updates",
  284. )
  285. updates = [x["update_name"] for x in updates]
  286. for table, update_name in UNIQUE_INDEX_BACKGROUND_UPDATES.items():
  287. if update_name not in updates:
  288. logger.debug("Now safe to upsert in %s", table)
  289. self._unsafe_to_upsert_tables.discard(table)
  290. # If there's any updates still running, reschedule to run.
  291. if updates:
  292. self._clock.call_later(
  293. 15.0,
  294. run_as_background_process,
  295. "upsert_safety_check",
  296. self._check_safe_to_upsert,
  297. )
  298. def start_profiling(self):
  299. self._previous_loop_ts = monotonic_time()
  300. def loop():
  301. curr = self._current_txn_total_time
  302. prev = self._previous_txn_total_time
  303. self._previous_txn_total_time = curr
  304. time_now = monotonic_time()
  305. time_then = self._previous_loop_ts
  306. self._previous_loop_ts = time_now
  307. duration = time_now - time_then
  308. ratio = (curr - prev) / duration
  309. top_three_counters = self._txn_perf_counters.interval(duration, limit=3)
  310. perf_logger.debug(
  311. "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters
  312. )
  313. self._clock.looping_call(loop, 10000)
    def new_transaction(
        self, conn, desc, after_callbacks, exception_callbacks, func, *args, **kwargs
    ):
        """Run `func` inside a transaction on `conn`, retrying on some errors.

        For each attempt a new cursor is opened on `conn`, wrapped in a
        LoggingTransaction and passed to `func` as its first argument. If
        `func` returns, the connection is committed and the result returned.

        An OperationalError, or a DatabaseError which the engine reports as a
        deadlock, triggers a rollback and another attempt, for up to N (5)
        retries; after that, and for any other exception, the error is
        re-raised. The cursor is closed at the end of every attempt, and the
        total duration is recorded in the logs and metrics whatever the outcome.

        NOTE(review): the same `after_callbacks` / `exception_callbacks` lists
        are handed to every attempt and are not cleared here between retries.

        Args:
            conn: the database connection to run the transaction on.
            desc: description of the transaction, for logging and metrics.
            after_callbacks: list passed through to the LoggingTransaction.
            exception_callbacks: list passed through to the LoggingTransaction.
            func: the function to run; called as `func(cursor, *args, **kwargs)`.
            args: positional args to pass to `func`
            kwargs: named args to pass to `func`

        Returns:
            The result of `func`.
        """
        start = monotonic_time()
        txn_id = self._TXN_ID

        # We don't really need these to be unique, so lets stop it from
        # growing really large.
        self._TXN_ID = (self._TXN_ID + 1) % (MAX_TXN_ID)

        # The transaction name is the description plus the ID in hex.
        name = "%s-%x" % (desc, txn_id)

        transaction_logger.debug("[TXN START] {%s}", name)

        try:
            # i counts the retries made so far; N is the maximum allowed.
            i = 0
            N = 5
            while True:
                cursor = LoggingTransaction(
                    conn.cursor(),
                    name,
                    self.engine,
                    after_callbacks,
                    exception_callbacks,
                )
                try:
                    r = func(cursor, *args, **kwargs)
                    conn.commit()
                    return r
                except self.engine.module.OperationalError as e:
                    # This can happen if the database disappears mid
                    # transaction.
                    logger.warning(
                        "[TXN OPERROR] {%s} %s %d/%d", name, e, i, N,
                    )
                    if i < N:
                        i += 1
                        try:
                            conn.rollback()
                        except self.engine.module.Error as e1:
                            # A failed rollback is only logged; we retry anyway.
                            logger.warning("[TXN EROLL] {%s} %s", name, e1)
                        continue
                    raise
                except self.engine.module.DatabaseError as e:
                    # Only deadlocks are retried; any other DatabaseError falls
                    # straight through to the `raise` below.
                    if self.engine.is_deadlock(e):
                        logger.warning("[TXN DEADLOCK] {%s} %d/%d", name, i, N)
                        if i < N:
                            i += 1
                            try:
                                conn.rollback()
                            except self.engine.module.Error as e1:
                                logger.warning(
                                    "[TXN EROLL] {%s} %s", name, e1,
                                )
                            continue
                    raise
                finally:
                    # we're either about to retry with a new cursor, or we're about to
                    # release the connection. Once we release the connection, it could
                    # get used for another query, which might do a conn.rollback().
                    #
                    # In the latter case, even though that probably wouldn't affect the
                    # results of this transaction, python's sqlite will reset all
                    # statements on the connection [1], which will make our cursor
                    # invalid [2].
                    #
                    # In any case, continuing to read rows after commit()ing seems
                    # dubious from the PoV of ACID transactional semantics
                    # (sqlite explicitly says that once you commit, you may see rows
                    # from subsequent updates.)
                    #
                    # In psycopg2, cursors are essentially a client-side fabrication -
                    # all the data is transferred to the client side when the statement
                    # finishes executing - so in theory we could go on streaming results
                    # from the cursor, but attempting to do so would make us
                    # incompatible with sqlite, so let's make sure we're not doing that
                    # by closing the cursor.
                    #
                    # (*named* cursors in psycopg2 are different and are proper server-
                    # side things, but (a) we don't use them and (b) they are implicitly
                    # closed by ending the transaction anyway.)
                    #
                    # In short, if we haven't finished with the cursor yet, that's a
                    # problem waiting to bite us.
                    #
                    # TL;DR: we're done with the cursor, so we can close it.
                    #
                    # [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465
                    # [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236
                    cursor.close()
        except Exception as e:
            logger.debug("[TXN FAIL] {%s} %s", name, e)
            raise
        finally:
            # Record how long the whole transaction (including retries) took:
            # against the current logging context, in the debug log, in the
            # totals used by the profiling loop, and in the prometheus histogram.
            end = monotonic_time()
            duration = end - start

            current_context().add_database_transaction(duration)

            transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)

            self._current_txn_total_time += duration
            self._txn_perf_counters.update(desc, duration)
            sql_txn_timer.labels(desc).observe(duration)
  411. @defer.inlineCallbacks
  412. def runInteraction(self, desc: str, func: Callable, *args: Any, **kwargs: Any):
  413. """Starts a transaction on the database and runs a given function
  414. Arguments:
  415. desc: description of the transaction, for logging and metrics
  416. func: callback function, which will be called with a
  417. database transaction (twisted.enterprise.adbapi.Transaction) as
  418. its first argument, followed by `args` and `kwargs`.
  419. args: positional args to pass to `func`
  420. kwargs: named args to pass to `func`
  421. Returns:
  422. Deferred: The result of func
  423. """
  424. after_callbacks = [] # type: List[_CallbackListEntry]
  425. exception_callbacks = [] # type: List[_CallbackListEntry]
  426. if not current_context():
  427. logger.warning("Starting db txn '%s' from sentinel context", desc)
  428. try:
  429. result = yield self.runWithConnection(
  430. self.new_transaction,
  431. desc,
  432. after_callbacks,
  433. exception_callbacks,
  434. func,
  435. *args,
  436. **kwargs
  437. )
  438. for after_callback, after_args, after_kwargs in after_callbacks:
  439. after_callback(*after_args, **after_kwargs)
  440. except: # noqa: E722, as we reraise the exception this is fine.
  441. for after_callback, after_args, after_kwargs in exception_callbacks:
  442. after_callback(*after_args, **after_kwargs)
  443. raise
  444. return result
  445. @defer.inlineCallbacks
  446. def runWithConnection(self, func: Callable, *args: Any, **kwargs: Any):
  447. """Wraps the .runWithConnection() method on the underlying db_pool.
  448. Arguments:
  449. func: callback function, which will be called with a
  450. database connection (twisted.enterprise.adbapi.Connection) as
  451. its first argument, followed by `args` and `kwargs`.
  452. args: positional args to pass to `func`
  453. kwargs: named args to pass to `func`
  454. Returns:
  455. Deferred: The result of func
  456. """
  457. parent_context = current_context() # type: Optional[LoggingContextOrSentinel]
  458. if not parent_context:
  459. logger.warning(
  460. "Starting db connection from sentinel context: metrics will be lost"
  461. )
  462. parent_context = None
  463. start_time = monotonic_time()
  464. def inner_func(conn, *args, **kwargs):
  465. with LoggingContext("runWithConnection", parent_context) as context:
  466. sched_duration_sec = monotonic_time() - start_time
  467. sql_scheduling_timer.observe(sched_duration_sec)
  468. context.add_database_scheduled(sched_duration_sec)
  469. if self.engine.is_connection_closed(conn):
  470. logger.debug("Reconnecting closed database connection")
  471. conn.reconnect()
  472. return func(conn, *args, **kwargs)
  473. result = yield make_deferred_yieldable(
  474. self._db_pool.runWithConnection(inner_func, *args, **kwargs)
  475. )
  476. return result
  477. @staticmethod
  478. def cursor_to_dict(cursor):
  479. """Converts a SQL cursor into an list of dicts.
  480. Args:
  481. cursor : The DBAPI cursor which has executed a query.
  482. Returns:
  483. A list of dicts where the key is the column header.
  484. """
  485. col_headers = [intern(str(column[0])) for column in cursor.description]
  486. results = [dict(zip(col_headers, row)) for row in cursor]
  487. return results
  488. def execute(self, desc, decoder, query, *args):
  489. """Runs a single query for a result set.
  490. Args:
  491. decoder - The function which can resolve the cursor results to
  492. something meaningful.
  493. query - The query string to execute
  494. *args - Query args.
  495. Returns:
  496. The result of decoder(results)
  497. """
  498. def interaction(txn):
  499. txn.execute(query, args)
  500. if decoder:
  501. return decoder(txn)
  502. else:
  503. return txn.fetchall()
  504. return self.runInteraction(desc, interaction)
  505. # "Simple" SQL API methods that operate on a single table with no JOINs,
  506. # no complex WHERE clauses, just a dict of values for columns.
  507. @defer.inlineCallbacks
  508. def simple_insert(self, table, values, or_ignore=False, desc="simple_insert"):
  509. """Executes an INSERT query on the named table.
  510. Args:
  511. table : string giving the table name
  512. values : dict of new column names and values for them
  513. or_ignore : bool stating whether an exception should be raised
  514. when a conflicting row already exists. If True, False will be
  515. returned by the function instead
  516. desc : string giving a description of the transaction
  517. Returns:
  518. bool: Whether the row was inserted or not. Only useful when
  519. `or_ignore` is True
  520. """
  521. try:
  522. yield self.runInteraction(desc, self.simple_insert_txn, table, values)
  523. except self.engine.module.IntegrityError:
  524. # We have to do or_ignore flag at this layer, since we can't reuse
  525. # a cursor after we receive an error from the db.
  526. if not or_ignore:
  527. raise
  528. return False
  529. return True
  530. @staticmethod
  531. def simple_insert_txn(txn, table, values):
  532. keys, vals = zip(*values.items())
  533. sql = "INSERT INTO %s (%s) VALUES(%s)" % (
  534. table,
  535. ", ".join(k for k in keys),
  536. ", ".join("?" for _ in keys),
  537. )
  538. txn.execute(sql, vals)
  539. def simple_insert_many(self, table, values, desc):
  540. return self.runInteraction(desc, self.simple_insert_many_txn, table, values)
  541. @staticmethod
  542. def simple_insert_many_txn(txn, table, values):
  543. if not values:
  544. return
  545. # This is a *slight* abomination to get a list of tuples of key names
  546. # and a list of tuples of value names.
  547. #
  548. # i.e. [{"a": 1, "b": 2}, {"c": 3, "d": 4}]
  549. # => [("a", "b",), ("c", "d",)] and [(1, 2,), (3, 4,)]
  550. #
  551. # The sort is to ensure that we don't rely on dictionary iteration
  552. # order.
  553. keys, vals = zip(
  554. *[zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i]
  555. )
  556. for k in keys:
  557. if k != keys[0]:
  558. raise RuntimeError("All items must have the same keys")
  559. sql = "INSERT INTO %s (%s) VALUES(%s)" % (
  560. table,
  561. ", ".join(k for k in keys[0]),
  562. ", ".join("?" for _ in keys[0]),
  563. )
  564. txn.executemany(sql, vals)
  565. @defer.inlineCallbacks
  566. def simple_upsert(
  567. self,
  568. table,
  569. keyvalues,
  570. values,
  571. insertion_values={},
  572. desc="simple_upsert",
  573. lock=True,
  574. ):
  575. """
  576. `lock` should generally be set to True (the default), but can be set
  577. to False if either of the following are true:
  578. * there is a UNIQUE INDEX on the key columns. In this case a conflict
  579. will cause an IntegrityError in which case this function will retry
  580. the update.
  581. * we somehow know that we are the only thread which will be updating
  582. this table.
  583. Args:
  584. table (str): The table to upsert into
  585. keyvalues (dict): The unique key columns and their new values
  586. values (dict): The nonunique columns and their new values
  587. insertion_values (dict): additional key/values to use only when
  588. inserting
  589. lock (bool): True to lock the table when doing the upsert.
  590. Returns:
  591. Deferred(None or bool): Native upserts always return None. Emulated
  592. upserts return True if a new entry was created, False if an existing
  593. one was updated.
  594. """
  595. attempts = 0
  596. while True:
  597. try:
  598. result = yield self.runInteraction(
  599. desc,
  600. self.simple_upsert_txn,
  601. table,
  602. keyvalues,
  603. values,
  604. insertion_values,
  605. lock=lock,
  606. )
  607. return result
  608. except self.engine.module.IntegrityError as e:
  609. attempts += 1
  610. if attempts >= 5:
  611. # don't retry forever, because things other than races
  612. # can cause IntegrityErrors
  613. raise
  614. # presumably we raced with another transaction: let's retry.
  615. logger.warning(
  616. "IntegrityError when upserting into %s; retrying: %s", table, e
  617. )
  618. def simple_upsert_txn(
  619. self, txn, table, keyvalues, values, insertion_values={}, lock=True
  620. ):
  621. """
  622. Pick the UPSERT method which works best on the platform. Either the
  623. native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
  624. Args:
  625. txn: The transaction to use.
  626. table (str): The table to upsert into
  627. keyvalues (dict): The unique key tables and their new values
  628. values (dict): The nonunique columns and their new values
  629. insertion_values (dict): additional key/values to use only when
  630. inserting
  631. lock (bool): True to lock the table when doing the upsert.
  632. Returns:
  633. None or bool: Native upserts always return None. Emulated
  634. upserts return True if a new entry was created, False if an existing
  635. one was updated.
  636. """
  637. if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
  638. return self.simple_upsert_txn_native_upsert(
  639. txn, table, keyvalues, values, insertion_values=insertion_values
  640. )
  641. else:
  642. return self.simple_upsert_txn_emulated(
  643. txn,
  644. table,
  645. keyvalues,
  646. values,
  647. insertion_values=insertion_values,
  648. lock=lock,
  649. )
  def simple_upsert_txn_emulated(
      self, txn, table, keyvalues, values, insertion_values={}, lock=True
  ):
      """Emulate an UPSERT with a check-or-UPDATE followed by an INSERT.

      Args:
          txn: The transaction to use.
          table (str): The table to upsert into.
          keyvalues (dict): The unique key columns and their new values.
          values (dict): The non-unique columns and their new values.
          insertion_values (dict): Additional key/values to use only when
              inserting.
          lock (bool): True to lock the table when doing the upsert.

      Returns:
          bool: True if a new entry was created, False if an existing one
          was updated (or, when `values` is empty, merely found).
      """
      # The table has to be locked unless the caller is *really* careful.
      if lock:
          self.engine.lock_table(txn, table)

      # A key whose value is None (aka NULL) must be matched with IS rather
      # than =, because NULL = NULL evaluates to NULL (i.e. not true).
      where_clause = " AND ".join(
          ("%s IS ?" if keyvalues[column] is None else "%s = ?") % (column,)
          for column in keyvalues
      )
      key_args = list(keyvalues.values())

      if values:
          # There is something to write: attempt the UPDATE first.
          update_sql = "UPDATE %s SET %s WHERE %s" % (
              table,
              ", ".join("%s = ?" % (column,) for column in values),
              where_clause,
          )
          txn.execute(update_sql, list(values.values()) + key_args)
          if txn.rowcount > 0:
              # At least one existing row was updated.
              return False
      else:
          # Everything we care about lives in the unique key, so there is
          # nothing to UPDATE; a SELECT tells us whether the row exists.
          select_sql = "SELECT 1 FROM %s WHERE %s" % (table, where_clause)
          txn.execute(select_sql, key_args)
          if txn.fetchall():
              # The record is already present.
              return False

      # No existing row matched, so insert a fresh one.
      new_row = {}  # type: Dict[str, Any]
      for part in (keyvalues, values, insertion_values):
          new_row.update(part)

      insert_sql = "INSERT INTO %s (%s) VALUES (%s)" % (
          table,
          ", ".join(new_row),
          ", ".join(["?"] * len(new_row)),
      )
      txn.execute(insert_sql, list(new_row.values()))
      return True
  def simple_upsert_txn_native_upsert(
      self, txn, table, keyvalues, values, insertion_values={}
  ):
      """Upsert one row with a single `INSERT ... ON CONFLICT` statement.

      Args:
          txn: The transaction to use.
          table (str): The table to upsert into.
          keyvalues (dict): The unique key columns and their new values;
              these columns form the ON CONFLICT target.
          values (dict): The non-unique columns and their new values. When
              empty, a conflict does nothing.
          insertion_values (dict): Additional key/values to use only when
              inserting.

      Returns:
          None
      """
      # Column order is: key columns, insert-only columns, value columns.
      row = {}  # type: Dict[str, Any]
      row.update(keyvalues)
      row.update(insertion_values)

      if values:
          row.update(values)
          on_conflict = "UPDATE SET " + ", ".join(
              column + "=EXCLUDED." + column for column in values
          )
      else:
          on_conflict = "NOTHING"

      columns = ", ".join(row)
      placeholders = ", ".join(["?"] * len(row))
      conflict_target = ", ".join(keyvalues)
      sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % (
          table,
          columns,
          placeholders,
          conflict_target,
          on_conflict,
      )
      txn.execute(sql, list(row.values()))
  def simple_upsert_many_txn(
      self,
      txn: LoggingTransaction,
      table: str,
      key_names: Collection[str],
      key_values: Collection[Iterable[Any]],
      value_names: Collection[str],
      value_values: Iterable[Iterable[str]],
  ) -> None:
      """Upsert many rows, choosing the native or emulated implementation.

      The native implementation is used when the engine reports
      `can_native_upsert` and `table` is not in
      `self._unsafe_to_upsert_tables`.

      Args:
          txn: The transaction to use.
          table: The table to upsert into.
          key_names: The key column names.
          key_values: A list of each row's key column values.
          value_names: The value column names.
          value_values: A list of each row's value column values.
              Ignored if value_names is empty.
      """
      if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
          upsert_many = self.simple_upsert_many_txn_native_upsert
      else:
          upsert_many = self.simple_upsert_many_txn_emulated

      return upsert_many(
          txn, table, key_names, key_values, value_names, value_values
      )
  def simple_upsert_many_txn_emulated(
      self,
      txn: LoggingTransaction,
      table: str,
      key_names: Iterable[str],
      key_values: Collection[Iterable[Any]],
      value_names: Collection[str],
      value_values: Iterable[Iterable[str]],
  ) -> None:
      """Upsert many rows one at a time via `simple_upsert_txn_emulated`.

      No native UPSERT support or batching is used: each row results in a
      separate call to the emulated single-row upsert.

      Args:
          txn: The transaction to use.
          table: The table to upsert into.
          key_names: The key column names.
          key_values: A list of each row's key column values.
          value_names: The value column names.
          value_values: A list of each row's value column values.
              Ignored if value_names is empty.
      """
      if not value_names:
          # Without value columns, pair every key row with an empty tuple so
          # that the zip() below still yields one entry per key row.
          value_values = [()] * len(key_values)

      for key_row, value_row in zip(key_values, value_values):
          self.simple_upsert_txn_emulated(
              txn,
              table,
              dict(zip(key_names, key_row)),
              dict(zip(value_names, value_row)),
          )
  def simple_upsert_many_txn_native_upsert(
      self,
      txn: LoggingTransaction,
      table: str,
      key_names: Collection[str],
      key_values: Collection[Iterable[Any]],
      value_names: Collection[str],
      value_values: Iterable[Iterable[Any]],
  ) -> None:
      """Upsert many rows with one `INSERT ... ON CONFLICT` batch.

      Args:
          txn: The transaction to use; its `execute_batch` runs the statement
              once per row.
          table: The table to upsert into.
          key_names: The key column names.
          key_values: A list of each row's key column values.
          value_names: The value column names.
          value_values: A list of each row's value column values.
              Ignored if value_names is empty.
      """
      columns = [*key_names, *value_names]  # type: List[str]

      if value_names:
          on_conflict = "UPDATE SET " + ", ".join(
              name + "=EXCLUDED." + name for name in value_names
          )
      else:
          on_conflict = "NOTHING"
          # No value columns: use empty tuples so that the zip() below still
          # produces one argument tuple per key row.
          value_values = [()] * len(key_values)

      sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % (
          table,
          ", ".join(columns),
          ", ".join(["?"] * len(columns)),
          ", ".join(key_names),
          on_conflict,
      )

      batch_args = [
          tuple(key_row) + tuple(value_row)
          for key_row, value_row in zip(key_values, value_values)
      ]
      return txn.execute_batch(sql, batch_args)
  def simple_select_one(
      self, table, keyvalues, retcols, allow_none=False, desc="simple_select_one"
  ):
      """Select several columns from the single row matching `keyvalues`.

      Hands `simple_select_one_txn` to `self.runInteraction` and returns
      whatever that call returns.

      Args:
          table : string giving the table name
          keyvalues : dict of column names and values to select the row with
          retcols : list of strings giving the names of the columns to return
          allow_none : If true, return None instead of failing if the SELECT
              statement returns no rows
          desc : description passed to runInteraction
      """
      interaction = self.simple_select_one_txn
      return self.runInteraction(
          desc, interaction, table, keyvalues, retcols, allow_none
      )
  def simple_select_one_onecol(
      self,
      table,
      keyvalues,
      retcol,
      allow_none=False,
      desc="simple_select_one_onecol",
  ):
      """Select a single column from the single row matching `keyvalues`.

      Hands `simple_select_one_onecol_txn` to `self.runInteraction` and
      returns whatever that call returns.

      Args:
          table : string giving the table name
          keyvalues : dict of column names and values to select the row with
          retcol : string giving the name of the column to return
          allow_none : forwarded by keyword to simple_select_one_onecol_txn
          desc : description passed to runInteraction
      """
      interaction = self.simple_select_one_onecol_txn
      return self.runInteraction(
          desc, interaction, table, keyvalues, retcol, allow_none=allow_none
      )
  @classmethod
  def simple_select_one_onecol_txn(
      cls, txn, table, keyvalues, retcol, allow_none=False
  ):
      """Return the first value of `retcol` among the rows matching `keyvalues`.

      Args:
          txn : Transaction object
          table : string giving the table name
          keyvalues : dict of column names and values to select the row with
          retcol : string giving the name of the column to return
          allow_none : if true, return None when nothing matched

      Raises:
          StoreError: with code 404 when nothing matched and `allow_none`
              is false.
      """
      matches = cls.simple_select_onecol_txn(
          txn, table=table, keyvalues=keyvalues, retcol=retcol
      )

      if matches:
          return matches[0]
      if allow_none:
          return None
      raise StoreError(404, "No row found")
  @staticmethod
  def simple_select_onecol_txn(txn, table, keyvalues, retcol):
      """Return the values of `retcol` for every row matching `keyvalues`.

      Args:
          txn : Transaction object
          table : string giving the table name
          keyvalues : dict of column names and values to filter on; a falsy
              value selects every row
          retcol : string giving the name of the column to return

      Returns:
          list: the first column of each row yielded by iterating `txn`.
      """
      sql = "SELECT %s FROM %s" % (retcol, table)

      if not keyvalues:
          txn.execute(sql)
      else:
          conditions = " AND ".join("%s = ?" % column for column in keyvalues)
          txn.execute(sql + " WHERE %s" % conditions, list(keyvalues.values()))

      return [row[0] for row in txn]
  def simple_select_onecol(
      self, table, keyvalues, retcol, desc="simple_select_onecol"
  ):
      """Select the values of one column from all rows matching `keyvalues`.

      Hands `simple_select_onecol_txn` to `self.runInteraction`.

      Args:
          table (str): table name
          keyvalues (dict|None): column names and values to select the rows with
          retcol (str): column whose value we wish to retrieve.
          desc (str): description passed to runInteraction

      Returns:
          Deferred: Results in a list
      """
      interaction = self.simple_select_onecol_txn
      return self.runInteraction(desc, interaction, table, keyvalues, retcol)
  def simple_select_list(self, table, keyvalues, retcols, desc="simple_select_list"):
      """Select zero or more rows from a table as a list of dicts.

      Hands `simple_select_list_txn` to `self.runInteraction`.

      Args:
          table (str): the table name
          keyvalues (dict[str, Any] | None):
              column names and values to select the rows with, or None to not
              apply a WHERE clause.
          retcols (iterable[str]): the names of the columns to return
          desc (str): description passed to runInteraction

      Returns:
          defer.Deferred: resolves to list[dict[str, Any]]
      """
      interaction = self.simple_select_list_txn
      return self.runInteraction(desc, interaction, table, keyvalues, retcols)
  @classmethod
  def simple_select_list_txn(cls, txn, table, keyvalues, retcols):
      """Select zero or more rows within a transaction.

      Args:
          txn : Transaction object
          table (str): the table name
          keyvalues (dict[str, T] | None):
              column names and values to select the rows with, or None to not
              apply a WHERE clause.
          retcols (iterable[str]): the names of the columns to return

      Returns:
          The result of `cls.cursor_to_dict(txn)` after the query has run.
      """
      select = "SELECT %s FROM %s" % (", ".join(retcols), table)

      if not keyvalues:
          txn.execute(select)
          return cls.cursor_to_dict(txn)

      conditions = " AND ".join("%s = ?" % (column,) for column in keyvalues)
      txn.execute(select + " WHERE %s" % (conditions,), list(keyvalues.values()))
      return cls.cursor_to_dict(txn)
  @defer.inlineCallbacks
  def simple_select_many_batch(
      self,
      table,
      column,
      iterable,
      retcols,
      keyvalues={},
      desc="simple_select_many_batch",
      batch_size=100,
  ):
      """Select rows whose `column` value is in `iterable`, in batches.

      The values are split into slices of at most `batch_size` items and
      one `simple_select_many_txn` interaction is run per slice; the rows
      from every slice are concatenated.

      Args:
          table : string giving the table name
          column : column name to test for inclusion against `iterable`
          iterable : list
          retcols : list of strings giving the names of the columns to return
          keyvalues : dict of column names and values to select the rows with
          desc : description passed to runInteraction
          batch_size : maximum number of values per query

      Returns:
          list of rows accumulated across all batches (empty when
          `iterable` is falsy).
      """
      results = []  # type: List[Dict[str, Any]]

      if not iterable:
          return results

      # Arbitrary iterables cannot be sliced, so take a list copy first.
      values = list(iterable)

      for offset in range(0, len(values), batch_size):
          batch = values[offset : offset + batch_size]
          rows = yield self.runInteraction(
              desc,
              self.simple_select_many_txn,
              table,
              column,
              batch,
              keyvalues,
              retcols,
          )
          results.extend(rows)

      return results
  @classmethod
  def simple_select_many_txn(cls, txn, table, column, iterable, keyvalues, retcols):
      """Select rows whose `column` value is in `iterable`, within a transaction.

      Args:
          txn : Transaction object
          table : string giving the table name
          column : column name to test for inclusion against `iterable`
          iterable : list
          keyvalues : dict of column names and values to select the rows with
          retcols : list of strings giving the names of the columns to return

      Returns:
          An empty list when `iterable` is falsy (no query is run);
          otherwise the result of `cls.cursor_to_dict(txn)`.
      """
      if not iterable:
          return []

      in_clause, args = make_in_list_sql_clause(txn.database_engine, column, iterable)

      # Exact-match constraints are ANDed onto the membership test.
      extra = list(keyvalues.items())
      conditions = [in_clause] + ["%s = ?" % (key,) for key, _ in extra]
      args.extend(value for _, value in extra)

      sql = "SELECT %s FROM %s WHERE %s" % (
          ", ".join(retcols),
          table,
          " AND ".join(conditions),
      )
      txn.execute(sql, args)
      return cls.cursor_to_dict(txn)
  def simple_update(self, table, keyvalues, updatevalues, desc):
      """Run `simple_update_txn` through `self.runInteraction`.

      Args:
          table : string giving the table name
          keyvalues : dict of column names and values to select the rows with
          updatevalues : dict giving column names and values to update
          desc : description passed to runInteraction
      """
      interaction = self.simple_update_txn
      return self.runInteraction(desc, interaction, table, keyvalues, updatevalues)
  @staticmethod
  def simple_update_txn(txn, table, keyvalues, updatevalues):
      """Run an UPDATE setting `updatevalues` on rows matching `keyvalues`.

      Args:
          txn : Transaction object
          table : string giving the table name
          keyvalues : dict of column names and values to select the rows
              with; when empty no WHERE clause is emitted
          updatevalues : dict giving column names and values to update

      Returns:
          `txn.rowcount` after the statement has executed.
      """
      where = ""
      if keyvalues:
          where = "WHERE %s" % " AND ".join("%s = ?" % column for column in keyvalues)

      assignments = ", ".join("%s = ?" % (column,) for column in updatevalues)
      update_sql = "UPDATE %s SET %s %s" % (table, assignments, where)

      # SET parameters come first, followed by the WHERE parameters.
      args = list(updatevalues.values()) + list(keyvalues.values())
      txn.execute(update_sql, args)

      return txn.rowcount
  def simple_update_one(
      self, table, keyvalues, updatevalues, desc="simple_update_one"
  ):
      """Update the columns of the single row matching the key values.

      Hands `simple_update_one_txn` to `self.runInteraction`. A
      compare-and-set can be built by also putting the column being updated
      in `keyvalues`.

      Args:
          table : string giving the table name
          keyvalues : dict of column names and values to select the row with
          updatevalues : dict giving column names and values to update
          desc : description passed to runInteraction
      """
      interaction = self.simple_update_one_txn
      return self.runInteraction(desc, interaction, table, keyvalues, updatevalues)
  @classmethod
  def simple_update_one_txn(cls, txn, table, keyvalues, updatevalues):
      """Update exactly one row, failing if zero or several rows matched.

      Args:
          txn : Transaction object
          table : string giving the table name
          keyvalues : dict of column names and values to select the row with
          updatevalues : dict giving column names and values to update

      Raises:
          StoreError: 404 when no row was updated, 500 when more than one
              row was updated.
      """
      updated = cls.simple_update_txn(txn, table, keyvalues, updatevalues)

      if updated == 0:
          raise StoreError(404, "No row found (%s)" % (table,))
      if updated > 1:
          raise StoreError(500, "More than one row matched (%s)" % (table,))
  @staticmethod
  def simple_select_one_txn(txn, table, keyvalues, retcols, allow_none=False):
      """Select several columns from the single row matching `keyvalues`.

      Args:
          txn : Transaction object
          table : string giving the table name
          keyvalues : dict of column names and values to select the row with
          retcols : list of strings giving the names of the columns to return
          allow_none : if true, return None instead of failing when no row
              is fetched

      Returns:
          dict mapping each name in `retcols` to the fetched row's value, or
          None when nothing was fetched and `allow_none` is true.

      Raises:
          StoreError: 404 when no row is fetched and `allow_none` is false;
              500 when a row is fetched and `txn.rowcount` exceeds one.
      """
      conditions = " AND ".join("%s = ?" % (column,) for column in keyvalues)
      select_sql = "SELECT %s FROM %s WHERE %s" % (
          ", ".join(retcols),
          table,
          conditions,
      )
      txn.execute(select_sql, list(keyvalues.values()))

      row = txn.fetchone()
      if row:
          if txn.rowcount > 1:
              raise StoreError(500, "More than one row matched (%s)" % (table,))
          return dict(zip(retcols, row))

      if allow_none:
          return None
      raise StoreError(404, "No row found (%s)" % (table,))
  def simple_delete_one(self, table, keyvalues, desc="simple_delete_one"):
      """Delete the single row matching `keyvalues`.

      Hands `simple_delete_one_txn` to `self.runInteraction`.

      Args:
          table : string giving the table name
          keyvalues : dict of column names and values to select the row with
          desc : description passed to runInteraction
      """
      interaction = self.simple_delete_one_txn
      return self.runInteraction(desc, interaction, table, keyvalues)
  @staticmethod
  def simple_delete_one_txn(txn, table, keyvalues):
      """Delete exactly one row, failing if zero or several rows were deleted.

      Args:
          txn : Transaction object
          table : string giving the table name
          keyvalues : dict of column names and values to select the row with

      Raises:
          StoreError: 404 when `txn.rowcount` is zero after the DELETE, 500
              when it is greater than one.
      """
      conditions = " AND ".join("%s = ?" % (column,) for column in keyvalues)
      sql = "DELETE FROM %s WHERE %s" % (table, conditions)
      txn.execute(sql, list(keyvalues.values()))

      if txn.rowcount == 0:
          raise StoreError(404, "No row found (%s)" % (table,))
      if txn.rowcount > 1:
          raise StoreError(500, "More than one row matched (%s)" % (table,))
  def simple_delete(self, table, keyvalues, desc):
      """Run `simple_delete_txn` through `self.runInteraction`.

      Args:
          table : string giving the table name
          keyvalues : dict of column names and values to select the rows with
          desc : description passed to runInteraction
      """
      interaction = self.simple_delete_txn
      return self.runInteraction(desc, interaction, table, keyvalues)
  @staticmethod
  def simple_delete_txn(txn, table, keyvalues):
      """Delete every row matching `keyvalues`.

      Args:
          txn : Transaction object
          table : string giving the table name
          keyvalues : dict of column names and values to select the rows with

      Returns:
          `txn.rowcount` after the DELETE has executed.
      """
      conditions = " AND ".join("%s = ?" % (column,) for column in keyvalues)
      txn.execute(
          "DELETE FROM %s WHERE %s" % (table, conditions),
          list(keyvalues.values()),
      )
      return txn.rowcount
  def simple_delete_many(self, table, column, iterable, keyvalues, desc):
      """Run `simple_delete_many_txn` through `self.runInteraction`.

      Args:
          table : string giving the table name
          column : column name to test for inclusion against `iterable`
          iterable : list
          keyvalues : dict of column names and values to select the rows with
          desc : description passed to runInteraction
      """
      interaction = self.simple_delete_many_txn
      return self.runInteraction(
          desc, interaction, table, column, iterable, keyvalues
      )
  @staticmethod
  def simple_delete_many_txn(txn, table, column, iterable, keyvalues):
      """Delete rows whose `column` value is in `iterable`.

      Args:
          txn : Transaction object
          table : string giving the table name
          column : column name to test for inclusion against `iterable`
          iterable : list
          keyvalues : dict of column names and values to select the rows with

      Returns:
          int: Number rows deleted (0, without running a query, when
          `iterable` is falsy)
      """
      if not iterable:
          return 0

      delete_sql = "DELETE FROM %s" % table

      in_clause, args = make_in_list_sql_clause(txn.database_engine, column, iterable)

      # The membership test is always present, so a WHERE clause is always
      # emitted; exact-match constraints are ANDed onto it.
      extra = list(keyvalues.items())
      conditions = [in_clause] + ["%s = ?" % (key,) for key, _ in extra]
      args.extend(value for _, value in extra)

      txn.execute("%s WHERE %s" % (delete_sql, " AND ".join(conditions)), args)
      return txn.rowcount
  def get_cache_dict(
      self, db_conn, table, entity_column, stream_column, max_value, limit=100000
  ):
      """Load each entity's latest stream position from a table.

      Runs on a cursor taken directly from `db_conn` and selects, per entity,
      the maximum stream value among rows whose stream value is greater than
      `max_value - limit`.

      Args:
          db_conn: Database connection; only its `cursor()` is used.
          table (str): Table to read from.
          entity_column (str): Column identifying the entity (grouping key).
          stream_column (str): Column holding the stream position.
          max_value: Current maximum stream position; converted with `int()`
              before being bound to the query.
          limit: Width of the window below `max_value` to consider. It is
              interpolated into the SQL text rather than bound as a parameter.

      Returns:
          tuple: `(cache, min_val)`, where `cache` maps entity to its maximum
          stream position (as int) and `min_val` is the smallest position in
          `cache`, or `max_value` when `cache` is empty.
      """
      # Fetch a mapping of room_id -> max stream position for "recent" rooms.
      # It doesn't really matter how many we get, the StreamChangeCache will
      # do the right thing to ensure it respects the max size of cache.
      sql = (
          "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
          " WHERE %(stream)s > ? - %(limit)s"
          " GROUP BY %(entity)s"
      ) % {
          "table": table,
          "entity": entity_column,
          "stream": stream_column,
          "limit": limit,
      }

      sql = self.engine.convert_param_style(sql)

      txn = db_conn.cursor()
      try:
          txn.execute(sql, (int(max_value),))
          cache = {row[0]: int(row[1]) for row in txn}
      finally:
          # Release the cursor even if the query or row conversion fails.
          txn.close()

      if cache:
          min_val = min(cache.values())
      else:
          min_val = max_value

      return cache, min_val
  def simple_select_list_paginate(
      self,
      table,
      orderby,
      start,
      limit,
      retcols,
      filters=None,
      keyvalues=None,
      order_direction="ASC",
      desc="simple_select_list_paginate",
  ):
      """Select one ordered page of rows from a table as a list of dicts.

      Hands `simple_select_list_paginate_txn` to `self.runInteraction`,
      forwarding `filters`, `keyvalues` and `order_direction` by keyword.

      Args:
          table (str): the table name
          orderby (str): Column to order the results by.
          start (int): Index to begin the query at.
          limit (int): Number of results to return.
          retcols (iterable[str]): the names of the columns to return
          filters (dict[str, T] | None):
              column names and values to filter the rows with, or None to not
              apply a WHERE ? LIKE ? clause.
          keyvalues (dict[str, T] | None):
              column names and values to select the rows with, or None to not
              apply a WHERE clause.
          order_direction (str): Whether the results should be ordered "ASC" or "DESC".
          desc (str): description passed to runInteraction

      Returns:
          defer.Deferred: resolves to list[dict[str, Any]]
      """
      interaction = self.simple_select_list_paginate_txn
      return self.runInteraction(
          desc,
          interaction,
          table,
          orderby,
          start,
          limit,
          retcols,
          filters=filters,
          keyvalues=keyvalues,
          order_direction=order_direction,
      )
  @classmethod
  def simple_select_list_paginate_txn(
      cls,
      txn,
      table,
      orderby,
      start,
      limit,
      retcols,
      filters=None,
      keyvalues=None,
      order_direction="ASC",
  ):
      """Select one ordered page of rows within a transaction.

      Use `filters` to search attributes using SQL wildcards and/or
      `keyvalues` to select attributes with exact matches. All constraints
      are joined together using 'AND', with the LIKE constraints first.

      Args:
          txn : Transaction object
          table (str): the table name
          orderby (str): Column to order the results by.
          start (int): Index to begin the query at (bound as OFFSET).
          limit (int): Number of results to return (bound as LIMIT).
          retcols (iterable[str]): the names of the columns to return
          filters (dict[str, T] | None):
              column names and values to filter the rows with, or None to not
              apply a WHERE ? LIKE ? clause.
          keyvalues (dict[str, T] | None):
              column names and values to select the rows with, or None to not
              apply a WHERE clause.
          order_direction (str): Whether the results should be ordered "ASC" or "DESC".

      Returns:
          The result of `cls.cursor_to_dict(txn)` after the query has run.

      Raises:
          ValueError: if `order_direction` is neither "ASC" nor "DESC".
      """
      if order_direction not in ["ASC", "DESC"]:
          raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")

      conditions = []  # type: List[str]
      args = []  # type: List[Any]

      if filters:
          conditions.extend("%s LIKE ?" % (column,) for column in filters)
          args.extend(filters.values())
      if keyvalues:
          conditions.extend("%s = ?" % (column,) for column in keyvalues)
          args.extend(keyvalues.values())

      where_clause = ""
      if conditions:
          where_clause = "WHERE " + " AND ".join(conditions)

      sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % (
          ", ".join(retcols),
          table,
          where_clause,
          orderby,
          order_direction,
      )
      txn.execute(sql, args + [limit, start])

      return cls.cursor_to_dict(txn)
  def simple_search_list(self, table, term, col, retcols, desc="simple_search_list"):
      """Search a table for rows whose `col` contains `term`.

      Hands `simple_search_list_txn` to `self.runInteraction`.

      Args:
          table (str): the table name
          term (str | None):
              term for searching the table matched to a column.
          col (str): column to query term should be matched to
          retcols (iterable[str]): the names of the columns to return
          desc (str): description passed to runInteraction

      Returns:
          defer.Deferred: resolves to list[dict[str, Any]] or None
      """
      interaction = self.simple_search_list_txn
      return self.runInteraction(desc, interaction, table, term, col, retcols)
  @classmethod
  def simple_search_list_txn(cls, txn, table, term, col, retcols):
      """Search a table for rows whose `col` contains `term`, within a transaction.

      Args:
          txn : Transaction object
          table (str): the table name
          term (str | None):
              term for searching the table matched to a column.
          col (str): column to query term should be matched to
          retcols (iterable[str]): the names of the columns to return

      Returns:
          The result of `cls.cursor_to_dict(txn)` when `term` is truthy;
          otherwise the integer 0, and no query is run.
      """
      if not term:
          return 0

      sql = "SELECT %s FROM %s WHERE %s LIKE ?" % (", ".join(retcols), table, col)
      # The bound pattern carries a literal "%%" on each side of the term;
      # consecutive LIKE wildcards match the same as a single one.
      pattern = "%%" + term + "%%"
      txn.execute(sql, [pattern])

      return cls.cursor_to_dict(txn)
  def make_in_list_sql_clause(
      database_engine, column: str, iterable: Iterable
  ) -> Tuple[str, list]:
      """Returns an SQL clause that checks the given column is in the iterable.

      On SQLite this expands to `column IN (?, ?, ...)`, whereas on Postgres
      it expands to `column = ANY(?)`. While both DBs support the `IN` form,
      using the `ANY` form on postgres means that it views queries with
      different length iterables as the same, helping the query stats.

      Args:
          database_engine: Engine whose `supports_using_any_list` flag picks
              the form of the clause.
          column: Name of the column
          iterable: The values to check the column against. It is consumed
              exactly once, so one-shot iterators (e.g. generators) are safe.

      Returns:
          A tuple of SQL query and the args
      """
      # Materialise up front: the IN form needs both the number of values
      # (for the placeholders) and the values themselves, and walking a
      # one-shot iterator twice would leave the argument list empty.
      values = list(iterable)

      if database_engine.supports_using_any_list:
          # This should hopefully be faster, but also makes postgres query
          # stats easier to understand.
          return "%s = ANY(?)" % (column,), [values]

      placeholders = ",".join("?" for _ in values)
      return "%s IN (%s)" % (column, placeholders), values
  # Type of the values being compared by make_tuple_comparison_clause.
  KV = TypeVar("KV")


  def make_tuple_comparison_clause(
      database_engine: BaseDatabaseEngine, keys: List[Tuple[str, KV]]
  ) -> Tuple[str, List[KV]]:
      """Returns a tuple comparison SQL clause

      Depending what the SQL engine supports, builds a SQL clause that looks
      like either "(a, b) > (?, ?)", or a nested expansion built only from
      scalar comparisons.

      Args:
          database_engine: Engine whose `supports_tuple_comparison` flag
              picks the form of the clause.
          keys: A set of (column, value) pairs to be compared.

      Returns:
          A tuple of SQL query and the args
      """
      if database_engine.supports_tuple_comparison:
          columns = ",".join(entry[0] for entry in keys)
          placeholders = ",".join("?" for _ in keys)
          return "(%s) > (%s)" % (columns, placeholders), [entry[1] for entry in keys]

      # Without native tuple comparison we want the equivalent of
      #
      #   (a > ?) OR
      #   (a == ? AND b > ?) OR
      #   (a == ? AND b == ? AND c > ?)
      #   ...
      #   (a == ? AND b == ? AND ... AND z > ?)
      #
      # which can be nested as
      #
      #   (a > ? OR (a == ? AND
      #     (b > ? OR (b == ? AND
      #       ...
      #         (y > ? OR (y == ? AND
      #           z > ?
      #         ))
      #       ...
      #     ))
      #   ))
      #
      # and that in turn is equivalent to the following form, which is
      # apparently easier for the query optimiser:
      #
      #   (a >= ? AND (a > ? OR
      #     (b >= ? AND (b > ? OR
      #       ...
      #         (y >= ? AND (y > ? OR
      #           z > ?
      #         ))
      #       ...
      #     ))
      #   ))
      #
      openers = []  # type: List[str]
      args = []  # type: List[KV]

      # Every column but the last opens one nesting level and binds its
      # value twice (once for >=, once for >).
      for column, value in keys[:-1]:
          openers.append("(%s >= ? AND (%s > ? OR " % (column, column))
          args += [value, value]

      last_column, last_value = keys[-1]
      args.append(last_value)

      # Each opener leaves two parentheses to close.
      closers = "))" * (len(keys) - 1)
      clause = "".join(openers) + "%s > ?" % (last_column,) + closers
      return clause, args