|
@@ -310,11 +310,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
|
|
event_push_actions_done = progress.get("event_push_actions_done", False)
|
|
|
|
|
|
def add_thread_id_txn(
|
|
|
- txn: LoggingTransaction, table_name: str, start_stream_ordering: int
|
|
|
+ txn: LoggingTransaction, start_stream_ordering: int
|
|
|
) -> int:
|
|
|
- sql = f"""
|
|
|
+ sql = """
|
|
|
SELECT stream_ordering
|
|
|
- FROM {table_name}
|
|
|
+ FROM event_push_actions
|
|
|
WHERE
|
|
|
thread_id IS NULL
|
|
|
AND stream_ordering > ?
|
|
@@ -326,7 +326,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
|
|
# No more rows to process.
|
|
|
rows = txn.fetchall()
|
|
|
if not rows:
|
|
|
- progress[f"{table_name}_done"] = True
|
|
|
+ progress["event_push_actions_done"] = True
|
|
|
self.db_pool.updates._background_update_progress_txn(
|
|
|
txn, "event_push_backfill_thread_id", progress
|
|
|
)
|
|
@@ -335,16 +335,65 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
|
|
# Update the thread ID for any of those rows.
|
|
|
max_stream_ordering = rows[-1][0]
|
|
|
|
|
|
- sql = f"""
|
|
|
- UPDATE {table_name}
|
|
|
+ sql = """
|
|
|
+ UPDATE event_push_actions
|
|
|
SET thread_id = 'main'
|
|
|
- WHERE stream_ordering <= ? AND thread_id IS NULL
|
|
|
+ WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
|
|
|
"""
|
|
|
- txn.execute(sql, (max_stream_ordering,))
|
|
|
+ txn.execute(
|
|
|
+ sql,
|
|
|
+ (
|
|
|
+ start_stream_ordering,
|
|
|
+ max_stream_ordering,
|
|
|
+ ),
|
|
|
+ )
|
|
|
|
|
|
# Update progress.
|
|
|
processed_rows = txn.rowcount
|
|
|
- progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering
|
|
|
+ progress["max_event_push_actions_stream_ordering"] = max_stream_ordering
|
|
|
+ self.db_pool.updates._background_update_progress_txn(
|
|
|
+ txn, "event_push_backfill_thread_id", progress
|
|
|
+ )
|
|
|
+
|
|
|
+ return processed_rows
|
|
|
+
|
|
|
+ def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:
|
|
|
+ min_user_id = progress.get("max_summary_user_id", "")
|
|
|
+ min_room_id = progress.get("max_summary_room_id", "")
|
|
|
+
|
|
|
+ # Slightly overcomplicated query for getting the Nth user ID / room
|
|
|
+ # ID tuple, or the last if there are less than N remaining.
|
|
|
+ sql = """
|
|
|
+ SELECT user_id, room_id FROM (
|
|
|
+ SELECT user_id, room_id FROM event_push_summary
|
|
|
+ WHERE (user_id, room_id) > (?, ?)
|
|
|
+ AND thread_id IS NULL
|
|
|
+ ORDER BY user_id, room_id
|
|
|
+ LIMIT ?
|
|
|
+ ) AS e
|
|
|
+ ORDER BY user_id DESC, room_id DESC
|
|
|
+ LIMIT 1
|
|
|
+ """
|
|
|
+
|
|
|
+ txn.execute(sql, (min_user_id, min_room_id, batch_size))
|
|
|
+ row = txn.fetchone()
|
|
|
+ if not row:
|
|
|
+ return 0
|
|
|
+
|
|
|
+ max_user_id, max_room_id = row
|
|
|
+
|
|
|
+ sql = """
|
|
|
+ UPDATE event_push_summary
|
|
|
+ SET thread_id = 'main'
|
|
|
+ WHERE
|
|
|
+ (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?)
|
|
|
+ AND thread_id IS NULL
|
|
|
+ """
|
|
|
+ txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id))
|
|
|
+ processed_rows = txn.rowcount
|
|
|
+
|
|
|
+ progress["max_summary_user_id"] = max_user_id
|
|
|
+ progress["max_summary_room_id"] = max_room_id
|
|
|
self.db_pool.updates._background_update_progress_txn(
|
|
|
txn, "event_push_backfill_thread_id", progress
|
|
|
)
|
|
@@ -360,15 +409,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
|
|
|
result = await self.db_pool.runInteraction(
|
|
|
"event_push_backfill_thread_id",
|
|
|
add_thread_id_txn,
|
|
|
- "event_push_actions",
|
|
|
progress.get("max_event_push_actions_stream_ordering", 0),
|
|
|
)
|
|
|
else:
|
|
|
result = await self.db_pool.runInteraction(
|
|
|
"event_push_backfill_thread_id",
|
|
|
- add_thread_id_txn,
|
|
|
- "event_push_summary",
|
|
|
- progress.get("max_event_push_summary_stream_ordering", 0),
|
|
|
+ add_thread_id_summary_txn,
|
|
|
)
|
|
|
|
|
|
# Only done after the event_push_summary table is done.
|