|
@@ -39,6 +39,11 @@ class PusherWorkerStore(SQLBaseStore):
|
|
|
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
|
|
|
)
|
|
|
|
|
|
+ self.db_pool.updates.register_background_update_handler(
|
|
|
+ "remove_deactivated_pushers",
|
|
|
+ self._remove_deactivated_pushers,
|
|
|
+ )
|
|
|
+
|
|
|
def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
|
|
|
"""JSON-decode the data in the rows returned from the `pushers` table
|
|
|
|
|
@@ -284,6 +289,54 @@ class PusherWorkerStore(SQLBaseStore):
|
|
|
lock=False,
|
|
|
)
|
|
|
|
|
|
+ async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
|
|
|
+ """A background update that deletes all pushers for deactivated users.
|
|
|
+
|
|
|
+ Note that we don't proacively tell the pusherpool that we've deleted
|
|
|
+ these (just because its a bit off a faff to do from here), but they will
|
|
|
+ get cleaned up at the next restart
|
|
|
+ """
|
|
|
+
|
|
|
+ last_user = progress.get("last_user", "")
|
|
|
+
|
|
|
+ def _delete_pushers(txn) -> int:
|
|
|
+
|
|
|
+ sql = """
|
|
|
+ SELECT name FROM users
|
|
|
+ WHERE deactivated = ? and name > ?
|
|
|
+ ORDER BY name ASC
|
|
|
+ LIMIT ?
|
|
|
+ """
|
|
|
+
|
|
|
+ txn.execute(sql, (1, last_user, batch_size))
|
|
|
+ users = [row[0] for row in txn]
|
|
|
+
|
|
|
+ self.db_pool.simple_delete_many_txn(
|
|
|
+ txn,
|
|
|
+ table="pushers",
|
|
|
+ column="user_name",
|
|
|
+ iterable=users,
|
|
|
+ keyvalues={},
|
|
|
+ )
|
|
|
+
|
|
|
+ if users:
|
|
|
+ self.db_pool.updates._background_update_progress_txn(
|
|
|
+ txn, "remove_deactivated_pushers", {"last_user": users[-1]}
|
|
|
+ )
|
|
|
+
|
|
|
+ return len(users)
|
|
|
+
|
|
|
+ number_deleted = await self.db_pool.runInteraction(
|
|
|
+ "_remove_deactivated_pushers", _delete_pushers
|
|
|
+ )
|
|
|
+
|
|
|
+ if number_deleted < batch_size:
|
|
|
+ await self.db_pool.updates._end_background_update(
|
|
|
+ "remove_deactivated_pushers"
|
|
|
+ )
|
|
|
+
|
|
|
+ return number_deleted
|
|
|
+
|
|
|
|
|
|
class PusherStore(PusherWorkerStore):
|
|
|
def get_pushers_stream_token(self) -> int:
|