|
@@ -26,6 +26,7 @@ from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
|
|
|
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
|
|
|
from synapse.storage.data_stores.main.signatures import SignatureWorkerStore
|
|
|
from synapse.storage.database import Database
|
|
|
+from synapse.storage.engines import PostgresEngine
|
|
|
from synapse.util.caches.descriptors import cached
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
@@ -61,6 +62,28 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|
|
)
|
|
|
|
|
|
def _get_auth_chain_ids_txn(self, txn, event_ids, include_given):
|
|
|
+ if isinstance(self.database_engine, PostgresEngine):
|
|
|
+ # For efficiency we make the database do this if we can.
|
|
|
+ sql = """
|
|
|
+ WITH RECURSIVE auth_chain(event_id) AS (
|
|
|
+ SELECT auth_id FROM event_auth WHERE event_id = ANY(?)
|
|
|
+ UNION
|
|
|
+ SELECT auth_id FROM event_auth
|
|
|
+ INNER JOIN auth_chain USING (event_id)
|
|
|
+ )
|
|
|
+ SELECT event_id FROM auth_chain
|
|
|
+ """
|
|
|
+ txn.execute(sql, (list(event_ids),))
|
|
|
+
|
|
|
+ results = set(event_id for event_id, in txn)
|
|
|
+
|
|
|
+ if include_given:
|
|
|
+ results.update(event_ids)
|
|
|
+
|
|
|
+ return list(results)
|
|
|
+
|
|
|
+ # Database doesn't necessarily support recursive CTE, so we fall
|
|
|
+ # back to do doing it manually.
|
|
|
if include_given:
|
|
|
results = set(event_ids)
|
|
|
else:
|