|
@@ -55,6 +55,7 @@ from synapse.storage.data_stores.main.stats import StatsStore
|
|
|
from synapse.storage.data_stores.main.user_directory import (
|
|
|
UserDirectoryBackgroundUpdateStore,
|
|
|
)
|
|
|
+from synapse.storage.database import Database
|
|
|
from synapse.storage.engines import create_engine
|
|
|
from synapse.storage.prepare_database import prepare_database
|
|
|
from synapse.util import Clock
|
|
@@ -139,39 +140,6 @@ class Store(
|
|
|
UserDirectoryBackgroundUpdateStore,
|
|
|
StatsStore,
|
|
|
):
|
|
|
- def __init__(self, db_conn, hs):
|
|
|
- super().__init__(db_conn, hs)
|
|
|
- self.db_pool = hs.get_db_pool()
|
|
|
-
|
|
|
- @defer.inlineCallbacks
|
|
|
- def runInteraction(self, desc, func, *args, **kwargs):
|
|
|
- def r(conn):
|
|
|
- try:
|
|
|
- i = 0
|
|
|
- N = 5
|
|
|
- while True:
|
|
|
- try:
|
|
|
- txn = conn.cursor()
|
|
|
- return func(
|
|
|
- LoggingTransaction(txn, desc, self.database_engine, [], []),
|
|
|
- *args,
|
|
|
- **kwargs
|
|
|
- )
|
|
|
- except self.database_engine.module.DatabaseError as e:
|
|
|
- if self.database_engine.is_deadlock(e):
|
|
|
- logger.warning("[TXN DEADLOCK] {%s} %d/%d", desc, i, N)
|
|
|
- if i < N:
|
|
|
- i += 1
|
|
|
- conn.rollback()
|
|
|
- continue
|
|
|
- raise
|
|
|
- except Exception as e:
|
|
|
- logger.debug("[TXN FAIL] {%s} %s", desc, e)
|
|
|
- raise
|
|
|
-
|
|
|
- with PreserveLoggingContext():
|
|
|
- return (yield self.db_pool.runWithConnection(r))
|
|
|
-
|
|
|
def execute(self, f, *args, **kwargs):
|
|
|
return self.db.runInteraction(f.__name__, f, *args, **kwargs)
|
|
|
|
|
@@ -512,7 +480,7 @@ class Porter(object):
|
|
|
|
|
|
hs = MockHomeserver(self.hs_config, engine, conn, db_pool)
|
|
|
|
|
|
- store = Store(conn, hs)
|
|
|
+ store = Store(Database(hs), conn, hs)
|
|
|
|
|
|
yield store.db.runInteraction(
|
|
|
"%s_engine.check_database" % config["name"], engine.check_database,
|