psycopg2.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from typing import Any, Mapping, Optional, NoReturn
  16. import psycopg2.extensions
  17. from synapse.storage.engines import PostgresEngine
  18. from synapse.storage.engines._base import IsolationLevel
  19. logger = logging.getLogger(__name__)
  20. class Psycopg2Engine(
  21. PostgresEngine[psycopg2.extensions.connection, psycopg2.extensions.cursor, int]
  22. ):
  23. def __init__(self, database_config: Mapping[str, Any]):
  24. super().__init__(psycopg2, database_config)
  25. psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
  26. # Disables passing `bytes` to txn.execute, c.f.
  27. # https://github.com/matrix-org/synapse/issues/6186. If you do
  28. # actually want to use bytes than wrap it in `bytearray`.
  29. def _disable_bytes_adapter(_: bytes) -> NoReturn:
  30. raise Exception("Passing bytes to DB is disabled.")
  31. psycopg2.extensions.register_adapter(bytes, _disable_bytes_adapter)
  32. self.isolation_level_map = {
  33. IsolationLevel.READ_COMMITTED: psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED,
  34. IsolationLevel.REPEATABLE_READ: psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ,
  35. IsolationLevel.SERIALIZABLE: psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE,
  36. }
  37. self.default_isolation_level = (
  38. psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ
  39. )
  40. self.config = database_config
  41. def get_server_version(self, db_conn: psycopg2.extensions.connection) -> int:
  42. return db_conn.server_version
  43. def convert_param_style(self, sql: str) -> str:
  44. return sql.replace("?", "%s")
  45. def is_deadlock(self, error: Exception) -> bool:
  46. if isinstance(error, psycopg2.DatabaseError):
  47. # https://www.postgresql.org/docs/current/static/errcodes-appendix.html
  48. # "40001" serialization_failure
  49. # "40P01" deadlock_detected
  50. return error.pgcode in ["40001", "40P01"]
  51. return False
  52. def in_transaction(self, conn: psycopg2.extensions.connection) -> bool:
  53. return conn.status != psycopg2.extensions.STATUS_READY
  54. def attempt_to_set_autocommit(
  55. self, conn: psycopg2.extensions.connection, autocommit: bool
  56. ) -> None:
  57. return conn.set_session(autocommit=autocommit)
  58. def attempt_to_set_isolation_level(
  59. self, conn: psycopg2.extensions.connection, isolation_level: Optional[int]
  60. ) -> None:
  61. if isolation_level is None:
  62. isolation_level = self.default_isolation_level
  63. else:
  64. isolation_level = self.isolation_level_map[isolation_level]
  65. return conn.set_isolation_level(isolation_level)
  66. @staticmethod
  67. def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None:
  68. """Execute a chunk of SQL containing multiple semicolon-delimited statements.
  69. Psycopg2 seems happy to do this in DBAPI2's `execute()` function.
  70. For consistency with SQLite, any ongoing transaction is committed before
  71. executing the script in its own transaction. The script transaction is
  72. left open and it is the responsibility of the caller to commit it.
  73. """
  74. cursor.execute(f"COMMIT; BEGIN TRANSACTION; {script}")