database.py 76 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511
661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542
055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217
  1. # Copyright 2014-2016 OpenMarket Ltd
  2. # Copyright 2017-2018 New Vector Ltd
  3. # Copyright 2019 The Matrix.org Foundation C.I.C.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import inspect
  17. import logging
  18. import time
  19. import types
  20. from collections import defaultdict
  21. from sys import intern
  22. from time import monotonic as monotonic_time
  23. from typing import (
  24. TYPE_CHECKING,
  25. Any,
  26. Callable,
  27. Collection,
  28. Dict,
  29. Iterable,
  30. Iterator,
  31. List,
  32. Optional,
  33. Tuple,
  34. TypeVar,
  35. cast,
  36. overload,
  37. )
  38. import attr
  39. from prometheus_client import Histogram
  40. from typing_extensions import Literal
  41. from twisted.enterprise import adbapi
  42. from twisted.internet import defer
  43. from synapse.api.errors import StoreError
  44. from synapse.config.database import DatabaseConnectionConfig
  45. from synapse.logging import opentracing
  46. from synapse.logging.context import (
  47. LoggingContext,
  48. current_context,
  49. make_deferred_yieldable,
  50. )
  51. from synapse.metrics import register_threadpool
  52. from synapse.metrics.background_process_metrics import run_as_background_process
  53. from synapse.storage.background_updates import BackgroundUpdater
  54. from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
  55. from synapse.storage.types import Connection, Cursor
  56. from synapse.util.async_helpers import delay_cancellation
  57. from synapse.util.iterutils import batch_iter
  58. if TYPE_CHECKING:
  59. from synapse.server import HomeServer
  60. # python 3 does not have a maximum int value
  61. MAX_TXN_ID = 2 ** 63 - 1
  62. logger = logging.getLogger(__name__)
  63. sql_logger = logging.getLogger("synapse.storage.SQL")
  64. transaction_logger = logging.getLogger("synapse.storage.txn")
  65. perf_logger = logging.getLogger("synapse.storage.TIME")
  66. sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec")
  67. sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"])
  68. sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"])
  69. # Unique indexes which have been added in background updates. Maps from table name
  70. # to the name of the background update which added the unique index to that table.
  71. #
  72. # This is used by the upsert logic to figure out which tables are safe to do a proper
  73. # UPSERT on: until the relevant background update has completed, we
  74. # have to emulate an upsert by locking the table.
  75. #
  76. UNIQUE_INDEX_BACKGROUND_UPDATES = {
  77. "user_ips": "user_ips_device_unique_index",
  78. "device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx",
  79. "device_lists_remote_cache": "device_lists_remote_cache_unique_idx",
  80. "event_search": "event_search_event_id_idx",
  81. }
  82. def make_pool(
  83. reactor, db_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine
  84. ) -> adbapi.ConnectionPool:
  85. """Get the connection pool for the database."""
  86. # By default enable `cp_reconnect`. We need to fiddle with db_args in case
  87. # someone has explicitly set `cp_reconnect`.
  88. db_args = dict(db_config.config.get("args", {}))
  89. db_args.setdefault("cp_reconnect", True)
  90. def _on_new_connection(conn):
  91. # Ensure we have a logging context so we can correctly track queries,
  92. # etc.
  93. with LoggingContext("db.on_new_connection"):
  94. engine.on_new_connection(
  95. LoggingDatabaseConnection(conn, engine, "on_new_connection")
  96. )
  97. connection_pool = adbapi.ConnectionPool(
  98. db_config.config["name"],
  99. cp_reactor=reactor,
  100. cp_openfun=_on_new_connection,
  101. **db_args,
  102. )
  103. register_threadpool(f"database-{db_config.name}", connection_pool.threadpool)
  104. return connection_pool
  105. def make_conn(
  106. db_config: DatabaseConnectionConfig,
  107. engine: BaseDatabaseEngine,
  108. default_txn_name: str,
  109. ) -> "LoggingDatabaseConnection":
  110. """Make a new connection to the database and return it.
  111. Returns:
  112. Connection
  113. """
  114. db_params = {
  115. k: v
  116. for k, v in db_config.config.get("args", {}).items()
  117. if not k.startswith("cp_")
  118. }
  119. native_db_conn = engine.module.connect(**db_params)
  120. db_conn = LoggingDatabaseConnection(native_db_conn, engine, default_txn_name)
  121. engine.on_new_connection(db_conn)
  122. return db_conn
  123. @attr.s(slots=True, auto_attribs=True)
  124. class LoggingDatabaseConnection:
  125. """A wrapper around a database connection that returns `LoggingTransaction`
  126. as its cursor class.
  127. This is mainly used on startup to ensure that queries get logged correctly
  128. """
  129. conn: Connection
  130. engine: BaseDatabaseEngine
  131. default_txn_name: str
  132. def cursor(
  133. self, *, txn_name=None, after_callbacks=None, exception_callbacks=None
  134. ) -> "LoggingTransaction":
  135. if not txn_name:
  136. txn_name = self.default_txn_name
  137. return LoggingTransaction(
  138. self.conn.cursor(),
  139. name=txn_name,
  140. database_engine=self.engine,
  141. after_callbacks=after_callbacks,
  142. exception_callbacks=exception_callbacks,
  143. )
  144. def close(self) -> None:
  145. self.conn.close()
  146. def commit(self) -> None:
  147. self.conn.commit()
  148. def rollback(self) -> None:
  149. self.conn.rollback()
  150. def __enter__(self) -> "LoggingDatabaseConnection":
  151. self.conn.__enter__()
  152. return self
  153. def __exit__(self, exc_type, exc_value, traceback) -> Optional[bool]:
  154. return self.conn.__exit__(exc_type, exc_value, traceback)
  155. # Proxy through any unknown lookups to the DB conn class.
  156. def __getattr__(self, name):
  157. return getattr(self.conn, name)
  158. # The type of entry which goes on our after_callbacks and exception_callbacks lists.
  159. _CallbackListEntry = Tuple[Callable[..., object], Iterable[Any], Dict[str, Any]]
  160. R = TypeVar("R")
  161. class LoggingTransaction:
  162. """An object that almost-transparently proxies for the 'txn' object
  163. passed to the constructor. Adds logging and metrics to the .execute()
  164. method.
  165. Args:
  166. txn: The database transaction object to wrap.
  167. name: The name of this transactions for logging.
  168. database_engine
  169. after_callbacks: A list that callbacks will be appended to
  170. that have been added by `call_after` which should be run on
  171. successful completion of the transaction. None indicates that no
  172. callbacks should be allowed to be scheduled to run.
  173. exception_callbacks: A list that callbacks will be appended
  174. to that have been added by `call_on_exception` which should be run
  175. if transaction ends with an error. None indicates that no callbacks
  176. should be allowed to be scheduled to run.
  177. """
  178. __slots__ = [
  179. "txn",
  180. "name",
  181. "database_engine",
  182. "after_callbacks",
  183. "exception_callbacks",
  184. ]
  185. def __init__(
  186. self,
  187. txn: Cursor,
  188. name: str,
  189. database_engine: BaseDatabaseEngine,
  190. after_callbacks: Optional[List[_CallbackListEntry]] = None,
  191. exception_callbacks: Optional[List[_CallbackListEntry]] = None,
  192. ):
  193. self.txn = txn
  194. self.name = name
  195. self.database_engine = database_engine
  196. self.after_callbacks = after_callbacks
  197. self.exception_callbacks = exception_callbacks
  198. def call_after(self, callback: Callable[..., object], *args: Any, **kwargs: Any):
  199. """Call the given callback on the main twisted thread after the
  200. transaction has finished. Used to invalidate the caches on the
  201. correct thread.
  202. """
  203. # if self.after_callbacks is None, that means that whatever constructed the
  204. # LoggingTransaction isn't expecting there to be any callbacks; assert that
  205. # is not the case.
  206. assert self.after_callbacks is not None
  207. self.after_callbacks.append((callback, args, kwargs))
  208. def call_on_exception(
  209. self, callback: Callable[..., object], *args: Any, **kwargs: Any
  210. ):
  211. # if self.exception_callbacks is None, that means that whatever constructed the
  212. # LoggingTransaction isn't expecting there to be any callbacks; assert that
  213. # is not the case.
  214. assert self.exception_callbacks is not None
  215. self.exception_callbacks.append((callback, args, kwargs))
  216. def fetchone(self) -> Optional[Tuple]:
  217. return self.txn.fetchone()
  218. def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
  219. return self.txn.fetchmany(size=size)
  220. def fetchall(self) -> List[Tuple]:
  221. return self.txn.fetchall()
  222. def __iter__(self) -> Iterator[Tuple]:
  223. return self.txn.__iter__()
  224. @property
  225. def rowcount(self) -> int:
  226. return self.txn.rowcount
  227. @property
  228. def description(self) -> Any:
  229. return self.txn.description
  230. def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None:
  231. """Similar to `executemany`, except `txn.rowcount` will not be correct
  232. afterwards.
  233. More efficient than `executemany` on PostgreSQL
  234. """
  235. if isinstance(self.database_engine, PostgresEngine):
  236. from psycopg2.extras import execute_batch # type: ignore
  237. self._do_execute(lambda *x: execute_batch(self.txn, *x), sql, args)
  238. else:
  239. self.executemany(sql, args)
  240. def execute_values(self, sql: str, *args: Any, fetch: bool = True) -> List[Tuple]:
  241. """Corresponds to psycopg2.extras.execute_values. Only available when
  242. using postgres.
  243. The `fetch` parameter must be set to False if the query does not return
  244. rows (e.g. INSERTs).
  245. """
  246. assert isinstance(self.database_engine, PostgresEngine)
  247. from psycopg2.extras import execute_values # type: ignore
  248. return self._do_execute(
  249. lambda *x: execute_values(self.txn, *x, fetch=fetch), sql, *args
  250. )
  251. def execute(self, sql: str, *args: Any) -> None:
  252. self._do_execute(self.txn.execute, sql, *args)
  253. def executemany(self, sql: str, *args: Any) -> None:
  254. self._do_execute(self.txn.executemany, sql, *args)
  255. def _make_sql_one_line(self, sql: str) -> str:
  256. "Strip newlines out of SQL so that the loggers in the DB are on one line"
  257. return " ".join(line.strip() for line in sql.splitlines() if line.strip())
  258. def _do_execute(self, func: Callable[..., R], sql: str, *args: Any) -> R:
  259. sql = self._make_sql_one_line(sql)
  260. # TODO(paul): Maybe use 'info' and 'debug' for values?
  261. sql_logger.debug("[SQL] {%s} %s", self.name, sql)
  262. sql = self.database_engine.convert_param_style(sql)
  263. if args:
  264. try:
  265. sql_logger.debug("[SQL values] {%s} %r", self.name, args[0])
  266. except Exception:
  267. # Don't let logging failures stop SQL from working
  268. pass
  269. start = time.time()
  270. try:
  271. with opentracing.start_active_span(
  272. "db.query",
  273. tags={
  274. opentracing.tags.DATABASE_TYPE: "sql",
  275. opentracing.tags.DATABASE_STATEMENT: sql,
  276. },
  277. ):
  278. return func(sql, *args)
  279. except Exception as e:
  280. sql_logger.debug("[SQL FAIL] {%s} %s", self.name, e)
  281. raise
  282. finally:
  283. secs = time.time() - start
  284. sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs)
  285. sql_query_timer.labels(sql.split()[0]).observe(secs)
  286. def close(self) -> None:
  287. self.txn.close()
  288. def __enter__(self) -> "LoggingTransaction":
  289. return self
  290. def __exit__(self, exc_type, exc_value, traceback):
  291. self.close()
  292. class PerformanceCounters:
  293. def __init__(self):
  294. self.current_counters = {}
  295. self.previous_counters = {}
  296. def update(self, key: str, duration_secs: float) -> None:
  297. count, cum_time = self.current_counters.get(key, (0, 0))
  298. count += 1
  299. cum_time += duration_secs
  300. self.current_counters[key] = (count, cum_time)
  301. def interval(self, interval_duration_secs: float, limit: int = 3) -> str:
  302. counters = []
  303. for name, (count, cum_time) in self.current_counters.items():
  304. prev_count, prev_time = self.previous_counters.get(name, (0, 0))
  305. counters.append(
  306. (
  307. (cum_time - prev_time) / interval_duration_secs,
  308. count - prev_count,
  309. name,
  310. )
  311. )
  312. self.previous_counters = dict(self.current_counters)
  313. counters.sort(reverse=True)
  314. top_n_counters = ", ".join(
  315. "%s(%d): %.3f%%" % (name, count, 100 * ratio)
  316. for ratio, count, name in counters[:limit]
  317. )
  318. return top_n_counters
  319. class DatabasePool:
  320. """Wraps a single physical database and connection pool.
  321. A single database may be used by multiple data stores.
  322. """
  323. _TXN_ID = 0
  324. def __init__(
  325. self,
  326. hs: "HomeServer",
  327. database_config: DatabaseConnectionConfig,
  328. engine: BaseDatabaseEngine,
  329. ):
  330. self.hs = hs
  331. self._clock = hs.get_clock()
  332. self._txn_limit = database_config.config.get("txn_limit", 0)
  333. self._database_config = database_config
  334. self._db_pool = make_pool(hs.get_reactor(), database_config, engine)
  335. self.updates = BackgroundUpdater(hs, self)
  336. self._previous_txn_total_time = 0.0
  337. self._current_txn_total_time = 0.0
  338. self._previous_loop_ts = 0.0
  339. # Transaction counter: key is the twisted thread id, value is the current count
  340. self._txn_counters: Dict[int, int] = defaultdict(int)
  341. # TODO(paul): These can eventually be removed once the metrics code
  342. # is running in mainline, and we have some nice monitoring frontends
  343. # to watch it
  344. self._txn_perf_counters = PerformanceCounters()
  345. self.engine = engine
  346. # A set of tables that are not safe to use native upserts in.
  347. self._unsafe_to_upsert_tables = set(UNIQUE_INDEX_BACKGROUND_UPDATES.keys())
  348. # We add the user_directory_search table to the blacklist on SQLite
  349. # because the existing search table does not have an index, making it
  350. # unsafe to use native upserts.
  351. if isinstance(self.engine, Sqlite3Engine):
  352. self._unsafe_to_upsert_tables.add("user_directory_search")
  353. if self.engine.can_native_upsert:
  354. # Check ASAP (and then later, every 1s) to see if we have finished
  355. # background updates of tables that aren't safe to update.
  356. self._clock.call_later(
  357. 0.0,
  358. run_as_background_process,
  359. "upsert_safety_check",
  360. self._check_safe_to_upsert,
  361. )
  362. def name(self) -> str:
  363. "Return the name of this database"
  364. return self._database_config.name
  365. def is_running(self) -> bool:
  366. """Is the database pool currently running"""
  367. return self._db_pool.running
  368. async def _check_safe_to_upsert(self) -> None:
  369. """
  370. Is it safe to use native UPSERT?
  371. If there are background updates, we will need to wait, as they may be
  372. the addition of indexes that set the UNIQUE constraint that we require.
  373. If the background updates have not completed, wait 15 sec and check again.
  374. """
  375. updates = await self.simple_select_list(
  376. "background_updates",
  377. keyvalues=None,
  378. retcols=["update_name"],
  379. desc="check_background_updates",
  380. )
  381. updates = [x["update_name"] for x in updates]
  382. for table, update_name in UNIQUE_INDEX_BACKGROUND_UPDATES.items():
  383. if update_name not in updates:
  384. logger.debug("Now safe to upsert in %s", table)
  385. self._unsafe_to_upsert_tables.discard(table)
  386. # If there's any updates still running, reschedule to run.
  387. if updates:
  388. self._clock.call_later(
  389. 15.0,
  390. run_as_background_process,
  391. "upsert_safety_check",
  392. self._check_safe_to_upsert,
  393. )
  394. def start_profiling(self) -> None:
  395. self._previous_loop_ts = monotonic_time()
  396. def loop():
  397. curr = self._current_txn_total_time
  398. prev = self._previous_txn_total_time
  399. self._previous_txn_total_time = curr
  400. time_now = monotonic_time()
  401. time_then = self._previous_loop_ts
  402. self._previous_loop_ts = time_now
  403. duration = time_now - time_then
  404. ratio = (curr - prev) / duration
  405. top_three_counters = self._txn_perf_counters.interval(duration, limit=3)
  406. perf_logger.debug(
  407. "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters
  408. )
  409. self._clock.looping_call(loop, 10000)
  410. def new_transaction(
  411. self,
  412. conn: LoggingDatabaseConnection,
  413. desc: str,
  414. after_callbacks: List[_CallbackListEntry],
  415. exception_callbacks: List[_CallbackListEntry],
  416. func: Callable[..., R],
  417. *args: Any,
  418. **kwargs: Any,
  419. ) -> R:
  420. """Start a new database transaction with the given connection.
  421. Note: The given func may be called multiple times under certain
  422. failure modes. This is normally fine when in a standard transaction,
  423. but care must be taken if the connection is in `autocommit` mode that
  424. the function will correctly handle being aborted and retried half way
  425. through its execution.
  426. Similarly, the arguments to `func` (`args`, `kwargs`) should not be generators,
  427. since they could be evaluated multiple times (which would produce an empty
  428. result on the second or subsequent evaluation). Likewise, the closure of `func`
  429. must not reference any generators. This method attempts to detect such usage
  430. and will log an error.
  431. Args:
  432. conn
  433. desc
  434. after_callbacks
  435. exception_callbacks
  436. func
  437. *args
  438. **kwargs
  439. """
  440. # Robustness check: ensure that none of the arguments are generators, since that
  441. # will fail if we have to repeat the transaction.
  442. # For now, we just log an error, and hope that it works on the first attempt.
  443. # TODO: raise an exception.
  444. for i, arg in enumerate(args):
  445. if inspect.isgenerator(arg):
  446. logger.error(
  447. "Programming error: generator passed to new_transaction as "
  448. "argument %i to function %s",
  449. i,
  450. func,
  451. )
  452. for name, val in kwargs.items():
  453. if inspect.isgenerator(val):
  454. logger.error(
  455. "Programming error: generator passed to new_transaction as "
  456. "argument %s to function %s",
  457. name,
  458. func,
  459. )
  460. # also check variables referenced in func's closure
  461. if inspect.isfunction(func):
  462. f = cast(types.FunctionType, func)
  463. if f.__closure__:
  464. for i, cell in enumerate(f.__closure__):
  465. if inspect.isgenerator(cell.cell_contents):
  466. logger.error(
  467. "Programming error: function %s references generator %s "
  468. "via its closure",
  469. f,
  470. f.__code__.co_freevars[i],
  471. )
  472. start = monotonic_time()
  473. txn_id = self._TXN_ID
  474. # We don't really need these to be unique, so lets stop it from
  475. # growing really large.
  476. self._TXN_ID = (self._TXN_ID + 1) % (MAX_TXN_ID)
  477. name = "%s-%x" % (desc, txn_id)
  478. transaction_logger.debug("[TXN START] {%s}", name)
  479. try:
  480. i = 0
  481. N = 5
  482. while True:
  483. cursor = conn.cursor(
  484. txn_name=name,
  485. after_callbacks=after_callbacks,
  486. exception_callbacks=exception_callbacks,
  487. )
  488. try:
  489. with opentracing.start_active_span(
  490. "db.txn",
  491. tags={
  492. opentracing.SynapseTags.DB_TXN_DESC: desc,
  493. opentracing.SynapseTags.DB_TXN_ID: name,
  494. },
  495. ):
  496. r = func(cursor, *args, **kwargs)
  497. opentracing.log_kv({"message": "commit"})
  498. conn.commit()
  499. return r
  500. except self.engine.module.OperationalError as e:
  501. # This can happen if the database disappears mid
  502. # transaction.
  503. transaction_logger.warning(
  504. "[TXN OPERROR] {%s} %s %d/%d",
  505. name,
  506. e,
  507. i,
  508. N,
  509. )
  510. if i < N:
  511. i += 1
  512. try:
  513. with opentracing.start_active_span("db.rollback"):
  514. conn.rollback()
  515. except self.engine.module.Error as e1:
  516. transaction_logger.warning("[TXN EROLL] {%s} %s", name, e1)
  517. continue
  518. raise
  519. except self.engine.module.DatabaseError as e:
  520. if self.engine.is_deadlock(e):
  521. transaction_logger.warning(
  522. "[TXN DEADLOCK] {%s} %d/%d", name, i, N
  523. )
  524. if i < N:
  525. i += 1
  526. try:
  527. with opentracing.start_active_span("db.rollback"):
  528. conn.rollback()
  529. except self.engine.module.Error as e1:
  530. transaction_logger.warning(
  531. "[TXN EROLL] {%s} %s",
  532. name,
  533. e1,
  534. )
  535. continue
  536. raise
  537. finally:
  538. # we're either about to retry with a new cursor, or we're about to
  539. # release the connection. Once we release the connection, it could
  540. # get used for another query, which might do a conn.rollback().
  541. #
  542. # In the latter case, even though that probably wouldn't affect the
  543. # results of this transaction, python's sqlite will reset all
  544. # statements on the connection [1], which will make our cursor
  545. # invalid [2].
  546. #
  547. # In any case, continuing to read rows after commit()ing seems
  548. # dubious from the PoV of ACID transactional semantics
  549. # (sqlite explicitly says that once you commit, you may see rows
  550. # from subsequent updates.)
  551. #
  552. # In psycopg2, cursors are essentially a client-side fabrication -
  553. # all the data is transferred to the client side when the statement
  554. # finishes executing - so in theory we could go on streaming results
  555. # from the cursor, but attempting to do so would make us
  556. # incompatible with sqlite, so let's make sure we're not doing that
  557. # by closing the cursor.
  558. #
  559. # (*named* cursors in psycopg2 are different and are proper server-
  560. # side things, but (a) we don't use them and (b) they are implicitly
  561. # closed by ending the transaction anyway.)
  562. #
  563. # In short, if we haven't finished with the cursor yet, that's a
  564. # problem waiting to bite us.
  565. #
  566. # TL;DR: we're done with the cursor, so we can close it.
  567. #
  568. # [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465
  569. # [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236
  570. cursor.close()
  571. except Exception as e:
  572. transaction_logger.debug("[TXN FAIL] {%s} %s", name, e)
  573. raise
  574. finally:
  575. end = monotonic_time()
  576. duration = end - start
  577. current_context().add_database_transaction(duration)
  578. transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
  579. self._current_txn_total_time += duration
  580. self._txn_perf_counters.update(desc, duration)
  581. sql_txn_timer.labels(desc).observe(duration)
  582. async def runInteraction(
  583. self,
  584. desc: str,
  585. func: Callable[..., R],
  586. *args: Any,
  587. db_autocommit: bool = False,
  588. isolation_level: Optional[int] = None,
  589. **kwargs: Any,
  590. ) -> R:
  591. """Starts a transaction on the database and runs a given function
  592. Arguments:
  593. desc: description of the transaction, for logging and metrics
  594. func: callback function, which will be called with a
  595. database transaction (twisted.enterprise.adbapi.Transaction) as
  596. its first argument, followed by `args` and `kwargs`.
  597. db_autocommit: Whether to run the function in "autocommit" mode,
  598. i.e. outside of a transaction. This is useful for transactions
  599. that are only a single query.
  600. Currently, this is only implemented for Postgres. SQLite will still
  601. run the function inside a transaction.
  602. WARNING: This means that if func fails half way through then
  603. the changes will *not* be rolled back. `func` may also get
  604. called multiple times if the transaction is retried, so must
  605. correctly handle that case.
  606. isolation_level: Set the server isolation level for this transaction.
  607. args: positional args to pass to `func`
  608. kwargs: named args to pass to `func`
  609. Returns:
  610. The result of func
  611. """
  612. async def _runInteraction() -> R:
  613. after_callbacks: List[_CallbackListEntry] = []
  614. exception_callbacks: List[_CallbackListEntry] = []
  615. if not current_context():
  616. logger.warning("Starting db txn '%s' from sentinel context", desc)
  617. try:
  618. with opentracing.start_active_span(f"db.{desc}"):
  619. result = await self.runWithConnection(
  620. self.new_transaction,
  621. desc,
  622. after_callbacks,
  623. exception_callbacks,
  624. func,
  625. *args,
  626. db_autocommit=db_autocommit,
  627. isolation_level=isolation_level,
  628. **kwargs,
  629. )
  630. for after_callback, after_args, after_kwargs in after_callbacks:
  631. after_callback(*after_args, **after_kwargs)
  632. return cast(R, result)
  633. except Exception:
  634. for after_callback, after_args, after_kwargs in exception_callbacks:
  635. after_callback(*after_args, **after_kwargs)
  636. raise
  637. # To handle cancellation, we ensure that `after_callback`s and
  638. # `exception_callback`s are always run, since the transaction will complete
  639. # on another thread regardless of cancellation.
  640. #
  641. # We also wait until everything above is done before releasing the
  642. # `CancelledError`, so that logging contexts won't get used after they have been
  643. # finished.
  644. return await delay_cancellation(defer.ensureDeferred(_runInteraction()))
  645. async def runWithConnection(
  646. self,
  647. func: Callable[..., R],
  648. *args: Any,
  649. db_autocommit: bool = False,
  650. isolation_level: Optional[int] = None,
  651. **kwargs: Any,
  652. ) -> R:
  653. """Wraps the .runWithConnection() method on the underlying db_pool.
  654. Arguments:
  655. func: callback function, which will be called with a
  656. database connection (twisted.enterprise.adbapi.Connection) as
  657. its first argument, followed by `args` and `kwargs`.
  658. args: positional args to pass to `func`
  659. db_autocommit: Whether to run the function in "autocommit" mode,
  660. i.e. outside of a transaction. This is useful for transaction
  661. that are only a single query. Currently only affects postgres.
  662. isolation_level: Set the server isolation level for this transaction.
  663. kwargs: named args to pass to `func`
  664. Returns:
  665. The result of func
  666. """
  667. curr_context = current_context()
  668. if not curr_context:
  669. logger.warning(
  670. "Starting db connection from sentinel context: metrics will be lost"
  671. )
  672. parent_context = None
  673. else:
  674. assert isinstance(curr_context, LoggingContext)
  675. parent_context = curr_context
  676. start_time = monotonic_time()
  677. def inner_func(conn, *args, **kwargs):
  678. # We shouldn't be in a transaction. If we are then something
  679. # somewhere hasn't committed after doing work. (This is likely only
  680. # possible during startup, as `run*` will ensure changes are
  681. # committed/rolled back before putting the connection back in the
  682. # pool).
  683. assert not self.engine.in_transaction(conn)
  684. with LoggingContext(
  685. str(curr_context), parent_context=parent_context
  686. ) as context:
  687. with opentracing.start_active_span(
  688. operation_name="db.connection",
  689. ):
  690. sched_duration_sec = monotonic_time() - start_time
  691. sql_scheduling_timer.observe(sched_duration_sec)
  692. context.add_database_scheduled(sched_duration_sec)
  693. if self._txn_limit > 0:
  694. tid = self._db_pool.threadID()
  695. self._txn_counters[tid] += 1
  696. if self._txn_counters[tid] > self._txn_limit:
  697. logger.debug(
  698. "Reconnecting database connection over transaction limit"
  699. )
  700. conn.reconnect()
  701. opentracing.log_kv(
  702. {"message": "reconnected due to txn limit"}
  703. )
  704. self._txn_counters[tid] = 1
  705. if self.engine.is_connection_closed(conn):
  706. logger.debug("Reconnecting closed database connection")
  707. conn.reconnect()
  708. opentracing.log_kv({"message": "reconnected"})
  709. if self._txn_limit > 0:
  710. self._txn_counters[tid] = 1
  711. try:
  712. if db_autocommit:
  713. self.engine.attempt_to_set_autocommit(conn, True)
  714. if isolation_level is not None:
  715. self.engine.attempt_to_set_isolation_level(
  716. conn, isolation_level
  717. )
  718. db_conn = LoggingDatabaseConnection(
  719. conn, self.engine, "runWithConnection"
  720. )
  721. return func(db_conn, *args, **kwargs)
  722. finally:
  723. if db_autocommit:
  724. self.engine.attempt_to_set_autocommit(conn, False)
  725. if isolation_level:
  726. self.engine.attempt_to_set_isolation_level(conn, None)
  727. return await make_deferred_yieldable(
  728. self._db_pool.runWithConnection(inner_func, *args, **kwargs)
  729. )
  730. @staticmethod
  731. def cursor_to_dict(cursor: Cursor) -> List[Dict[str, Any]]:
  732. """Converts a SQL cursor into an list of dicts.
  733. Args:
  734. cursor: The DBAPI cursor which has executed a query.
  735. Returns:
  736. A list of dicts where the key is the column header.
  737. """
  738. assert cursor.description is not None, "cursor.description was None"
  739. col_headers = [intern(str(column[0])) for column in cursor.description]
  740. results = [dict(zip(col_headers, row)) for row in cursor]
  741. return results
  742. @overload
  743. async def execute(
  744. self, desc: str, decoder: Literal[None], query: str, *args: Any
  745. ) -> List[Tuple[Any, ...]]:
  746. ...
  747. @overload
  748. async def execute(
  749. self, desc: str, decoder: Callable[[Cursor], R], query: str, *args: Any
  750. ) -> R:
  751. ...
  752. async def execute(
  753. self,
  754. desc: str,
  755. decoder: Optional[Callable[[Cursor], R]],
  756. query: str,
  757. *args: Any,
  758. ) -> R:
  759. """Runs a single query for a result set.
  760. Args:
  761. desc: description of the transaction, for logging and metrics
  762. decoder - The function which can resolve the cursor results to
  763. something meaningful.
  764. query - The query string to execute
  765. *args - Query args.
  766. Returns:
  767. The result of decoder(results)
  768. """
  769. def interaction(txn):
  770. txn.execute(query, args)
  771. if decoder:
  772. return decoder(txn)
  773. else:
  774. return txn.fetchall()
  775. return await self.runInteraction(desc, interaction)
  776. # "Simple" SQL API methods that operate on a single table with no JOINs,
  777. # no complex WHERE clauses, just a dict of values for columns.
  778. async def simple_insert(
  779. self,
  780. table: str,
  781. values: Dict[str, Any],
  782. desc: str = "simple_insert",
  783. ) -> None:
  784. """Executes an INSERT query on the named table.
  785. Args:
  786. table: string giving the table name
  787. values: dict of new column names and values for them
  788. desc: description of the transaction, for logging and metrics
  789. """
  790. await self.runInteraction(desc, self.simple_insert_txn, table, values)
  791. @staticmethod
  792. def simple_insert_txn(
  793. txn: LoggingTransaction, table: str, values: Dict[str, Any]
  794. ) -> None:
  795. keys, vals = zip(*values.items())
  796. sql = "INSERT INTO %s (%s) VALUES(%s)" % (
  797. table,
  798. ", ".join(k for k in keys),
  799. ", ".join("?" for _ in keys),
  800. )
  801. txn.execute(sql, vals)
  802. async def simple_insert_many(
  803. self,
  804. table: str,
  805. keys: Collection[str],
  806. values: Collection[Collection[Any]],
  807. desc: str,
  808. ) -> None:
  809. """Executes an INSERT query on the named table.
  810. The input is given as a list of rows, where each row is a list of values.
  811. (Actually any iterable is fine.)
  812. Args:
  813. table: string giving the table name
  814. keys: list of column names
  815. values: for each row, a list of values in the same order as `keys`
  816. desc: description of the transaction, for logging and metrics
  817. """
  818. await self.runInteraction(
  819. desc, self.simple_insert_many_txn, table, keys, values
  820. )
  821. @staticmethod
  822. def simple_insert_many_txn(
  823. txn: LoggingTransaction,
  824. table: str,
  825. keys: Collection[str],
  826. values: Iterable[Iterable[Any]],
  827. ) -> None:
  828. """Executes an INSERT query on the named table.
  829. The input is given as a list of rows, where each row is a list of values.
  830. (Actually any iterable is fine.)
  831. Args:
  832. txn: The transaction to use.
  833. table: string giving the table name
  834. keys: list of column names
  835. values: for each row, a list of values in the same order as `keys`
  836. """
  837. if isinstance(txn.database_engine, PostgresEngine):
  838. # We use `execute_values` as it can be a lot faster than `execute_batch`,
  839. # but it's only available on postgres.
  840. sql = "INSERT INTO %s (%s) VALUES ?" % (
  841. table,
  842. ", ".join(k for k in keys),
  843. )
  844. txn.execute_values(sql, values, fetch=False)
  845. else:
  846. sql = "INSERT INTO %s (%s) VALUES(%s)" % (
  847. table,
  848. ", ".join(k for k in keys),
  849. ", ".join("?" for _ in keys),
  850. )
  851. txn.execute_batch(sql, values)
  852. async def simple_upsert(
  853. self,
  854. table: str,
  855. keyvalues: Dict[str, Any],
  856. values: Dict[str, Any],
  857. insertion_values: Optional[Dict[str, Any]] = None,
  858. desc: str = "simple_upsert",
  859. lock: bool = True,
  860. ) -> bool:
  861. """
  862. `lock` should generally be set to True (the default), but can be set
  863. to False if either of the following are true:
  864. 1. there is a UNIQUE INDEX on the key columns. In this case a conflict
  865. will cause an IntegrityError in which case this function will retry
  866. the update.
  867. 2. we somehow know that we are the only thread which will be updating
  868. this table.
  869. As an additional note, this parameter only matters for old SQLite versions
  870. because we will use native upserts otherwise.
  871. Args:
  872. table: The table to upsert into
  873. keyvalues: The unique key columns and their new values
  874. values: The nonunique columns and their new values
  875. insertion_values: additional key/values to use only when inserting
  876. desc: description of the transaction, for logging and metrics
  877. lock: True to lock the table when doing the upsert.
  878. Returns:
  879. Returns True if a row was inserted or updated (i.e. if `values` is
  880. not empty then this always returns True)
  881. """
  882. insertion_values = insertion_values or {}
  883. attempts = 0
  884. while True:
  885. try:
  886. # We can autocommit if we are going to use native upserts
  887. autocommit = (
  888. self.engine.can_native_upsert
  889. and table not in self._unsafe_to_upsert_tables
  890. )
  891. return await self.runInteraction(
  892. desc,
  893. self.simple_upsert_txn,
  894. table,
  895. keyvalues,
  896. values,
  897. insertion_values,
  898. lock=lock,
  899. db_autocommit=autocommit,
  900. )
  901. except self.engine.module.IntegrityError as e:
  902. attempts += 1
  903. if attempts >= 5:
  904. # don't retry forever, because things other than races
  905. # can cause IntegrityErrors
  906. raise
  907. # presumably we raced with another transaction: let's retry.
  908. logger.warning(
  909. "IntegrityError when upserting into %s; retrying: %s", table, e
  910. )
  911. def simple_upsert_txn(
  912. self,
  913. txn: LoggingTransaction,
  914. table: str,
  915. keyvalues: Dict[str, Any],
  916. values: Dict[str, Any],
  917. insertion_values: Optional[Dict[str, Any]] = None,
  918. lock: bool = True,
  919. ) -> bool:
  920. """
  921. Pick the UPSERT method which works best on the platform. Either the
  922. native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
  923. Args:
  924. txn: The transaction to use.
  925. table: The table to upsert into
  926. keyvalues: The unique key tables and their new values
  927. values: The nonunique columns and their new values
  928. insertion_values: additional key/values to use only when inserting
  929. lock: True to lock the table when doing the upsert.
  930. Returns:
  931. Returns True if a row was inserted or updated (i.e. if `values` is
  932. not empty then this always returns True)
  933. """
  934. insertion_values = insertion_values or {}
  935. if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
  936. return self.simple_upsert_txn_native_upsert(
  937. txn, table, keyvalues, values, insertion_values=insertion_values
  938. )
  939. else:
  940. return self.simple_upsert_txn_emulated(
  941. txn,
  942. table,
  943. keyvalues,
  944. values,
  945. insertion_values=insertion_values,
  946. lock=lock,
  947. )
  948. def simple_upsert_txn_emulated(
  949. self,
  950. txn: LoggingTransaction,
  951. table: str,
  952. keyvalues: Dict[str, Any],
  953. values: Dict[str, Any],
  954. insertion_values: Optional[Dict[str, Any]] = None,
  955. lock: bool = True,
  956. ) -> bool:
  957. """
  958. Args:
  959. table: The table to upsert into
  960. keyvalues: The unique key tables and their new values
  961. values: The nonunique columns and their new values
  962. insertion_values: additional key/values to use only when inserting
  963. lock: True to lock the table when doing the upsert.
  964. Returns:
  965. Returns True if a row was inserted or updated (i.e. if `values` is
  966. not empty then this always returns True)
  967. """
  968. insertion_values = insertion_values or {}
  969. # We need to lock the table :(, unless we're *really* careful
  970. if lock:
  971. self.engine.lock_table(txn, table)
  972. def _getwhere(key):
  973. # If the value we're passing in is None (aka NULL), we need to use
  974. # IS, not =, as NULL = NULL equals NULL (False).
  975. if keyvalues[key] is None:
  976. return "%s IS ?" % (key,)
  977. else:
  978. return "%s = ?" % (key,)
  979. if not values:
  980. # If `values` is empty, then all of the values we care about are in
  981. # the unique key, so there is nothing to UPDATE. We can just do a
  982. # SELECT instead to see if it exists.
  983. sql = "SELECT 1 FROM %s WHERE %s" % (
  984. table,
  985. " AND ".join(_getwhere(k) for k in keyvalues),
  986. )
  987. sqlargs = list(keyvalues.values())
  988. txn.execute(sql, sqlargs)
  989. if txn.fetchall():
  990. # We have an existing record.
  991. return False
  992. else:
  993. # First try to update.
  994. sql = "UPDATE %s SET %s WHERE %s" % (
  995. table,
  996. ", ".join("%s = ?" % (k,) for k in values),
  997. " AND ".join(_getwhere(k) for k in keyvalues),
  998. )
  999. sqlargs = list(values.values()) + list(keyvalues.values())
  1000. txn.execute(sql, sqlargs)
  1001. if txn.rowcount > 0:
  1002. return True
  1003. # We didn't find any existing rows, so insert a new one
  1004. allvalues: Dict[str, Any] = {}
  1005. allvalues.update(keyvalues)
  1006. allvalues.update(values)
  1007. allvalues.update(insertion_values)
  1008. sql = "INSERT INTO %s (%s) VALUES (%s)" % (
  1009. table,
  1010. ", ".join(k for k in allvalues),
  1011. ", ".join("?" for _ in allvalues),
  1012. )
  1013. txn.execute(sql, list(allvalues.values()))
  1014. # successfully inserted
  1015. return True
  1016. def simple_upsert_txn_native_upsert(
  1017. self,
  1018. txn: LoggingTransaction,
  1019. table: str,
  1020. keyvalues: Dict[str, Any],
  1021. values: Dict[str, Any],
  1022. insertion_values: Optional[Dict[str, Any]] = None,
  1023. ) -> bool:
  1024. """
  1025. Use the native UPSERT functionality in PostgreSQL.
  1026. Args:
  1027. table: The table to upsert into
  1028. keyvalues: The unique key tables and their new values
  1029. values: The nonunique columns and their new values
  1030. insertion_values: additional key/values to use only when inserting
  1031. Returns:
  1032. Returns True if a row was inserted or updated (i.e. if `values` is
  1033. not empty then this always returns True)
  1034. """
  1035. allvalues: Dict[str, Any] = {}
  1036. allvalues.update(keyvalues)
  1037. allvalues.update(insertion_values or {})
  1038. if not values:
  1039. latter = "NOTHING"
  1040. else:
  1041. allvalues.update(values)
  1042. latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
  1043. sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
  1044. table,
  1045. ", ".join(k for k in allvalues),
  1046. ", ".join("?" for _ in allvalues),
  1047. ", ".join(k for k in keyvalues),
  1048. latter,
  1049. )
  1050. txn.execute(sql, list(allvalues.values()))
  1051. return bool(txn.rowcount)
  1052. async def simple_upsert_many(
  1053. self,
  1054. table: str,
  1055. key_names: Collection[str],
  1056. key_values: Collection[Collection[Any]],
  1057. value_names: Collection[str],
  1058. value_values: Collection[Collection[Any]],
  1059. desc: str,
  1060. ) -> None:
  1061. """
  1062. Upsert, many times.
  1063. Args:
  1064. table: The table to upsert into
  1065. key_names: The key column names.
  1066. key_values: A list of each row's key column values.
  1067. value_names: The value column names
  1068. value_values: A list of each row's value column values.
  1069. Ignored if value_names is empty.
  1070. """
  1071. # We can autocommit if we are going to use native upserts
  1072. autocommit = (
  1073. self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
  1074. )
  1075. return await self.runInteraction(
  1076. desc,
  1077. self.simple_upsert_many_txn,
  1078. table,
  1079. key_names,
  1080. key_values,
  1081. value_names,
  1082. value_values,
  1083. db_autocommit=autocommit,
  1084. )
  1085. def simple_upsert_many_txn(
  1086. self,
  1087. txn: LoggingTransaction,
  1088. table: str,
  1089. key_names: Collection[str],
  1090. key_values: Collection[Iterable[Any]],
  1091. value_names: Collection[str],
  1092. value_values: Iterable[Iterable[Any]],
  1093. ) -> None:
  1094. """
  1095. Upsert, many times.
  1096. Args:
  1097. table: The table to upsert into
  1098. key_names: The key column names.
  1099. key_values: A list of each row's key column values.
  1100. value_names: The value column names
  1101. value_values: A list of each row's value column values.
  1102. Ignored if value_names is empty.
  1103. """
  1104. if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
  1105. return self.simple_upsert_many_txn_native_upsert(
  1106. txn, table, key_names, key_values, value_names, value_values
  1107. )
  1108. else:
  1109. return self.simple_upsert_many_txn_emulated(
  1110. txn, table, key_names, key_values, value_names, value_values
  1111. )
  1112. def simple_upsert_many_txn_emulated(
  1113. self,
  1114. txn: LoggingTransaction,
  1115. table: str,
  1116. key_names: Iterable[str],
  1117. key_values: Collection[Iterable[Any]],
  1118. value_names: Collection[str],
  1119. value_values: Iterable[Iterable[Any]],
  1120. ) -> None:
  1121. """
  1122. Upsert, many times, but without native UPSERT support or batching.
  1123. Args:
  1124. table: The table to upsert into
  1125. key_names: The key column names.
  1126. key_values: A list of each row's key column values.
  1127. value_names: The value column names
  1128. value_values: A list of each row's value column values.
  1129. Ignored if value_names is empty.
  1130. """
  1131. # No value columns, therefore make a blank list so that the following
  1132. # zip() works correctly.
  1133. if not value_names:
  1134. value_values = [() for x in range(len(key_values))]
  1135. for keyv, valv in zip(key_values, value_values):
  1136. _keys = {x: y for x, y in zip(key_names, keyv)}
  1137. _vals = {x: y for x, y in zip(value_names, valv)}
  1138. self.simple_upsert_txn_emulated(txn, table, _keys, _vals)
  1139. def simple_upsert_many_txn_native_upsert(
  1140. self,
  1141. txn: LoggingTransaction,
  1142. table: str,
  1143. key_names: Collection[str],
  1144. key_values: Collection[Iterable[Any]],
  1145. value_names: Collection[str],
  1146. value_values: Iterable[Iterable[Any]],
  1147. ) -> None:
  1148. """
  1149. Upsert, many times, using batching where possible.
  1150. Args:
  1151. table: The table to upsert into
  1152. key_names: The key column names.
  1153. key_values: A list of each row's key column values.
  1154. value_names: The value column names
  1155. value_values: A list of each row's value column values.
  1156. Ignored if value_names is empty.
  1157. """
  1158. allnames: List[str] = []
  1159. allnames.extend(key_names)
  1160. allnames.extend(value_names)
  1161. if not value_names:
  1162. # No value columns, therefore make a blank list so that the
  1163. # following zip() works correctly.
  1164. latter = "NOTHING"
  1165. value_values = [() for x in range(len(key_values))]
  1166. else:
  1167. latter = "UPDATE SET " + ", ".join(
  1168. k + "=EXCLUDED." + k for k in value_names
  1169. )
  1170. args = []
  1171. for x, y in zip(key_values, value_values):
  1172. args.append(tuple(x) + tuple(y))
  1173. if isinstance(txn.database_engine, PostgresEngine):
  1174. # We use `execute_values` as it can be a lot faster than `execute_batch`,
  1175. # but it's only available on postgres.
  1176. sql = "INSERT INTO %s (%s) VALUES ? ON CONFLICT (%s) DO %s" % (
  1177. table,
  1178. ", ".join(k for k in allnames),
  1179. ", ".join(key_names),
  1180. latter,
  1181. )
  1182. txn.execute_values(sql, args, fetch=False)
  1183. else:
  1184. sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % (
  1185. table,
  1186. ", ".join(k for k in allnames),
  1187. ", ".join("?" for _ in allnames),
  1188. ", ".join(key_names),
  1189. latter,
  1190. )
  1191. return txn.execute_batch(sql, args)
  1192. @overload
  1193. async def simple_select_one(
  1194. self,
  1195. table: str,
  1196. keyvalues: Dict[str, Any],
  1197. retcols: Collection[str],
  1198. allow_none: Literal[False] = False,
  1199. desc: str = "simple_select_one",
  1200. ) -> Dict[str, Any]:
  1201. ...
  1202. @overload
  1203. async def simple_select_one(
  1204. self,
  1205. table: str,
  1206. keyvalues: Dict[str, Any],
  1207. retcols: Collection[str],
  1208. allow_none: Literal[True] = True,
  1209. desc: str = "simple_select_one",
  1210. ) -> Optional[Dict[str, Any]]:
  1211. ...
  1212. async def simple_select_one(
  1213. self,
  1214. table: str,
  1215. keyvalues: Dict[str, Any],
  1216. retcols: Collection[str],
  1217. allow_none: bool = False,
  1218. desc: str = "simple_select_one",
  1219. ) -> Optional[Dict[str, Any]]:
  1220. """Executes a SELECT query on the named table, which is expected to
  1221. return a single row, returning multiple columns from it.
  1222. Args:
  1223. table: string giving the table name
  1224. keyvalues: dict of column names and values to select the row with
  1225. retcols: list of strings giving the names of the columns to return
  1226. allow_none: If true, return None instead of failing if the SELECT
  1227. statement returns no rows
  1228. desc: description of the transaction, for logging and metrics
  1229. """
  1230. return await self.runInteraction(
  1231. desc,
  1232. self.simple_select_one_txn,
  1233. table,
  1234. keyvalues,
  1235. retcols,
  1236. allow_none,
  1237. db_autocommit=True,
  1238. )

    @overload
    async def simple_select_one_onecol(
        self,
        table: str,
        keyvalues: Dict[str, Any],
        retcol: str,
        allow_none: Literal[False] = False,
        desc: str = "simple_select_one_onecol",
    ) -> Any:
        ...

    @overload
    async def simple_select_one_onecol(
        self,
        table: str,
        keyvalues: Dict[str, Any],
        retcol: str,
        allow_none: Literal[True] = True,
        desc: str = "simple_select_one_onecol",
    ) -> Optional[Any]:
        ...

    async def simple_select_one_onecol(
        self,
        table: str,
        keyvalues: Dict[str, Any],
        retcol: str,
        allow_none: bool = False,
        desc: str = "simple_select_one_onecol",
    ) -> Optional[Any]:
        """Executes a SELECT query on the named table, which is expected to
        return a single row, returning a single column from it.

        Args:
            table: string giving the table name
            keyvalues: dict of column names and values to select the row with
            retcol: string giving the name of the column to return
            allow_none: If true, return None instead of failing if the SELECT
                statement returns no rows
            desc: description of the transaction, for logging and metrics
        """
        return await self.runInteraction(
            desc,
            self.simple_select_one_onecol_txn,
            table,
            keyvalues,
            retcol,
            allow_none=allow_none,
            db_autocommit=True,
        )
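
    # Illustrative usage sketch (hypothetical table/column names):
    #
    #     displayname = await self.db_pool.simple_select_one_onecol(
    #         table="profiles",
    #         keyvalues={"user_id": "@alice:test"},
    #         retcol="displayname",
    #         allow_none=True,
    #     )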

    @overload
    @classmethod
    def simple_select_one_onecol_txn(
        cls,
        txn: LoggingTransaction,
        table: str,
        keyvalues: Dict[str, Any],
        retcol: str,
        allow_none: Literal[False] = False,
    ) -> Any:
        ...

    @overload
    @classmethod
    def simple_select_one_onecol_txn(
        cls,
        txn: LoggingTransaction,
        table: str,
        keyvalues: Dict[str, Any],
        retcol: str,
        allow_none: Literal[True] = True,
    ) -> Optional[Any]:
        ...

    @classmethod
    def simple_select_one_onecol_txn(
        cls,
        txn: LoggingTransaction,
        table: str,
        keyvalues: Dict[str, Any],
        retcol: str,
        allow_none: bool = False,
    ) -> Optional[Any]:
        ret = cls.simple_select_onecol_txn(
            txn, table=table, keyvalues=keyvalues, retcol=retcol
        )

        if ret:
            return ret[0]
        else:
            if allow_none:
                return None
            else:
                raise StoreError(404, "No row found")

    @staticmethod
    def simple_select_onecol_txn(
        txn: LoggingTransaction,
        table: str,
        keyvalues: Optional[Dict[str, Any]],
        retcol: str,
    ) -> List[Any]:
        sql = ("SELECT %(retcol)s FROM %(table)s") % {"retcol": retcol, "table": table}

        if keyvalues:
            sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
            txn.execute(sql, list(keyvalues.values()))
        else:
            txn.execute(sql)

        return [r[0] for r in txn]

    async def simple_select_onecol(
        self,
        table: str,
        keyvalues: Optional[Dict[str, Any]],
        retcol: str,
        desc: str = "simple_select_onecol",
    ) -> List[Any]:
        """Executes a SELECT query on the named table, returning a list
        comprising the values of the named column from the selected rows.

        Args:
            table: table name
            keyvalues: column names and values to select the rows with
            retcol: column whose value we wish to retrieve.
            desc: description of the transaction, for logging and metrics

        Returns:
            The values of the named column, as a list.
        """
        return await self.runInteraction(
            desc,
            self.simple_select_onecol_txn,
            table,
            keyvalues,
            retcol,
            db_autocommit=True,
        )
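
    # Illustrative usage sketch (hypothetical table/column names):
    #
    #     room_ids = await self.db_pool.simple_select_onecol(
    #         table="room_memberships",
    #         keyvalues={"user_id": "@alice:test", "membership": "join"},
    #         retcol="room_id",
    #     )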

    async def simple_select_list(
        self,
        table: str,
        keyvalues: Optional[Dict[str, Any]],
        retcols: Collection[str],
        desc: str = "simple_select_list",
    ) -> List[Dict[str, Any]]:
        """Executes a SELECT query on the named table, which may return zero or
        more rows, returning the result as a list of dicts.

        Args:
            table: the table name
            keyvalues:
                column names and values to select the rows with, or None to not
                apply a WHERE clause.
            retcols: the names of the columns to return
            desc: description of the transaction, for logging and metrics

        Returns:
            A list of dictionaries.
        """
        return await self.runInteraction(
            desc,
            self.simple_select_list_txn,
            table,
            keyvalues,
            retcols,
            db_autocommit=True,
        )
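
    # Illustrative usage sketch (hypothetical names):
    #
    #     rows = await self.db_pool.simple_select_list(
    #         table="access_tokens",
    #         keyvalues={"user_id": "@alice:test"},
    #         retcols=("id", "device_id"),
    #     )
    #     # rows is e.g. [{"id": 1, "device_id": "DEV1"}, ...]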

    @classmethod
    def simple_select_list_txn(
        cls,
        txn: LoggingTransaction,
        table: str,
        keyvalues: Optional[Dict[str, Any]],
        retcols: Iterable[str],
    ) -> List[Dict[str, Any]]:
        """Executes a SELECT query on the named table, which may return zero or
        more rows, returning the result as a list of dicts.

        Args:
            txn: Transaction object
            table: the table name
            keyvalues:
                column names and values to select the rows with, or None to not
                apply a WHERE clause.
            retcols: the names of the columns to return
        """
        if keyvalues:
            sql = "SELECT %s FROM %s WHERE %s" % (
                ", ".join(retcols),
                table,
                " AND ".join("%s = ?" % (k,) for k in keyvalues),
            )
            txn.execute(sql, list(keyvalues.values()))
        else:
            sql = "SELECT %s FROM %s" % (", ".join(retcols), table)
            txn.execute(sql)

        return cls.cursor_to_dict(txn)

    async def simple_select_many_batch(
        self,
        table: str,
        column: str,
        iterable: Iterable[Any],
        retcols: Collection[str],
        keyvalues: Optional[Dict[str, Any]] = None,
        desc: str = "simple_select_many_batch",
        batch_size: int = 100,
    ) -> List[Dict[str, Any]]:
        """Executes a SELECT query on the named table, which may return zero or
        more rows, returning the result as a list of dicts.

        Filters rows by whether the value of `column` is in `iterable`.

        Args:
            table: string giving the table name
            column: column name to test for inclusion against `iterable`
            iterable: the values of `column` to select rows with
            retcols: list of strings giving the names of the columns to return
            keyvalues: dict of column names and values to select the rows with
            desc: description of the transaction, for logging and metrics
            batch_size: the number of rows for each select query
        """
        keyvalues = keyvalues or {}

        results: List[Dict[str, Any]] = []

        for chunk in batch_iter(iterable, batch_size):
            rows = await self.runInteraction(
                desc,
                self.simple_select_many_txn,
                table,
                column,
                chunk,
                keyvalues,
                retcols,
                db_autocommit=True,
            )

            results.extend(rows)

        return results
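
    # Illustrative usage sketch (hypothetical names): fetches rows for an
    # arbitrary number of event IDs, 100 per query by default, without
    # building one huge IN-list.
    #
    #     rows = await self.db_pool.simple_select_many_batch(
    #         table="events",
    #         column="event_id",
    #         iterable=event_ids,
    #         retcols=("event_id", "room_id"),
    #     )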

    @classmethod
    def simple_select_many_txn(
        cls,
        txn: LoggingTransaction,
        table: str,
        column: str,
        iterable: Collection[Any],
        keyvalues: Dict[str, Any],
        retcols: Iterable[str],
    ) -> List[Dict[str, Any]]:
        """Executes a SELECT query on the named table, which may return zero or
        more rows, returning the result as a list of dicts.

        Filters rows by whether the value of `column` is in `iterable`.

        Args:
            txn: Transaction object
            table: string giving the table name
            column: column name to test for inclusion against `iterable`
            iterable: the values of `column` to select rows with
            keyvalues: dict of column names and values to select the rows with
            retcols: list of strings giving the names of the columns to return
        """
        if not iterable:
            return []

        clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable)
        clauses = [clause]

        for key, value in keyvalues.items():
            clauses.append("%s = ?" % (key,))
            values.append(value)

        sql = "SELECT %s FROM %s WHERE %s" % (
            ", ".join(retcols),
            table,
            " AND ".join(clauses),
        )

        txn.execute(sql, values)
        return cls.cursor_to_dict(txn)

    async def simple_update(
        self,
        table: str,
        keyvalues: Dict[str, Any],
        updatevalues: Dict[str, Any],
        desc: str,
    ) -> int:
        """Executes an UPDATE query on the named table, setting new values for
        columns in rows matching the key values.

        Args:
            table: string giving the table name
            keyvalues: dict of column names and values to select the rows with
            updatevalues: dict giving column names and values to update
            desc: description of the transaction, for logging and metrics

        Returns:
            The number of updated rows.
        """
        return await self.runInteraction(
            desc, self.simple_update_txn, table, keyvalues, updatevalues
        )

    @staticmethod
    def simple_update_txn(
        txn: LoggingTransaction,
        table: str,
        keyvalues: Dict[str, Any],
        updatevalues: Dict[str, Any],
    ) -> int:
        if keyvalues:
            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
        else:
            where = ""

        update_sql = "UPDATE %s SET %s %s" % (
            table,
            ", ".join("%s = ?" % (k,) for k in updatevalues),
            where,
        )

        txn.execute(update_sql, list(updatevalues.values()) + list(keyvalues.values()))

        return txn.rowcount
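
    # Illustrative usage sketch (hypothetical names):
    #
    #     updated = await self.db_pool.simple_update(
    #         table="pushers",
    #         keyvalues={"user_name": "@alice:test"},
    #         updatevalues={"last_stream_ordering": 1234},
    #         desc="update_pusher_last_stream_ordering",
    #     )
    #     # updated is the number of rows changed.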

    async def simple_update_one(
        self,
        table: str,
        keyvalues: Dict[str, Any],
        updatevalues: Dict[str, Any],
        desc: str = "simple_update_one",
    ) -> None:
        """Executes an UPDATE query on the named table, setting new values for
        columns in a row matching the key values.

        Args:
            table: string giving the table name
            keyvalues: dict of column names and values to select the row with
            updatevalues: dict giving column names and values to update
            desc: description of the transaction, for logging and metrics
        """
        await self.runInteraction(
            desc,
            self.simple_update_one_txn,
            table,
            keyvalues,
            updatevalues,
            db_autocommit=True,
        )

    @classmethod
    def simple_update_one_txn(
        cls,
        txn: LoggingTransaction,
        table: str,
        keyvalues: Dict[str, Any],
        updatevalues: Dict[str, Any],
    ) -> None:
        rowcount = cls.simple_update_txn(txn, table, keyvalues, updatevalues)

        if rowcount == 0:
            raise StoreError(404, "No row found (%s)" % (table,))
        if rowcount > 1:
            raise StoreError(500, "More than one row matched (%s)" % (table,))

    # Ideally we could use the overload decorator here to specify that the
    # return type is only optional if allow_none is True, but this does not work
    # when you call a static method from an instance.
    # See https://github.com/python/mypy/issues/7781
    @staticmethod
    def simple_select_one_txn(
        txn: LoggingTransaction,
        table: str,
        keyvalues: Dict[str, Any],
        retcols: Collection[str],
        allow_none: bool = False,
    ) -> Optional[Dict[str, Any]]:
        select_sql = "SELECT %s FROM %s WHERE %s" % (
            ", ".join(retcols),
            table,
            " AND ".join("%s = ?" % (k,) for k in keyvalues),
        )

        txn.execute(select_sql, list(keyvalues.values()))
        row = txn.fetchone()

        if not row:
            if allow_none:
                return None
            raise StoreError(404, "No row found (%s)" % (table,))
        if txn.rowcount > 1:
            raise StoreError(500, "More than one row matched (%s)" % (table,))

        return dict(zip(retcols, row))

    async def simple_delete_one(
        self, table: str, keyvalues: Dict[str, Any], desc: str = "simple_delete_one"
    ) -> None:
        """Executes a DELETE query on the named table, expecting to delete a
        single row.

        Args:
            table: string giving the table name
            keyvalues: dict of column names and values to select the row with
            desc: description of the transaction, for logging and metrics
        """
        await self.runInteraction(
            desc,
            self.simple_delete_one_txn,
            table,
            keyvalues,
            db_autocommit=True,
        )

    @staticmethod
    def simple_delete_one_txn(
        txn: LoggingTransaction, table: str, keyvalues: Dict[str, Any]
    ) -> None:
        """Executes a DELETE query on the named table, expecting to delete a
        single row.

        Args:
            table: string giving the table name
            keyvalues: dict of column names and values to select the row with
        """
        sql = "DELETE FROM %s WHERE %s" % (
            table,
            " AND ".join("%s = ?" % (k,) for k in keyvalues),
        )

        txn.execute(sql, list(keyvalues.values()))
        if txn.rowcount == 0:
            raise StoreError(404, "No row found (%s)" % (table,))
        if txn.rowcount > 1:
            raise StoreError(500, "More than one row matched (%s)" % (table,))

    async def simple_delete(
        self, table: str, keyvalues: Dict[str, Any], desc: str
    ) -> int:
        """Executes a DELETE query on the named table.

        Filters rows by the key-value pairs.

        Args:
            table: string giving the table name
            keyvalues: dict of column names and values to select the rows with
            desc: description of the transaction, for logging and metrics

        Returns:
            The number of deleted rows.
        """
        return await self.runInteraction(
            desc, self.simple_delete_txn, table, keyvalues, db_autocommit=True
        )
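
    # Illustrative usage sketch (hypothetical names):
    #
    #     deleted = await self.db_pool.simple_delete(
    #         table="account_data",
    #         keyvalues={"user_id": "@alice:test", "account_data_type": "m.tag"},
    #         desc="delete_account_data",
    #     )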

    @staticmethod
    def simple_delete_txn(
        txn: LoggingTransaction, table: str, keyvalues: Dict[str, Any]
    ) -> int:
        """Executes a DELETE query on the named table.

        Filters rows by the key-value pairs.

        Args:
            table: string giving the table name
            keyvalues: dict of column names and values to select the rows with

        Returns:
            The number of deleted rows.
        """
        sql = "DELETE FROM %s WHERE %s" % (
            table,
            " AND ".join("%s = ?" % (k,) for k in keyvalues),
        )

        txn.execute(sql, list(keyvalues.values()))
        return txn.rowcount

    async def simple_delete_many(
        self,
        table: str,
        column: str,
        iterable: Collection[Any],
        keyvalues: Dict[str, Any],
        desc: str,
    ) -> int:
        """Executes a DELETE query on the named table.

        Filters rows by whether the value of `column` is in `iterable`.

        Args:
            table: string giving the table name
            column: column name to test for inclusion against `iterable`
            iterable: list of values to match against `column`. NB cannot be a generator
                as it may be evaluated multiple times.
            keyvalues: dict of column names and values to select the rows with
            desc: description of the transaction, for logging and metrics

        Returns:
            The number of deleted rows.
        """
        return await self.runInteraction(
            desc,
            self.simple_delete_many_txn,
            table,
            column,
            iterable,
            keyvalues,
            db_autocommit=True,
        )

    @staticmethod
    def simple_delete_many_txn(
        txn: LoggingTransaction,
        table: str,
        column: str,
        values: Collection[Any],
        keyvalues: Dict[str, Any],
    ) -> int:
        """Executes a DELETE query on the named table.

        Deletes the rows:
          - whose value of `column` is in `values`; AND
          - that match extra column-value pairs specified in `keyvalues`.

        Args:
            txn: Transaction object
            table: string giving the table name
            column: column name to test for inclusion against `values`
            values: values of `column` which choose rows to delete
            keyvalues: dict of extra column names and values to select the rows
                with. They will be ANDed together with the main predicate.

        Returns:
            The number of deleted rows.
        """
        if not values:
            return 0

        sql = "DELETE FROM %s" % table

        clause, values = make_in_list_sql_clause(txn.database_engine, column, values)
        clauses = [clause]

        for key, value in keyvalues.items():
            clauses.append("%s = ?" % (key,))
            values.append(value)

        if clauses:
            sql = "%s WHERE %s" % (sql, " AND ".join(clauses))
        txn.execute(sql, values)

        return txn.rowcount
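
    # Illustrative usage sketch (hypothetical names): delete the listed
    # devices for one user in a single statement.
    #
    #     deleted = await self.db_pool.simple_delete_many(
    #         table="devices",
    #         column="device_id",
    #         iterable=("DEV1", "DEV2"),
    #         keyvalues={"user_id": "@alice:test"},
    #         desc="delete_devices",
    #     )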

    def get_cache_dict(
        self,
        db_conn: LoggingDatabaseConnection,
        table: str,
        entity_column: str,
        stream_column: str,
        max_value: int,
        limit: int = 100000,
    ) -> Tuple[Dict[Any, int], int]:
        # Fetch a mapping of room_id -> max stream position for "recent" rooms.
        # It doesn't really matter how many we get, the StreamChangeCache will
        # do the right thing to ensure it respects the max size of the cache.
        sql = (
            "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
            " WHERE %(stream)s > ? - %(limit)s"
            " GROUP BY %(entity)s"
        ) % {
            "table": table,
            "entity": entity_column,
            "stream": stream_column,
            "limit": limit,
        }

        txn = db_conn.cursor(txn_name="get_cache_dict")
        txn.execute(sql, (int(max_value),))

        cache = {row[0]: int(row[1]) for row in txn}

        txn.close()

        if cache:
            min_val = min(cache.values())
        else:
            min_val = max_value

        return cache, min_val
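
    # Illustrative usage sketch (hypothetical names): prime a stream-change
    # cache with the most recent stream position per room.
    #
    #     cache, min_stream = self.db_pool.get_cache_dict(
    #         db_conn,
    #         table="events",
    #         entity_column="room_id",
    #         stream_column="stream_ordering",
    #         max_value=current_stream_id,
    #     )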

    @classmethod
    def simple_select_list_paginate_txn(
        cls,
        txn: LoggingTransaction,
        table: str,
        orderby: str,
        start: int,
        limit: int,
        retcols: Iterable[str],
        filters: Optional[Dict[str, Any]] = None,
        keyvalues: Optional[Dict[str, Any]] = None,
        exclude_keyvalues: Optional[Dict[str, Any]] = None,
        order_direction: str = "ASC",
    ) -> List[Dict[str, Any]]:
        """
        Executes a SELECT query on the named table with start and limit,
        returning up to `limit` rows starting at row number `start`, as a
        list of dicts.

        Use `filters` to search attributes using SQL wildcards and/or `keyvalues` to
        select attributes with exact matches. All constraints are joined together
        using 'AND'.

        Args:
            txn: Transaction object
            table: the table name
            orderby: Column to order the results by.
            start: Index to begin the query at.
            limit: Number of results to return.
            retcols: the names of the columns to return
            filters:
                column names and values to filter the rows with, or None to not
                apply a WHERE ? LIKE ? clause.
            keyvalues:
                column names and values to select the rows with, or None to not
                apply a WHERE key = value clause.
            exclude_keyvalues:
                column names and values to exclude rows with, or None to not
                apply a WHERE key != value clause.
            order_direction: Whether the results should be ordered "ASC" or "DESC".

        Returns:
            The result as a list of dictionaries.
        """
        if order_direction not in ["ASC", "DESC"]:
            raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")

        where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else ""
        arg_list: List[Any] = []
        if filters:
            where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
            arg_list += list(filters.values())
        where_clause += " AND " if filters and keyvalues else ""
        if keyvalues:
            where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues)
            arg_list += list(keyvalues.values())
        # A connective is also needed before the exclusion clause when anything
        # precedes it, otherwise the generated SQL is malformed.
        where_clause += " AND " if (filters or keyvalues) and exclude_keyvalues else ""
        if exclude_keyvalues:
            where_clause += " AND ".join("%s != ?" % (k,) for k in exclude_keyvalues)
            arg_list += list(exclude_keyvalues.values())

        sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % (
            ", ".join(retcols),
            table,
            where_clause,
            orderby,
            order_direction,
        )
        txn.execute(sql, arg_list + [limit, start])

        return cls.cursor_to_dict(txn)
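
    # Illustrative usage sketch (hypothetical names): page through rooms
    # ordered by name, 50 at a time, starting at row 100.
    #
    #     rows = self.db_pool.simple_select_list_paginate_txn(
    #         txn,
    #         table="rooms",
    #         orderby="name",
    #         start=100,
    #         limit=50,
    #         retcols=("room_id", "name"),
    #         order_direction="ASC",
    #     )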

    async def simple_search_list(
        self,
        table: str,
        term: Optional[str],
        col: str,
        retcols: Collection[str],
        desc: str = "simple_search_list",
    ) -> Optional[List[Dict[str, Any]]]:
        """Executes a SELECT query on the named table, which may return zero or
        more rows, returning the result as a list of dicts.

        Args:
            table: the table name
            term: term for searching the table, matched against `col`.
            col: column the query term should be matched against
            retcols: the names of the columns to return
            desc: description of the transaction, for logging and metrics

        Returns:
            A list of dictionaries or None.
        """
        return await self.runInteraction(
            desc,
            self.simple_search_list_txn,
            table,
            term,
            col,
            retcols,
            db_autocommit=True,
        )

    @classmethod
    def simple_search_list_txn(
        cls,
        txn: LoggingTransaction,
        table: str,
        term: Optional[str],
        col: str,
        retcols: Iterable[str],
    ) -> Optional[List[Dict[str, Any]]]:
        """Executes a SELECT query on the named table, which may return zero or
        more rows, returning the result as a list of dicts.

        Args:
            txn: Transaction object
            table: the table name
            term: term for searching the table, matched against `col`.
            col: column the query term should be matched against
            retcols: the names of the columns to return

        Returns:
            None if no term is given, otherwise a list of dictionaries.
        """
        if term:
            sql = "SELECT %s FROM %s WHERE %s LIKE ?" % (", ".join(retcols), table, col)
            termvalues = ["%%" + term + "%%"]
            txn.execute(sql, termvalues)
        else:
            return None

        return cls.cursor_to_dict(txn)


def make_in_list_sql_clause(
    database_engine: BaseDatabaseEngine, column: str, iterable: Collection[Any]
) -> Tuple[str, list]:
    """Returns an SQL clause that checks the given column is in the iterable.

    On SQLite this expands to `column IN (?, ?, ...)`, whereas on Postgres
    it expands to `column = ANY(?)`. While both DBs support the `IN` form,
    using the `ANY` form on postgres means that it views queries with
    different length iterables as the same, helping the query stats.

    Args:
        database_engine
        column: Name of the column
        iterable: The values to check the column against.

    Returns:
        A tuple of SQL query and the args
    """
    if database_engine.supports_using_any_list:
        # This should hopefully be faster, but also makes postgres query
        # stats easier to understand.
        return "%s = ANY(?)" % (column,), [list(iterable)]
    else:
        return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable)
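
# Illustrative sketch of the two forms. On an SQLite engine:
#
#     clause, args = make_in_list_sql_clause(engine, "user_id", ["@a:x", "@b:x"])
#     # clause == "user_id IN (?,?)", args == ["@a:x", "@b:x"]
#
# whereas a postgres engine yields "user_id = ANY(?)" with
# args == [["@a:x", "@b:x"]] (the whole list as a single bound parameter).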


KV = TypeVar("KV")


def make_tuple_comparison_clause(keys: List[Tuple[str, KV]]) -> Tuple[str, List[KV]]:
    """Returns a tuple comparison SQL clause

    Builds a SQL clause that looks like "(a, b) > (?, ?)"

    Args:
        keys: A list of (column, value) pairs to be compared, in order.

    Returns:
        A tuple of SQL query and the args
    """
    return (
        "(%s) > (%s)" % (",".join(k[0] for k in keys), ",".join("?" for _ in keys)),
        [k[1] for k in keys],
    )
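
# Illustrative sketch: useful for keyset pagination over a compound ordering.
#
#     clause, args = make_tuple_comparison_clause(
#         [("stream_ordering", 1234), ("event_id", "$abc")]
#     )
#     # clause == "(stream_ordering,event_id) > (?,?)", args == [1234, "$abc"]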