|
@@ -32,7 +32,11 @@ from synapse.logging.opentracing import (
|
|
|
)
|
|
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
|
|
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
|
|
-from synapse.storage.database import Database, LoggingTransaction
|
|
|
+from synapse.storage.database import (
|
|
|
+ Database,
|
|
|
+ LoggingTransaction,
|
|
|
+ make_tuple_comparison_clause,
|
|
|
+)
|
|
|
from synapse.types import Collection, get_verify_key_from_cross_signing_key
|
|
|
from synapse.util.caches.descriptors import (
|
|
|
Cache,
|
|
@@ -49,6 +53,8 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
|
|
|
"drop_device_list_streams_non_unique_indexes"
|
|
|
)
|
|
|
|
|
|
+BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
|
|
|
+
|
|
|
|
|
|
class DeviceWorkerStore(SQLBaseStore):
|
|
|
def get_device(self, user_id, device_id):
|
|
@@ -714,6 +720,11 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
|
|
|
self._drop_device_list_streams_non_unique_indexes,
|
|
|
)
|
|
|
|
|
|
+ # clear out duplicate device list outbound pokes
|
|
|
+ self.db.updates.register_background_update_handler(
|
|
|
+ BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes,
|
|
|
+ )
|
|
|
+
|
|
|
@defer.inlineCallbacks
|
|
|
def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
|
|
|
def f(conn):
|
|
@@ -728,6 +739,66 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
|
|
|
)
|
|
|
return 1
|
|
|
|
|
|
+ async def _remove_duplicate_outbound_pokes(self, progress, batch_size):
|
|
|
+ # for some reason, we have accumulated duplicate entries in
|
|
|
+ # device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less
|
|
|
+ # efficient.
|
|
|
+ #
|
|
|
+ # For each duplicate, we delete all the existing rows and put one back.
|
|
|
+
|
|
|
+ KEY_COLS = ["stream_id", "destination", "user_id", "device_id"]
|
|
|
+ last_row = progress.get(
|
|
|
+ "last_row",
|
|
|
+ {"stream_id": 0, "destination": "", "user_id": "", "device_id": ""},
|
|
|
+ )
|
|
|
+
|
|
|
+ def _txn(txn):
|
|
|
+ clause, args = make_tuple_comparison_clause(
|
|
|
+ self.db.engine, [(x, last_row[x]) for x in KEY_COLS]
|
|
|
+ )
|
|
|
+ sql = """
|
|
|
+ SELECT stream_id, destination, user_id, device_id, MAX(ts) AS ts
|
|
|
+ FROM device_lists_outbound_pokes
|
|
|
+ WHERE %s
|
|
|
+ GROUP BY %s
|
|
|
+ HAVING count(*) > 1
|
|
|
+ ORDER BY %s
|
|
|
+ LIMIT ?
|
|
|
+ """ % (
|
|
|
+ clause, # WHERE
|
|
|
+ ",".join(KEY_COLS), # GROUP BY
|
|
|
+ ",".join(KEY_COLS), # ORDER BY
|
|
|
+ )
|
|
|
+ txn.execute(sql, args + [batch_size])
|
|
|
+ rows = self.db.cursor_to_dict(txn)
|
|
|
+
|
|
|
+ row = None
|
|
|
+ for row in rows:
|
|
|
+ self.db.simple_delete_txn(
|
|
|
+ txn, "device_lists_outbound_pokes", {x: row[x] for x in KEY_COLS},
|
|
|
+ )
|
|
|
+
|
|
|
+ row["sent"] = False
|
|
|
+ self.db.simple_insert_txn(
|
|
|
+ txn, "device_lists_outbound_pokes", row,
|
|
|
+ )
|
|
|
+
|
|
|
+ if row:
|
|
|
+ self.db.updates._background_update_progress_txn(
|
|
|
+ txn, BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, {"last_row": row},
|
|
|
+ )
|
|
|
+
|
|
|
+ return len(rows)
|
|
|
+
|
|
|
+ rows = await self.db.runInteraction(BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, _txn)
|
|
|
+
|
|
|
+ if not rows:
|
|
|
+ await self.db.updates._end_background_update(
|
|
|
+ BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES
|
|
|
+ )
|
|
|
+
|
|
|
+ return rows
|
|
|
+
|
|
|
|
|
|
class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
|
|
def __init__(self, database: Database, db_conn, hs):
|