devices.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import simplejson as json
  17. from twisted.internet import defer
  18. from synapse.api.errors import StoreError
  19. from ._base import SQLBaseStore, Cache
  20. from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
  21. from six import itervalues, iteritems
  22. logger = logging.getLogger(__name__)
  23. class DeviceStore(SQLBaseStore):
  24. def __init__(self, db_conn, hs):
  25. super(DeviceStore, self).__init__(db_conn, hs)
  26. # Map of (user_id, device_id) -> bool. If there is an entry that implies
  27. # the device exists.
  28. self.device_id_exists_cache = Cache(
  29. name="device_id_exists",
  30. keylen=2,
  31. max_entries=10000,
  32. )
  33. self._clock.looping_call(
  34. self._prune_old_outbound_device_pokes, 60 * 60 * 1000
  35. )
  36. self.register_background_index_update(
  37. "device_lists_stream_idx",
  38. index_name="device_lists_stream_user_id",
  39. table="device_lists_stream",
  40. columns=["user_id", "device_id"],
  41. )
  42. @defer.inlineCallbacks
  43. def store_device(self, user_id, device_id,
  44. initial_device_display_name):
  45. """Ensure the given device is known; add it to the store if not
  46. Args:
  47. user_id (str): id of user associated with the device
  48. device_id (str): id of device
  49. initial_device_display_name (str): initial displayname of the
  50. device. Ignored if device exists.
  51. Returns:
  52. defer.Deferred: boolean whether the device was inserted or an
  53. existing device existed with that ID.
  54. """
  55. key = (user_id, device_id)
  56. if self.device_id_exists_cache.get(key, None):
  57. defer.returnValue(False)
  58. try:
  59. inserted = yield self._simple_insert(
  60. "devices",
  61. values={
  62. "user_id": user_id,
  63. "device_id": device_id,
  64. "display_name": initial_device_display_name
  65. },
  66. desc="store_device",
  67. or_ignore=True,
  68. )
  69. self.device_id_exists_cache.prefill(key, True)
  70. defer.returnValue(inserted)
  71. except Exception as e:
  72. logger.error("store_device with device_id=%s(%r) user_id=%s(%r)"
  73. " display_name=%s(%r) failed: %s",
  74. type(device_id).__name__, device_id,
  75. type(user_id).__name__, user_id,
  76. type(initial_device_display_name).__name__,
  77. initial_device_display_name, e)
  78. raise StoreError(500, "Problem storing device.")
  79. def get_device(self, user_id, device_id):
  80. """Retrieve a device.
  81. Args:
  82. user_id (str): The ID of the user which owns the device
  83. device_id (str): The ID of the device to retrieve
  84. Returns:
  85. defer.Deferred for a dict containing the device information
  86. Raises:
  87. StoreError: if the device is not found
  88. """
  89. return self._simple_select_one(
  90. table="devices",
  91. keyvalues={"user_id": user_id, "device_id": device_id},
  92. retcols=("user_id", "device_id", "display_name"),
  93. desc="get_device",
  94. )
  95. @defer.inlineCallbacks
  96. def delete_device(self, user_id, device_id):
  97. """Delete a device.
  98. Args:
  99. user_id (str): The ID of the user which owns the device
  100. device_id (str): The ID of the device to delete
  101. Returns:
  102. defer.Deferred
  103. """
  104. yield self._simple_delete_one(
  105. table="devices",
  106. keyvalues={"user_id": user_id, "device_id": device_id},
  107. desc="delete_device",
  108. )
  109. self.device_id_exists_cache.invalidate((user_id, device_id))
  110. @defer.inlineCallbacks
  111. def delete_devices(self, user_id, device_ids):
  112. """Deletes several devices.
  113. Args:
  114. user_id (str): The ID of the user which owns the devices
  115. device_ids (list): The IDs of the devices to delete
  116. Returns:
  117. defer.Deferred
  118. """
  119. yield self._simple_delete_many(
  120. table="devices",
  121. column="device_id",
  122. iterable=device_ids,
  123. keyvalues={"user_id": user_id},
  124. desc="delete_devices",
  125. )
  126. for device_id in device_ids:
  127. self.device_id_exists_cache.invalidate((user_id, device_id))
  128. def update_device(self, user_id, device_id, new_display_name=None):
  129. """Update a device.
  130. Args:
  131. user_id (str): The ID of the user which owns the device
  132. device_id (str): The ID of the device to update
  133. new_display_name (str|None): new displayname for device; None
  134. to leave unchanged
  135. Raises:
  136. StoreError: if the device is not found
  137. Returns:
  138. defer.Deferred
  139. """
  140. updates = {}
  141. if new_display_name is not None:
  142. updates["display_name"] = new_display_name
  143. if not updates:
  144. return defer.succeed(None)
  145. return self._simple_update_one(
  146. table="devices",
  147. keyvalues={"user_id": user_id, "device_id": device_id},
  148. updatevalues=updates,
  149. desc="update_device",
  150. )
  151. @defer.inlineCallbacks
  152. def get_devices_by_user(self, user_id):
  153. """Retrieve all of a user's registered devices.
  154. Args:
  155. user_id (str):
  156. Returns:
  157. defer.Deferred: resolves to a dict from device_id to a dict
  158. containing "device_id", "user_id" and "display_name" for each
  159. device.
  160. """
  161. devices = yield self._simple_select_list(
  162. table="devices",
  163. keyvalues={"user_id": user_id},
  164. retcols=("user_id", "device_id", "display_name"),
  165. desc="get_devices_by_user"
  166. )
  167. defer.returnValue({d["device_id"]: d for d in devices})
  168. @cached(max_entries=10000)
  169. def get_device_list_last_stream_id_for_remote(self, user_id):
  170. """Get the last stream_id we got for a user. May be None if we haven't
  171. got any information for them.
  172. """
  173. return self._simple_select_one_onecol(
  174. table="device_lists_remote_extremeties",
  175. keyvalues={"user_id": user_id},
  176. retcol="stream_id",
  177. desc="get_device_list_remote_extremity",
  178. allow_none=True,
  179. )
  180. @cachedList(cached_method_name="get_device_list_last_stream_id_for_remote",
  181. list_name="user_ids", inlineCallbacks=True)
  182. def get_device_list_last_stream_id_for_remotes(self, user_ids):
  183. rows = yield self._simple_select_many_batch(
  184. table="device_lists_remote_extremeties",
  185. column="user_id",
  186. iterable=user_ids,
  187. retcols=("user_id", "stream_id",),
  188. desc="get_user_devices_from_cache",
  189. )
  190. results = {user_id: None for user_id in user_ids}
  191. results.update({
  192. row["user_id"]: row["stream_id"] for row in rows
  193. })
  194. defer.returnValue(results)
  195. @defer.inlineCallbacks
  196. def mark_remote_user_device_list_as_unsubscribed(self, user_id):
  197. """Mark that we no longer track device lists for remote user.
  198. """
  199. yield self._simple_delete(
  200. table="device_lists_remote_extremeties",
  201. keyvalues={
  202. "user_id": user_id,
  203. },
  204. desc="mark_remote_user_device_list_as_unsubscribed",
  205. )
  206. self.get_device_list_last_stream_id_for_remote.invalidate((user_id,))
  207. def update_remote_device_list_cache_entry(self, user_id, device_id, content,
  208. stream_id):
  209. """Updates a single user's device in the cache.
  210. """
  211. return self.runInteraction(
  212. "update_remote_device_list_cache_entry",
  213. self._update_remote_device_list_cache_entry_txn,
  214. user_id, device_id, content, stream_id,
  215. )
  216. def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id,
  217. content, stream_id):
  218. self._simple_upsert_txn(
  219. txn,
  220. table="device_lists_remote_cache",
  221. keyvalues={
  222. "user_id": user_id,
  223. "device_id": device_id,
  224. },
  225. values={
  226. "content": json.dumps(content),
  227. }
  228. )
  229. txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
  230. txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
  231. txn.call_after(
  232. self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
  233. )
  234. self._simple_upsert_txn(
  235. txn,
  236. table="device_lists_remote_extremeties",
  237. keyvalues={
  238. "user_id": user_id,
  239. },
  240. values={
  241. "stream_id": stream_id,
  242. }
  243. )
  244. def update_remote_device_list_cache(self, user_id, devices, stream_id):
  245. """Replace the cache of the remote user's devices.
  246. """
  247. return self.runInteraction(
  248. "update_remote_device_list_cache",
  249. self._update_remote_device_list_cache_txn,
  250. user_id, devices, stream_id,
  251. )
  252. def _update_remote_device_list_cache_txn(self, txn, user_id, devices,
  253. stream_id):
  254. self._simple_delete_txn(
  255. txn,
  256. table="device_lists_remote_cache",
  257. keyvalues={
  258. "user_id": user_id,
  259. },
  260. )
  261. self._simple_insert_many_txn(
  262. txn,
  263. table="device_lists_remote_cache",
  264. values=[
  265. {
  266. "user_id": user_id,
  267. "device_id": content["device_id"],
  268. "content": json.dumps(content),
  269. }
  270. for content in devices
  271. ]
  272. )
  273. txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
  274. txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
  275. txn.call_after(
  276. self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
  277. )
  278. self._simple_upsert_txn(
  279. txn,
  280. table="device_lists_remote_extremeties",
  281. keyvalues={
  282. "user_id": user_id,
  283. },
  284. values={
  285. "stream_id": stream_id,
  286. }
  287. )
  288. def get_devices_by_remote(self, destination, from_stream_id):
  289. """Get stream of updates to send to remote servers
  290. Returns:
  291. (int, list[dict]): current stream id and list of updates
  292. """
  293. now_stream_id = self._device_list_id_gen.get_current_token()
  294. has_changed = self._device_list_federation_stream_cache.has_entity_changed(
  295. destination, int(from_stream_id)
  296. )
  297. if not has_changed:
  298. return (now_stream_id, [])
  299. return self.runInteraction(
  300. "get_devices_by_remote", self._get_devices_by_remote_txn,
  301. destination, from_stream_id, now_stream_id,
  302. )
  303. def _get_devices_by_remote_txn(self, txn, destination, from_stream_id,
  304. now_stream_id):
  305. sql = """
  306. SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes
  307. WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
  308. GROUP BY user_id, device_id
  309. LIMIT 20
  310. """
  311. txn.execute(
  312. sql, (destination, from_stream_id, now_stream_id, False)
  313. )
  314. # maps (user_id, device_id) -> stream_id
  315. query_map = {(r[0], r[1]): r[2] for r in txn}
  316. if not query_map:
  317. return (now_stream_id, [])
  318. if len(query_map) >= 20:
  319. now_stream_id = max(stream_id for stream_id in itervalues(query_map))
  320. devices = self._get_e2e_device_keys_txn(
  321. txn, query_map.keys(), include_all_devices=True
  322. )
  323. prev_sent_id_sql = """
  324. SELECT coalesce(max(stream_id), 0) as stream_id
  325. FROM device_lists_outbound_last_success
  326. WHERE destination = ? AND user_id = ? AND stream_id <= ?
  327. """
  328. results = []
  329. for user_id, user_devices in iteritems(devices):
  330. # The prev_id for the first row is always the last row before
  331. # `from_stream_id`
  332. txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id))
  333. rows = txn.fetchall()
  334. prev_id = rows[0][0]
  335. for device_id, device in iteritems(user_devices):
  336. stream_id = query_map[(user_id, device_id)]
  337. result = {
  338. "user_id": user_id,
  339. "device_id": device_id,
  340. "prev_id": [prev_id] if prev_id else [],
  341. "stream_id": stream_id,
  342. }
  343. prev_id = stream_id
  344. key_json = device.get("key_json", None)
  345. if key_json:
  346. result["keys"] = json.loads(key_json)
  347. device_display_name = device.get("device_display_name", None)
  348. if device_display_name:
  349. result["device_display_name"] = device_display_name
  350. results.append(result)
  351. return (now_stream_id, results)
  352. @defer.inlineCallbacks
  353. def get_user_devices_from_cache(self, query_list):
  354. """Get the devices (and keys if any) for remote users from the cache.
  355. Args:
  356. query_list(list): List of (user_id, device_ids), if device_ids is
  357. falsey then return all device ids for that user.
  358. Returns:
  359. (user_ids_not_in_cache, results_map), where user_ids_not_in_cache is
  360. a set of user_ids and results_map is a mapping of
  361. user_id -> device_id -> device_info
  362. """
  363. user_ids = set(user_id for user_id, _ in query_list)
  364. user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids))
  365. user_ids_in_cache = set(
  366. user_id for user_id, stream_id in user_map.items() if stream_id
  367. )
  368. user_ids_not_in_cache = user_ids - user_ids_in_cache
  369. results = {}
  370. for user_id, device_id in query_list:
  371. if user_id not in user_ids_in_cache:
  372. continue
  373. if device_id:
  374. device = yield self._get_cached_user_device(user_id, device_id)
  375. results.setdefault(user_id, {})[device_id] = device
  376. else:
  377. results[user_id] = yield self._get_cached_devices_for_user(user_id)
  378. defer.returnValue((user_ids_not_in_cache, results))
  379. @cachedInlineCallbacks(num_args=2, tree=True)
  380. def _get_cached_user_device(self, user_id, device_id):
  381. content = yield self._simple_select_one_onecol(
  382. table="device_lists_remote_cache",
  383. keyvalues={
  384. "user_id": user_id,
  385. "device_id": device_id,
  386. },
  387. retcol="content",
  388. desc="_get_cached_user_device",
  389. )
  390. defer.returnValue(json.loads(content))
  391. @cachedInlineCallbacks()
  392. def _get_cached_devices_for_user(self, user_id):
  393. devices = yield self._simple_select_list(
  394. table="device_lists_remote_cache",
  395. keyvalues={
  396. "user_id": user_id,
  397. },
  398. retcols=("device_id", "content"),
  399. desc="_get_cached_devices_for_user",
  400. )
  401. defer.returnValue({
  402. device["device_id"]: json.loads(device["content"])
  403. for device in devices
  404. })
  405. def get_devices_with_keys_by_user(self, user_id):
  406. """Get all devices (with any device keys) for a user
  407. Returns:
  408. (stream_id, devices)
  409. """
  410. return self.runInteraction(
  411. "get_devices_with_keys_by_user",
  412. self._get_devices_with_keys_by_user_txn, user_id,
  413. )
  414. def _get_devices_with_keys_by_user_txn(self, txn, user_id):
  415. now_stream_id = self._device_list_id_gen.get_current_token()
  416. devices = self._get_e2e_device_keys_txn(
  417. txn, [(user_id, None)], include_all_devices=True
  418. )
  419. if devices:
  420. user_devices = devices[user_id]
  421. results = []
  422. for device_id, device in iteritems(user_devices):
  423. result = {
  424. "device_id": device_id,
  425. }
  426. key_json = device.get("key_json", None)
  427. if key_json:
  428. result["keys"] = json.loads(key_json)
  429. device_display_name = device.get("device_display_name", None)
  430. if device_display_name:
  431. result["device_display_name"] = device_display_name
  432. results.append(result)
  433. return now_stream_id, results
  434. return now_stream_id, []
  435. def mark_as_sent_devices_by_remote(self, destination, stream_id):
  436. """Mark that updates have successfully been sent to the destination.
  437. """
  438. return self.runInteraction(
  439. "mark_as_sent_devices_by_remote", self._mark_as_sent_devices_by_remote_txn,
  440. destination, stream_id,
  441. )
  442. def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id):
  443. # We update the device_lists_outbound_last_success with the successfully
  444. # poked users. We do the join to see which users need to be inserted and
  445. # which updated.
  446. sql = """
  447. SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL)
  448. FROM device_lists_outbound_pokes as o
  449. LEFT JOIN device_lists_outbound_last_success as s
  450. USING (destination, user_id)
  451. WHERE destination = ? AND o.stream_id <= ?
  452. GROUP BY user_id
  453. """
  454. txn.execute(sql, (destination, stream_id,))
  455. rows = txn.fetchall()
  456. sql = """
  457. UPDATE device_lists_outbound_last_success
  458. SET stream_id = ?
  459. WHERE destination = ? AND user_id = ?
  460. """
  461. txn.executemany(
  462. sql, ((row[1], destination, row[0],) for row in rows if row[2])
  463. )
  464. sql = """
  465. INSERT INTO device_lists_outbound_last_success
  466. (destination, user_id, stream_id) VALUES (?, ?, ?)
  467. """
  468. txn.executemany(
  469. sql, ((destination, row[0], row[1],) for row in rows if not row[2])
  470. )
  471. # Delete all sent outbound pokes
  472. sql = """
  473. DELETE FROM device_lists_outbound_pokes
  474. WHERE destination = ? AND stream_id <= ?
  475. """
  476. txn.execute(sql, (destination, stream_id,))
  477. @defer.inlineCallbacks
  478. def get_user_whose_devices_changed(self, from_key):
  479. """Get set of users whose devices have changed since `from_key`.
  480. """
  481. from_key = int(from_key)
  482. changed = self._device_list_stream_cache.get_all_entities_changed(from_key)
  483. if changed is not None:
  484. defer.returnValue(set(changed))
  485. sql = """
  486. SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ?
  487. """
  488. rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key)
  489. defer.returnValue(set(row[0] for row in rows))
  490. def get_all_device_list_changes_for_remotes(self, from_key, to_key):
  491. """Return a list of `(stream_id, user_id, destination)` which is the
  492. combined list of changes to devices, and which destinations need to be
  493. poked. `destination` may be None if no destinations need to be poked.
  494. """
  495. sql = """
  496. SELECT stream_id, user_id, destination FROM device_lists_stream
  497. LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id)
  498. WHERE ? < stream_id AND stream_id <= ?
  499. """
  500. return self._execute(
  501. "get_all_device_list_changes_for_remotes", None,
  502. sql, from_key, to_key
  503. )
  504. @defer.inlineCallbacks
  505. def add_device_change_to_streams(self, user_id, device_ids, hosts):
  506. """Persist that a user's devices have been updated, and which hosts
  507. (if any) should be poked.
  508. """
  509. with self._device_list_id_gen.get_next() as stream_id:
  510. yield self.runInteraction(
  511. "add_device_change_to_streams", self._add_device_change_txn,
  512. user_id, device_ids, hosts, stream_id,
  513. )
  514. defer.returnValue(stream_id)
  515. def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id):
  516. now = self._clock.time_msec()
  517. txn.call_after(
  518. self._device_list_stream_cache.entity_has_changed,
  519. user_id, stream_id,
  520. )
  521. for host in hosts:
  522. txn.call_after(
  523. self._device_list_federation_stream_cache.entity_has_changed,
  524. host, stream_id,
  525. )
  526. # Delete older entries in the table, as we really only care about
  527. # when the latest change happened.
  528. txn.executemany(
  529. """
  530. DELETE FROM device_lists_stream
  531. WHERE user_id = ? AND device_id = ? AND stream_id < ?
  532. """,
  533. [(user_id, device_id, stream_id) for device_id in device_ids]
  534. )
  535. self._simple_insert_many_txn(
  536. txn,
  537. table="device_lists_stream",
  538. values=[
  539. {
  540. "stream_id": stream_id,
  541. "user_id": user_id,
  542. "device_id": device_id,
  543. }
  544. for device_id in device_ids
  545. ]
  546. )
  547. self._simple_insert_many_txn(
  548. txn,
  549. table="device_lists_outbound_pokes",
  550. values=[
  551. {
  552. "destination": destination,
  553. "stream_id": stream_id,
  554. "user_id": user_id,
  555. "device_id": device_id,
  556. "sent": False,
  557. "ts": now,
  558. }
  559. for destination in hosts
  560. for device_id in device_ids
  561. ]
  562. )
  563. def get_device_stream_token(self):
  564. return self._device_list_id_gen.get_current_token()
  565. def _prune_old_outbound_device_pokes(self):
  566. """Delete old entries out of the device_lists_outbound_pokes to ensure
  567. that we don't fill up due to dead servers. We keep one entry per
  568. (destination, user_id) tuple to ensure that the prev_ids remain correct
  569. if the server does come back.
  570. """
  571. yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000
  572. def _prune_txn(txn):
  573. select_sql = """
  574. SELECT destination, user_id, max(stream_id) as stream_id
  575. FROM device_lists_outbound_pokes
  576. GROUP BY destination, user_id
  577. HAVING min(ts) < ? AND count(*) > 1
  578. """
  579. txn.execute(select_sql, (yesterday,))
  580. rows = txn.fetchall()
  581. if not rows:
  582. return
  583. delete_sql = """
  584. DELETE FROM device_lists_outbound_pokes
  585. WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?
  586. """
  587. txn.executemany(
  588. delete_sql,
  589. (
  590. (yesterday, row[0], row[1], row[2])
  591. for row in rows
  592. )
  593. )
  594. # Since we've deleted unsent deltas, we need to remove the entry
  595. # of last successful sent so that the prev_ids are correctly set.
  596. sql = """
  597. DELETE FROM device_lists_outbound_last_success
  598. WHERE destination = ? AND user_id = ?
  599. """
  600. txn.executemany(sql, ((row[0], row[1]) for row in rows))
  601. logger.info("Pruned %d device list outbound pokes", txn.rowcount)
  602. return self.runInteraction(
  603. "_prune_old_outbound_device_pokes", _prune_txn
  604. )