psycopg.py 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. # Copyright 2022-2023 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from typing import Any, Mapping, Optional, Tuple
  16. import psycopg
  17. import psycopg.errors
  18. import psycopg.sql
  19. from twisted.enterprise.adbapi import Connection as TxConnection
  20. from synapse.storage.engines import PostgresEngine
  21. from synapse.storage.engines._base import IsolationLevel
  22. logger = logging.getLogger(__name__)
  23. class PsycopgEngine(
  24. # mypy doesn't seem to like that the psycopg Connection and Cursor are Generics.
  25. PostgresEngine[ # type: ignore[type-var]
  26. psycopg.Connection[Tuple], psycopg.Cursor[Tuple], psycopg.IsolationLevel
  27. ]
  28. ):
  29. def __init__(self, database_config: Mapping[str, Any]):
  30. super().__init__(psycopg, database_config) # type: ignore[arg-type]
  31. # psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
  32. # Disables passing `bytes` to txn.execute, c.f. #6186. If you do
  33. # actually want to use bytes than wrap it in `bytearray`.
  34. # def _disable_bytes_adapter(_: bytes) -> NoReturn:
  35. # raise Exception("Passing bytes to DB is disabled.")
  36. self.isolation_level_map = {
  37. IsolationLevel.READ_COMMITTED: psycopg.IsolationLevel.READ_COMMITTED,
  38. IsolationLevel.REPEATABLE_READ: psycopg.IsolationLevel.REPEATABLE_READ,
  39. IsolationLevel.SERIALIZABLE: psycopg.IsolationLevel.SERIALIZABLE,
  40. }
  41. self.default_isolation_level = psycopg.IsolationLevel.REPEATABLE_READ
  42. def get_server_version(self, db_conn: psycopg.Connection) -> int:
  43. return db_conn.info.server_version
  44. def convert_param_style(self, sql: str) -> str:
  45. # if isinstance(sql, psycopg.sql.Composed):
  46. # return sql
  47. return sql.replace("?", "%s")
  48. def is_deadlock(self, error: Exception) -> bool:
  49. if isinstance(error, psycopg.errors.Error):
  50. # https://www.postgresql.org/docs/current/static/errcodes-appendix.html
  51. # "40001" serialization_failure
  52. # "40P01" deadlock_detected
  53. return error.sqlstate in ["40001", "40P01"]
  54. return False
  55. def in_transaction(self, conn: psycopg.Connection) -> bool:
  56. return conn.info.transaction_status != psycopg.pq.TransactionStatus.IDLE
  57. def attempt_to_set_autocommit(
  58. self, conn: psycopg.Connection, autocommit: bool
  59. ) -> None:
  60. # Sometimes this gets called with a Twisted connection instead, unwrap
  61. # it because it doesn't support __setattr__.
  62. if isinstance(conn, TxConnection):
  63. conn = conn._connection
  64. conn.autocommit = autocommit
  65. def attempt_to_set_isolation_level(
  66. self, conn: psycopg.Connection, isolation_level: Optional[int]
  67. ) -> None:
  68. if isolation_level is None:
  69. pg_isolation_level = self.default_isolation_level
  70. else:
  71. pg_isolation_level = self.isolation_level_map[isolation_level]
  72. conn.isolation_level = pg_isolation_level