pusher.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713
  1. # Copyright 2014-2016 OpenMarket Ltd
  2. # Copyright 2018 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from typing import (
  17. TYPE_CHECKING,
  18. Any,
  19. Dict,
  20. Iterable,
  21. Iterator,
  22. List,
  23. Optional,
  24. Tuple,
  25. cast,
  26. )
  27. from synapse.push import PusherConfig, ThrottleParams
  28. from synapse.replication.tcp.streams import PushersStream
  29. from synapse.storage._base import SQLBaseStore, db_to_json
  30. from synapse.storage.database import (
  31. DatabasePool,
  32. LoggingDatabaseConnection,
  33. LoggingTransaction,
  34. )
  35. from synapse.storage.util.id_generators import (
  36. AbstractStreamIdGenerator,
  37. StreamIdGenerator,
  38. )
  39. from synapse.types import JsonDict
  40. from synapse.util import json_encoder
  41. from synapse.util.caches.descriptors import cached
  42. if TYPE_CHECKING:
  43. from synapse.server import HomeServer
  44. logger = logging.getLogger(__name__)
  45. class PusherWorkerStore(SQLBaseStore):
  46. def __init__(
  47. self,
  48. database: DatabasePool,
  49. db_conn: LoggingDatabaseConnection,
  50. hs: "HomeServer",
  51. ):
  52. super().__init__(database, db_conn, hs)
  53. # In the worker store this is an ID tracker which we overwrite in the non-worker
  54. # class below that is used on the main process.
  55. self._pushers_id_gen = StreamIdGenerator(
  56. db_conn,
  57. hs.get_replication_notifier(),
  58. "pushers",
  59. "id",
  60. extra_tables=[("deleted_pushers", "stream_id")],
  61. is_writer=hs.config.worker.worker_app is None,
  62. )
  63. self.db_pool.updates.register_background_update_handler(
  64. "remove_deactivated_pushers",
  65. self._remove_deactivated_pushers,
  66. )
  67. self.db_pool.updates.register_background_update_handler(
  68. "remove_stale_pushers",
  69. self._remove_stale_pushers,
  70. )
  71. self.db_pool.updates.register_background_update_handler(
  72. "remove_deleted_email_pushers",
  73. self._remove_deleted_email_pushers,
  74. )
  75. def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
  76. """JSON-decode the data in the rows returned from the `pushers` table
  77. Drops any rows whose data cannot be decoded
  78. """
  79. for r in rows:
  80. data_json = r["data"]
  81. try:
  82. r["data"] = db_to_json(data_json)
  83. except Exception as e:
  84. logger.warning(
  85. "Invalid JSON in data for pusher %d: %s, %s",
  86. r["id"],
  87. data_json,
  88. e.args[0],
  89. )
  90. continue
  91. # If we're using SQLite, then boolean values are integers. This is
  92. # troublesome since some code using the return value of this method might
  93. # expect it to be a boolean, or will expose it to clients (in responses).
  94. r["enabled"] = bool(r["enabled"])
  95. yield PusherConfig(**r)
  96. def get_pushers_stream_token(self) -> int:
  97. return self._pushers_id_gen.get_current_token()
  98. def process_replication_position(
  99. self, stream_name: str, instance_name: str, token: int
  100. ) -> None:
  101. if stream_name == PushersStream.NAME:
  102. self._pushers_id_gen.advance(instance_name, token)
  103. super().process_replication_position(stream_name, instance_name, token)
  104. async def get_pushers_by_app_id_and_pushkey(
  105. self, app_id: str, pushkey: str
  106. ) -> Iterator[PusherConfig]:
  107. return await self.get_pushers_by({"app_id": app_id, "pushkey": pushkey})
  108. async def get_pushers_by_user_id(self, user_id: str) -> Iterator[PusherConfig]:
  109. return await self.get_pushers_by({"user_name": user_id})
  110. async def get_pushers_by(self, keyvalues: Dict[str, Any]) -> Iterator[PusherConfig]:
  111. """Retrieve pushers that match the given criteria.
  112. Args:
  113. keyvalues: A {column: value} dictionary.
  114. Returns:
  115. The pushers for which the given columns have the given values.
  116. """
  117. def get_pushers_by_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
  118. # We could technically use simple_select_list here, but we need to call
  119. # COALESCE on the 'enabled' column. While it is technically possible to give
  120. # simple_select_list the whole `COALESCE(...) AS ...` as a column name, it
  121. # feels a bit hacky, so it's probably better to just inline the query.
  122. sql = """
  123. SELECT
  124. id, user_name, access_token, profile_tag, kind, app_id,
  125. app_display_name, device_display_name, pushkey, ts, lang, data,
  126. last_stream_ordering, last_success, failing_since,
  127. COALESCE(enabled, TRUE) AS enabled, device_id
  128. FROM pushers
  129. """
  130. sql += "WHERE %s" % (" AND ".join("%s = ?" % (k,) for k in keyvalues),)
  131. txn.execute(sql, list(keyvalues.values()))
  132. return self.db_pool.cursor_to_dict(txn)
  133. ret = await self.db_pool.runInteraction(
  134. desc="get_pushers_by",
  135. func=get_pushers_by_txn,
  136. )
  137. return self._decode_pushers_rows(ret)
  138. async def get_enabled_pushers(self) -> Iterator[PusherConfig]:
  139. def get_enabled_pushers_txn(txn: LoggingTransaction) -> Iterator[PusherConfig]:
  140. txn.execute("SELECT * FROM pushers WHERE COALESCE(enabled, TRUE)")
  141. rows = self.db_pool.cursor_to_dict(txn)
  142. return self._decode_pushers_rows(rows)
  143. return await self.db_pool.runInteraction(
  144. "get_enabled_pushers", get_enabled_pushers_txn
  145. )
  146. async def get_all_updated_pushers_rows(
  147. self, instance_name: str, last_id: int, current_id: int, limit: int
  148. ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
  149. """Get updates for pushers replication stream.
  150. Args:
  151. instance_name: The writer we want to fetch updates from. Unused
  152. here since there is only ever one writer.
  153. last_id: The token to fetch updates from. Exclusive.
  154. current_id: The token to fetch updates up to. Inclusive.
  155. limit: The requested limit for the number of rows to return. The
  156. function may return more or fewer rows.
  157. Returns:
  158. A tuple consisting of: the updates, a token to use to fetch
  159. subsequent updates, and whether we returned fewer rows than exists
  160. between the requested tokens due to the limit.
  161. The token returned can be used in a subsequent call to this
  162. function to get further updatees.
  163. The updates are a list of 2-tuples of stream ID and the row data
  164. """
  165. if last_id == current_id:
  166. return [], current_id, False
  167. def get_all_updated_pushers_rows_txn(
  168. txn: LoggingTransaction,
  169. ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
  170. sql = """
  171. SELECT id, user_name, app_id, pushkey
  172. FROM pushers
  173. WHERE ? < id AND id <= ?
  174. ORDER BY id ASC LIMIT ?
  175. """
  176. txn.execute(sql, (last_id, current_id, limit))
  177. updates = cast(
  178. List[Tuple[int, tuple]],
  179. [
  180. (stream_id, (user_name, app_id, pushkey, False))
  181. for stream_id, user_name, app_id, pushkey in txn
  182. ],
  183. )
  184. sql = """
  185. SELECT stream_id, user_id, app_id, pushkey
  186. FROM deleted_pushers
  187. WHERE ? < stream_id AND stream_id <= ?
  188. ORDER BY stream_id ASC LIMIT ?
  189. """
  190. txn.execute(sql, (last_id, current_id, limit))
  191. updates.extend(
  192. (stream_id, (user_name, app_id, pushkey, True))
  193. for stream_id, user_name, app_id, pushkey in txn
  194. )
  195. updates.sort() # Sort so that they're ordered by stream id
  196. limited = False
  197. upper_bound = current_id
  198. if len(updates) >= limit:
  199. limited = True
  200. upper_bound = updates[-1][0]
  201. return updates, upper_bound, limited
  202. return await self.db_pool.runInteraction(
  203. "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
  204. )
  205. @cached(num_args=1, max_entries=15000)
  206. async def get_if_user_has_pusher(self, user_id: str) -> None:
  207. # This only exists for the cachedList decorator
  208. raise NotImplementedError()
  209. async def update_pusher_last_stream_ordering(
  210. self, app_id: str, pushkey: str, user_id: str, last_stream_ordering: int
  211. ) -> None:
  212. await self.db_pool.simple_update_one(
  213. "pushers",
  214. {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
  215. {"last_stream_ordering": last_stream_ordering},
  216. desc="update_pusher_last_stream_ordering",
  217. )
  218. async def update_pusher_last_stream_ordering_and_success(
  219. self,
  220. app_id: str,
  221. pushkey: str,
  222. user_id: str,
  223. last_stream_ordering: int,
  224. last_success: int,
  225. ) -> bool:
  226. """Update the last stream ordering position we've processed up to for
  227. the given pusher.
  228. Args:
  229. app_id
  230. pushkey
  231. user_id
  232. last_stream_ordering
  233. last_success
  234. Returns:
  235. True if the pusher still exists; False if it has been deleted.
  236. """
  237. updated = await self.db_pool.simple_update(
  238. table="pushers",
  239. keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
  240. updatevalues={
  241. "last_stream_ordering": last_stream_ordering,
  242. "last_success": last_success,
  243. },
  244. desc="update_pusher_last_stream_ordering_and_success",
  245. )
  246. return bool(updated)
  247. async def update_pusher_failing_since(
  248. self, app_id: str, pushkey: str, user_id: str, failing_since: Optional[int]
  249. ) -> None:
  250. await self.db_pool.simple_update(
  251. table="pushers",
  252. keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
  253. updatevalues={"failing_since": failing_since},
  254. desc="update_pusher_failing_since",
  255. )
  256. async def get_throttle_params_by_room(
  257. self, pusher_id: str
  258. ) -> Dict[str, ThrottleParams]:
  259. res = await self.db_pool.simple_select_list(
  260. "pusher_throttle",
  261. {"pusher": pusher_id},
  262. ["room_id", "last_sent_ts", "throttle_ms"],
  263. desc="get_throttle_params_by_room",
  264. )
  265. params_by_room = {}
  266. for row in res:
  267. params_by_room[row["room_id"]] = ThrottleParams(
  268. row["last_sent_ts"],
  269. row["throttle_ms"],
  270. )
  271. return params_by_room
  272. async def set_throttle_params(
  273. self, pusher_id: str, room_id: str, params: ThrottleParams
  274. ) -> None:
  275. await self.db_pool.simple_upsert(
  276. "pusher_throttle",
  277. {"pusher": pusher_id, "room_id": room_id},
  278. {"last_sent_ts": params.last_sent_ts, "throttle_ms": params.throttle_ms},
  279. desc="set_throttle_params",
  280. )
  281. async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
  282. """A background update that deletes all pushers for deactivated users.
  283. Note that we don't proacively tell the pusherpool that we've deleted
  284. these (just because its a bit off a faff to do from here), but they will
  285. get cleaned up at the next restart
  286. """
  287. last_user = progress.get("last_user", "")
  288. def _delete_pushers(txn: LoggingTransaction) -> int:
  289. sql = """
  290. SELECT name FROM users
  291. WHERE deactivated = ? and name > ?
  292. ORDER BY name ASC
  293. LIMIT ?
  294. """
  295. txn.execute(sql, (1, last_user, batch_size))
  296. users = [row[0] for row in txn]
  297. self.db_pool.simple_delete_many_txn(
  298. txn,
  299. table="pushers",
  300. column="user_name",
  301. values=users,
  302. keyvalues={},
  303. )
  304. if users:
  305. self.db_pool.updates._background_update_progress_txn(
  306. txn, "remove_deactivated_pushers", {"last_user": users[-1]}
  307. )
  308. return len(users)
  309. number_deleted = await self.db_pool.runInteraction(
  310. "_remove_deactivated_pushers", _delete_pushers
  311. )
  312. if number_deleted < batch_size:
  313. await self.db_pool.updates._end_background_update(
  314. "remove_deactivated_pushers"
  315. )
  316. return number_deleted
  317. async def _remove_stale_pushers(self, progress: dict, batch_size: int) -> int:
  318. """A background update that deletes all pushers for logged out devices.
  319. Note that we don't proacively tell the pusherpool that we've deleted
  320. these (just because its a bit off a faff to do from here), but they will
  321. get cleaned up at the next restart
  322. """
  323. last_pusher = progress.get("last_pusher", 0)
  324. def _delete_pushers(txn: LoggingTransaction) -> int:
  325. sql = """
  326. SELECT p.id, access_token FROM pushers AS p
  327. LEFT JOIN access_tokens AS a ON (p.access_token = a.id)
  328. WHERE p.id > ?
  329. ORDER BY p.id ASC
  330. LIMIT ?
  331. """
  332. txn.execute(sql, (last_pusher, batch_size))
  333. pushers = [(row[0], row[1]) for row in txn]
  334. self.db_pool.simple_delete_many_txn(
  335. txn,
  336. table="pushers",
  337. column="id",
  338. values=[pusher_id for pusher_id, token in pushers if token is None],
  339. keyvalues={},
  340. )
  341. if pushers:
  342. self.db_pool.updates._background_update_progress_txn(
  343. txn, "remove_stale_pushers", {"last_pusher": pushers[-1][0]}
  344. )
  345. return len(pushers)
  346. number_deleted = await self.db_pool.runInteraction(
  347. "_remove_stale_pushers", _delete_pushers
  348. )
  349. if number_deleted < batch_size:
  350. await self.db_pool.updates._end_background_update("remove_stale_pushers")
  351. return number_deleted
  352. async def _remove_deleted_email_pushers(
  353. self, progress: dict, batch_size: int
  354. ) -> int:
  355. """A background update that deletes all pushers for deleted email addresses.
  356. In previous versions of synapse, when users deleted their email address, it didn't
  357. also delete all the pushers for that email address. This background update removes
  358. those to prevent unwanted emails. This should only need to be run once (when users
  359. upgrade to v1.42.0
  360. Args:
  361. progress: dict used to store progress of this background update
  362. batch_size: the maximum number of rows to retrieve in a single select query
  363. Returns:
  364. The number of deleted rows
  365. """
  366. last_pusher = progress.get("last_pusher", 0)
  367. def _delete_pushers(txn: LoggingTransaction) -> int:
  368. sql = """
  369. SELECT p.id, p.user_name, p.app_id, p.pushkey
  370. FROM pushers AS p
  371. LEFT JOIN user_threepids AS t
  372. ON t.user_id = p.user_name
  373. AND t.medium = 'email'
  374. AND t.address = p.pushkey
  375. WHERE t.user_id is NULL
  376. AND p.app_id = 'm.email'
  377. AND p.id > ?
  378. ORDER BY p.id ASC
  379. LIMIT ?
  380. """
  381. txn.execute(sql, (last_pusher, batch_size))
  382. rows = txn.fetchall()
  383. last = None
  384. num_deleted = 0
  385. for row in rows:
  386. last = row[0]
  387. num_deleted += 1
  388. self.db_pool.simple_delete_txn(
  389. txn,
  390. "pushers",
  391. {"user_name": row[1], "app_id": row[2], "pushkey": row[3]},
  392. )
  393. if last is not None:
  394. self.db_pool.updates._background_update_progress_txn(
  395. txn, "remove_deleted_email_pushers", {"last_pusher": last}
  396. )
  397. return num_deleted
  398. number_deleted = await self.db_pool.runInteraction(
  399. "_remove_deleted_email_pushers", _delete_pushers
  400. )
  401. if number_deleted < batch_size:
  402. await self.db_pool.updates._end_background_update(
  403. "remove_deleted_email_pushers"
  404. )
  405. return number_deleted
  406. class PusherBackgroundUpdatesStore(SQLBaseStore):
  407. def __init__(
  408. self,
  409. database: DatabasePool,
  410. db_conn: LoggingDatabaseConnection,
  411. hs: "HomeServer",
  412. ):
  413. super().__init__(database, db_conn, hs)
  414. self.db_pool.updates.register_background_update_handler(
  415. "set_device_id_for_pushers", self._set_device_id_for_pushers
  416. )
  417. async def _set_device_id_for_pushers(
  418. self, progress: JsonDict, batch_size: int
  419. ) -> int:
  420. """
  421. Background update to populate the device_id column and clear the access_token
  422. column for the pushers table.
  423. """
  424. last_pusher_id = progress.get("pusher_id", 0)
  425. def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
  426. txn.execute(
  427. """
  428. SELECT
  429. p.id AS pusher_id,
  430. p.device_id AS pusher_device_id,
  431. at.device_id AS token_device_id
  432. FROM pushers AS p
  433. LEFT JOIN access_tokens AS at
  434. ON p.access_token = at.id
  435. WHERE
  436. p.access_token IS NOT NULL
  437. AND p.id > ?
  438. ORDER BY p.id
  439. LIMIT ?
  440. """,
  441. (last_pusher_id, batch_size),
  442. )
  443. rows = self.db_pool.cursor_to_dict(txn)
  444. if len(rows) == 0:
  445. return 0
  446. # The reason we're clearing the access_token column here is a bit subtle.
  447. # When a user logs out, we:
  448. # (1) delete the access token
  449. # (2) delete the device
  450. #
  451. # Ideally, we would delete the pushers only via its link to the device
  452. # during (2), but since this background update might not have fully run yet,
  453. # we're still deleting the pushers via the access token during (1).
  454. self.db_pool.simple_update_many_txn(
  455. txn=txn,
  456. table="pushers",
  457. key_names=("id",),
  458. key_values=[(row["pusher_id"],) for row in rows],
  459. value_names=("device_id", "access_token"),
  460. # If there was already a device_id on the pusher, we only want to clear
  461. # the access_token column, so we keep the existing device_id. Otherwise,
  462. # we set the device_id we got from joining the access_tokens table.
  463. value_values=[
  464. (row["pusher_device_id"] or row["token_device_id"], None)
  465. for row in rows
  466. ],
  467. )
  468. self.db_pool.updates._background_update_progress_txn(
  469. txn, "set_device_id_for_pushers", {"pusher_id": rows[-1]["id"]}
  470. )
  471. return len(rows)
  472. nb_processed = await self.db_pool.runInteraction(
  473. "set_device_id_for_pushers", set_device_id_for_pushers_txn
  474. )
  475. if nb_processed < batch_size:
  476. await self.db_pool.updates._end_background_update(
  477. "set_device_id_for_pushers"
  478. )
  479. return nb_processed
  480. class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
  481. # Because we have write access, this will be a StreamIdGenerator
  482. # (see PusherWorkerStore.__init__)
  483. _pushers_id_gen: AbstractStreamIdGenerator
  484. async def add_pusher(
  485. self,
  486. user_id: str,
  487. kind: str,
  488. app_id: str,
  489. app_display_name: str,
  490. device_display_name: str,
  491. pushkey: str,
  492. pushkey_ts: int,
  493. lang: Optional[str],
  494. data: Optional[JsonDict],
  495. last_stream_ordering: int,
  496. profile_tag: str = "",
  497. enabled: bool = True,
  498. device_id: Optional[str] = None,
  499. access_token_id: Optional[int] = None,
  500. ) -> None:
  501. async with self._pushers_id_gen.get_next() as stream_id:
  502. await self.db_pool.simple_upsert(
  503. table="pushers",
  504. keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
  505. values={
  506. "kind": kind,
  507. "app_display_name": app_display_name,
  508. "device_display_name": device_display_name,
  509. "ts": pushkey_ts,
  510. "lang": lang,
  511. "data": json_encoder.encode(data),
  512. "last_stream_ordering": last_stream_ordering,
  513. "profile_tag": profile_tag,
  514. "id": stream_id,
  515. "enabled": enabled,
  516. "device_id": device_id,
  517. # XXX(quenting): We're only really persisting the access token ID
  518. # when updating an existing pusher. This is in case the
  519. # 'set_device_id_for_pushers' background update hasn't finished yet.
  520. "access_token": access_token_id,
  521. },
  522. desc="add_pusher",
  523. )
  524. user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate(
  525. (user_id,), None, update_metrics=False
  526. )
  527. if user_has_pusher is not True:
  528. # invalidate, since we the user might not have had a pusher before
  529. await self.db_pool.runInteraction(
  530. "add_pusher",
  531. self._invalidate_cache_and_stream, # type: ignore[attr-defined]
  532. self.get_if_user_has_pusher,
  533. (user_id,),
  534. )
  535. async def delete_pusher_by_app_id_pushkey_user_id(
  536. self, app_id: str, pushkey: str, user_id: str
  537. ) -> None:
  538. def delete_pusher_txn(txn: LoggingTransaction, stream_id: int) -> None:
  539. self._invalidate_cache_and_stream( # type: ignore[attr-defined]
  540. txn, self.get_if_user_has_pusher, (user_id,)
  541. )
  542. # It is expected that there is exactly one pusher to delete, but
  543. # if it isn't there (or there are multiple) delete them all.
  544. self.db_pool.simple_delete_txn(
  545. txn,
  546. "pushers",
  547. {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
  548. )
  549. # it's possible for us to end up with duplicate rows for
  550. # (app_id, pushkey, user_id) at different stream_ids, but that
  551. # doesn't really matter.
  552. self.db_pool.simple_insert_txn(
  553. txn,
  554. table="deleted_pushers",
  555. values={
  556. "stream_id": stream_id,
  557. "app_id": app_id,
  558. "pushkey": pushkey,
  559. "user_id": user_id,
  560. },
  561. )
  562. async with self._pushers_id_gen.get_next() as stream_id:
  563. await self.db_pool.runInteraction(
  564. "delete_pusher", delete_pusher_txn, stream_id
  565. )
  566. async def delete_all_pushers_for_user(self, user_id: str) -> None:
  567. """Delete all pushers associated with an account."""
  568. # We want to generate a row in `deleted_pushers` for each pusher we're
  569. # deleting, so we fetch the list now so we can generate the appropriate
  570. # number of stream IDs.
  571. #
  572. # Note: technically there could be a race here between adding/deleting
  573. # pushers, but a) the worst case if we don't stop a pusher until the
  574. # next restart and b) this is only called when we're deactivating an
  575. # account.
  576. pushers = list(await self.get_pushers_by_user_id(user_id))
  577. def delete_pushers_txn(txn: LoggingTransaction, stream_ids: List[int]) -> None:
  578. self._invalidate_cache_and_stream( # type: ignore[attr-defined]
  579. txn, self.get_if_user_has_pusher, (user_id,)
  580. )
  581. self.db_pool.simple_delete_txn(
  582. txn,
  583. table="pushers",
  584. keyvalues={"user_name": user_id},
  585. )
  586. self.db_pool.simple_insert_many_txn(
  587. txn,
  588. table="deleted_pushers",
  589. keys=("stream_id", "app_id", "pushkey", "user_id"),
  590. values=[
  591. (stream_id, pusher.app_id, pusher.pushkey, user_id)
  592. for stream_id, pusher in zip(stream_ids, pushers)
  593. ],
  594. )
  595. async with self._pushers_id_gen.get_next_mult(len(pushers)) as stream_ids:
  596. await self.db_pool.runInteraction(
  597. "delete_all_pushers_for_user", delete_pushers_txn, stream_ids
  598. )