devices.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import simplejson as json
  17. from twisted.internet import defer
  18. from synapse.api.errors import StoreError
  19. from ._base import SQLBaseStore, Cache
  20. from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
  21. logger = logging.getLogger(__name__)
  22. class DeviceStore(SQLBaseStore):
  23. def __init__(self, db_conn, hs):
  24. super(DeviceStore, self).__init__(db_conn, hs)
  25. # Map of (user_id, device_id) -> bool. If there is an entry that implies
  26. # the device exists.
  27. self.device_id_exists_cache = Cache(
  28. name="device_id_exists",
  29. keylen=2,
  30. max_entries=10000,
  31. )
  32. self._clock.looping_call(
  33. self._prune_old_outbound_device_pokes, 60 * 60 * 1000
  34. )
  35. self.register_background_index_update(
  36. "device_lists_stream_idx",
  37. index_name="device_lists_stream_user_id",
  38. table="device_lists_stream",
  39. columns=["user_id", "device_id"],
  40. )
  41. @defer.inlineCallbacks
  42. def store_device(self, user_id, device_id,
  43. initial_device_display_name):
  44. """Ensure the given device is known; add it to the store if not
  45. Args:
  46. user_id (str): id of user associated with the device
  47. device_id (str): id of device
  48. initial_device_display_name (str): initial displayname of the
  49. device. Ignored if device exists.
  50. Returns:
  51. defer.Deferred: boolean whether the device was inserted or an
  52. existing device existed with that ID.
  53. """
  54. key = (user_id, device_id)
  55. if self.device_id_exists_cache.get(key, None):
  56. defer.returnValue(False)
  57. try:
  58. inserted = yield self._simple_insert(
  59. "devices",
  60. values={
  61. "user_id": user_id,
  62. "device_id": device_id,
  63. "display_name": initial_device_display_name
  64. },
  65. desc="store_device",
  66. or_ignore=True,
  67. )
  68. self.device_id_exists_cache.prefill(key, True)
  69. defer.returnValue(inserted)
  70. except Exception as e:
  71. logger.error("store_device with device_id=%s(%r) user_id=%s(%r)"
  72. " display_name=%s(%r) failed: %s",
  73. type(device_id).__name__, device_id,
  74. type(user_id).__name__, user_id,
  75. type(initial_device_display_name).__name__,
  76. initial_device_display_name, e)
  77. raise StoreError(500, "Problem storing device.")
  78. def get_device(self, user_id, device_id):
  79. """Retrieve a device.
  80. Args:
  81. user_id (str): The ID of the user which owns the device
  82. device_id (str): The ID of the device to retrieve
  83. Returns:
  84. defer.Deferred for a dict containing the device information
  85. Raises:
  86. StoreError: if the device is not found
  87. """
  88. return self._simple_select_one(
  89. table="devices",
  90. keyvalues={"user_id": user_id, "device_id": device_id},
  91. retcols=("user_id", "device_id", "display_name"),
  92. desc="get_device",
  93. )
  94. @defer.inlineCallbacks
  95. def delete_device(self, user_id, device_id):
  96. """Delete a device.
  97. Args:
  98. user_id (str): The ID of the user which owns the device
  99. device_id (str): The ID of the device to delete
  100. Returns:
  101. defer.Deferred
  102. """
  103. yield self._simple_delete_one(
  104. table="devices",
  105. keyvalues={"user_id": user_id, "device_id": device_id},
  106. desc="delete_device",
  107. )
  108. self.device_id_exists_cache.invalidate((user_id, device_id))
  109. @defer.inlineCallbacks
  110. def delete_devices(self, user_id, device_ids):
  111. """Deletes several devices.
  112. Args:
  113. user_id (str): The ID of the user which owns the devices
  114. device_ids (list): The IDs of the devices to delete
  115. Returns:
  116. defer.Deferred
  117. """
  118. yield self._simple_delete_many(
  119. table="devices",
  120. column="device_id",
  121. iterable=device_ids,
  122. keyvalues={"user_id": user_id},
  123. desc="delete_devices",
  124. )
  125. for device_id in device_ids:
  126. self.device_id_exists_cache.invalidate((user_id, device_id))
  127. def update_device(self, user_id, device_id, new_display_name=None):
  128. """Update a device.
  129. Args:
  130. user_id (str): The ID of the user which owns the device
  131. device_id (str): The ID of the device to update
  132. new_display_name (str|None): new displayname for device; None
  133. to leave unchanged
  134. Raises:
  135. StoreError: if the device is not found
  136. Returns:
  137. defer.Deferred
  138. """
  139. updates = {}
  140. if new_display_name is not None:
  141. updates["display_name"] = new_display_name
  142. if not updates:
  143. return defer.succeed(None)
  144. return self._simple_update_one(
  145. table="devices",
  146. keyvalues={"user_id": user_id, "device_id": device_id},
  147. updatevalues=updates,
  148. desc="update_device",
  149. )
  150. @defer.inlineCallbacks
  151. def get_devices_by_user(self, user_id):
  152. """Retrieve all of a user's registered devices.
  153. Args:
  154. user_id (str):
  155. Returns:
  156. defer.Deferred: resolves to a dict from device_id to a dict
  157. containing "device_id", "user_id" and "display_name" for each
  158. device.
  159. """
  160. devices = yield self._simple_select_list(
  161. table="devices",
  162. keyvalues={"user_id": user_id},
  163. retcols=("user_id", "device_id", "display_name"),
  164. desc="get_devices_by_user"
  165. )
  166. defer.returnValue({d["device_id"]: d for d in devices})
  167. @cached(max_entries=10000)
  168. def get_device_list_last_stream_id_for_remote(self, user_id):
  169. """Get the last stream_id we got for a user. May be None if we haven't
  170. got any information for them.
  171. """
  172. return self._simple_select_one_onecol(
  173. table="device_lists_remote_extremeties",
  174. keyvalues={"user_id": user_id},
  175. retcol="stream_id",
  176. desc="get_device_list_remote_extremity",
  177. allow_none=True,
  178. )
  179. @cachedList(cached_method_name="get_device_list_last_stream_id_for_remote",
  180. list_name="user_ids", inlineCallbacks=True)
  181. def get_device_list_last_stream_id_for_remotes(self, user_ids):
  182. rows = yield self._simple_select_many_batch(
  183. table="device_lists_remote_extremeties",
  184. column="user_id",
  185. iterable=user_ids,
  186. retcols=("user_id", "stream_id",),
  187. desc="get_user_devices_from_cache",
  188. )
  189. results = {user_id: None for user_id in user_ids}
  190. results.update({
  191. row["user_id"]: row["stream_id"] for row in rows
  192. })
  193. defer.returnValue(results)
  194. @defer.inlineCallbacks
  195. def mark_remote_user_device_list_as_unsubscribed(self, user_id):
  196. """Mark that we no longer track device lists for remote user.
  197. """
  198. yield self._simple_delete(
  199. table="device_lists_remote_extremeties",
  200. keyvalues={
  201. "user_id": user_id,
  202. },
  203. desc="mark_remote_user_device_list_as_unsubscribed",
  204. )
  205. self.get_device_list_last_stream_id_for_remote.invalidate((user_id,))
  206. def update_remote_device_list_cache_entry(self, user_id, device_id, content,
  207. stream_id):
  208. """Updates a single user's device in the cache.
  209. """
  210. return self.runInteraction(
  211. "update_remote_device_list_cache_entry",
  212. self._update_remote_device_list_cache_entry_txn,
  213. user_id, device_id, content, stream_id,
  214. )
  215. def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id,
  216. content, stream_id):
  217. self._simple_upsert_txn(
  218. txn,
  219. table="device_lists_remote_cache",
  220. keyvalues={
  221. "user_id": user_id,
  222. "device_id": device_id,
  223. },
  224. values={
  225. "content": json.dumps(content),
  226. }
  227. )
  228. txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
  229. txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
  230. txn.call_after(
  231. self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
  232. )
  233. self._simple_upsert_txn(
  234. txn,
  235. table="device_lists_remote_extremeties",
  236. keyvalues={
  237. "user_id": user_id,
  238. },
  239. values={
  240. "stream_id": stream_id,
  241. }
  242. )
  243. def update_remote_device_list_cache(self, user_id, devices, stream_id):
  244. """Replace the cache of the remote user's devices.
  245. """
  246. return self.runInteraction(
  247. "update_remote_device_list_cache",
  248. self._update_remote_device_list_cache_txn,
  249. user_id, devices, stream_id,
  250. )
  251. def _update_remote_device_list_cache_txn(self, txn, user_id, devices,
  252. stream_id):
  253. self._simple_delete_txn(
  254. txn,
  255. table="device_lists_remote_cache",
  256. keyvalues={
  257. "user_id": user_id,
  258. },
  259. )
  260. self._simple_insert_many_txn(
  261. txn,
  262. table="device_lists_remote_cache",
  263. values=[
  264. {
  265. "user_id": user_id,
  266. "device_id": content["device_id"],
  267. "content": json.dumps(content),
  268. }
  269. for content in devices
  270. ]
  271. )
  272. txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
  273. txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
  274. txn.call_after(
  275. self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
  276. )
  277. self._simple_upsert_txn(
  278. txn,
  279. table="device_lists_remote_extremeties",
  280. keyvalues={
  281. "user_id": user_id,
  282. },
  283. values={
  284. "stream_id": stream_id,
  285. }
  286. )
  287. def get_devices_by_remote(self, destination, from_stream_id):
  288. """Get stream of updates to send to remote servers
  289. Returns:
  290. (int, list[dict]): current stream id and list of updates
  291. """
  292. now_stream_id = self._device_list_id_gen.get_current_token()
  293. has_changed = self._device_list_federation_stream_cache.has_entity_changed(
  294. destination, int(from_stream_id)
  295. )
  296. if not has_changed:
  297. return (now_stream_id, [])
  298. return self.runInteraction(
  299. "get_devices_by_remote", self._get_devices_by_remote_txn,
  300. destination, from_stream_id, now_stream_id,
  301. )
  302. def _get_devices_by_remote_txn(self, txn, destination, from_stream_id,
  303. now_stream_id):
  304. sql = """
  305. SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes
  306. WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
  307. GROUP BY user_id, device_id
  308. LIMIT 20
  309. """
  310. txn.execute(
  311. sql, (destination, from_stream_id, now_stream_id, False)
  312. )
  313. # maps (user_id, device_id) -> stream_id
  314. query_map = {(r[0], r[1]): r[2] for r in txn}
  315. if not query_map:
  316. return (now_stream_id, [])
  317. if len(query_map) >= 20:
  318. now_stream_id = max(stream_id for stream_id in query_map.itervalues())
  319. devices = self._get_e2e_device_keys_txn(
  320. txn, query_map.keys(), include_all_devices=True
  321. )
  322. prev_sent_id_sql = """
  323. SELECT coalesce(max(stream_id), 0) as stream_id
  324. FROM device_lists_outbound_last_success
  325. WHERE destination = ? AND user_id = ? AND stream_id <= ?
  326. """
  327. results = []
  328. for user_id, user_devices in devices.iteritems():
  329. # The prev_id for the first row is always the last row before
  330. # `from_stream_id`
  331. txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id))
  332. rows = txn.fetchall()
  333. prev_id = rows[0][0]
  334. for device_id, device in user_devices.iteritems():
  335. stream_id = query_map[(user_id, device_id)]
  336. result = {
  337. "user_id": user_id,
  338. "device_id": device_id,
  339. "prev_id": [prev_id] if prev_id else [],
  340. "stream_id": stream_id,
  341. }
  342. prev_id = stream_id
  343. key_json = device.get("key_json", None)
  344. if key_json:
  345. result["keys"] = json.loads(key_json)
  346. device_display_name = device.get("device_display_name", None)
  347. if device_display_name:
  348. result["device_display_name"] = device_display_name
  349. results.append(result)
  350. return (now_stream_id, results)
  351. @defer.inlineCallbacks
  352. def get_user_devices_from_cache(self, query_list):
  353. """Get the devices (and keys if any) for remote users from the cache.
  354. Args:
  355. query_list(list): List of (user_id, device_ids), if device_ids is
  356. falsey then return all device ids for that user.
  357. Returns:
  358. (user_ids_not_in_cache, results_map), where user_ids_not_in_cache is
  359. a set of user_ids and results_map is a mapping of
  360. user_id -> device_id -> device_info
  361. """
  362. user_ids = set(user_id for user_id, _ in query_list)
  363. user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids))
  364. user_ids_in_cache = set(
  365. user_id for user_id, stream_id in user_map.items() if stream_id
  366. )
  367. user_ids_not_in_cache = user_ids - user_ids_in_cache
  368. results = {}
  369. for user_id, device_id in query_list:
  370. if user_id not in user_ids_in_cache:
  371. continue
  372. if device_id:
  373. device = yield self._get_cached_user_device(user_id, device_id)
  374. results.setdefault(user_id, {})[device_id] = device
  375. else:
  376. results[user_id] = yield self._get_cached_devices_for_user(user_id)
  377. defer.returnValue((user_ids_not_in_cache, results))
  378. @cachedInlineCallbacks(num_args=2, tree=True)
  379. def _get_cached_user_device(self, user_id, device_id):
  380. content = yield self._simple_select_one_onecol(
  381. table="device_lists_remote_cache",
  382. keyvalues={
  383. "user_id": user_id,
  384. "device_id": device_id,
  385. },
  386. retcol="content",
  387. desc="_get_cached_user_device",
  388. )
  389. defer.returnValue(json.loads(content))
  390. @cachedInlineCallbacks()
  391. def _get_cached_devices_for_user(self, user_id):
  392. devices = yield self._simple_select_list(
  393. table="device_lists_remote_cache",
  394. keyvalues={
  395. "user_id": user_id,
  396. },
  397. retcols=("device_id", "content"),
  398. desc="_get_cached_devices_for_user",
  399. )
  400. defer.returnValue({
  401. device["device_id"]: json.loads(device["content"])
  402. for device in devices
  403. })
  404. def get_devices_with_keys_by_user(self, user_id):
  405. """Get all devices (with any device keys) for a user
  406. Returns:
  407. (stream_id, devices)
  408. """
  409. return self.runInteraction(
  410. "get_devices_with_keys_by_user",
  411. self._get_devices_with_keys_by_user_txn, user_id,
  412. )
  413. def _get_devices_with_keys_by_user_txn(self, txn, user_id):
  414. now_stream_id = self._device_list_id_gen.get_current_token()
  415. devices = self._get_e2e_device_keys_txn(
  416. txn, [(user_id, None)], include_all_devices=True
  417. )
  418. if devices:
  419. user_devices = devices[user_id]
  420. results = []
  421. for device_id, device in user_devices.iteritems():
  422. result = {
  423. "device_id": device_id,
  424. }
  425. key_json = device.get("key_json", None)
  426. if key_json:
  427. result["keys"] = json.loads(key_json)
  428. device_display_name = device.get("device_display_name", None)
  429. if device_display_name:
  430. result["device_display_name"] = device_display_name
  431. results.append(result)
  432. return now_stream_id, results
  433. return now_stream_id, []
  434. def mark_as_sent_devices_by_remote(self, destination, stream_id):
  435. """Mark that updates have successfully been sent to the destination.
  436. """
  437. return self.runInteraction(
  438. "mark_as_sent_devices_by_remote", self._mark_as_sent_devices_by_remote_txn,
  439. destination, stream_id,
  440. )
  441. def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id):
  442. # We update the device_lists_outbound_last_success with the successfully
  443. # poked users. We do the join to see which users need to be inserted and
  444. # which updated.
  445. sql = """
  446. SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL)
  447. FROM device_lists_outbound_pokes as o
  448. LEFT JOIN device_lists_outbound_last_success as s
  449. USING (destination, user_id)
  450. WHERE destination = ? AND o.stream_id <= ?
  451. GROUP BY user_id
  452. """
  453. txn.execute(sql, (destination, stream_id,))
  454. rows = txn.fetchall()
  455. sql = """
  456. UPDATE device_lists_outbound_last_success
  457. SET stream_id = ?
  458. WHERE destination = ? AND user_id = ?
  459. """
  460. txn.executemany(
  461. sql, ((row[1], destination, row[0],) for row in rows if row[2])
  462. )
  463. sql = """
  464. INSERT INTO device_lists_outbound_last_success
  465. (destination, user_id, stream_id) VALUES (?, ?, ?)
  466. """
  467. txn.executemany(
  468. sql, ((destination, row[0], row[1],) for row in rows if not row[2])
  469. )
  470. # Delete all sent outbound pokes
  471. sql = """
  472. DELETE FROM device_lists_outbound_pokes
  473. WHERE destination = ? AND stream_id <= ?
  474. """
  475. txn.execute(sql, (destination, stream_id,))
  476. @defer.inlineCallbacks
  477. def get_user_whose_devices_changed(self, from_key):
  478. """Get set of users whose devices have changed since `from_key`.
  479. """
  480. from_key = int(from_key)
  481. changed = self._device_list_stream_cache.get_all_entities_changed(from_key)
  482. if changed is not None:
  483. defer.returnValue(set(changed))
  484. sql = """
  485. SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ?
  486. """
  487. rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key)
  488. defer.returnValue(set(row[0] for row in rows))
  489. def get_all_device_list_changes_for_remotes(self, from_key, to_key):
  490. """Return a list of `(stream_id, user_id, destination)` which is the
  491. combined list of changes to devices, and which destinations need to be
  492. poked. `destination` may be None if no destinations need to be poked.
  493. """
  494. sql = """
  495. SELECT stream_id, user_id, destination FROM device_lists_stream
  496. LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id)
  497. WHERE ? < stream_id AND stream_id <= ?
  498. """
  499. return self._execute(
  500. "get_all_device_list_changes_for_remotes", None,
  501. sql, from_key, to_key
  502. )
  503. @defer.inlineCallbacks
  504. def add_device_change_to_streams(self, user_id, device_ids, hosts):
  505. """Persist that a user's devices have been updated, and which hosts
  506. (if any) should be poked.
  507. """
  508. with self._device_list_id_gen.get_next() as stream_id:
  509. yield self.runInteraction(
  510. "add_device_change_to_streams", self._add_device_change_txn,
  511. user_id, device_ids, hosts, stream_id,
  512. )
  513. defer.returnValue(stream_id)
  514. def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id):
  515. now = self._clock.time_msec()
  516. txn.call_after(
  517. self._device_list_stream_cache.entity_has_changed,
  518. user_id, stream_id,
  519. )
  520. for host in hosts:
  521. txn.call_after(
  522. self._device_list_federation_stream_cache.entity_has_changed,
  523. host, stream_id,
  524. )
  525. # Delete older entries in the table, as we really only care about
  526. # when the latest change happened.
  527. txn.executemany(
  528. """
  529. DELETE FROM device_lists_stream
  530. WHERE user_id = ? AND device_id = ? AND stream_id < ?
  531. """,
  532. [(user_id, device_id, stream_id) for device_id in device_ids]
  533. )
  534. self._simple_insert_many_txn(
  535. txn,
  536. table="device_lists_stream",
  537. values=[
  538. {
  539. "stream_id": stream_id,
  540. "user_id": user_id,
  541. "device_id": device_id,
  542. }
  543. for device_id in device_ids
  544. ]
  545. )
  546. self._simple_insert_many_txn(
  547. txn,
  548. table="device_lists_outbound_pokes",
  549. values=[
  550. {
  551. "destination": destination,
  552. "stream_id": stream_id,
  553. "user_id": user_id,
  554. "device_id": device_id,
  555. "sent": False,
  556. "ts": now,
  557. }
  558. for destination in hosts
  559. for device_id in device_ids
  560. ]
  561. )
  562. def get_device_stream_token(self):
  563. return self._device_list_id_gen.get_current_token()
  564. def _prune_old_outbound_device_pokes(self):
  565. """Delete old entries out of the device_lists_outbound_pokes to ensure
  566. that we don't fill up due to dead servers. We keep one entry per
  567. (destination, user_id) tuple to ensure that the prev_ids remain correct
  568. if the server does come back.
  569. """
  570. yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000
  571. def _prune_txn(txn):
  572. select_sql = """
  573. SELECT destination, user_id, max(stream_id) as stream_id
  574. FROM device_lists_outbound_pokes
  575. GROUP BY destination, user_id
  576. HAVING min(ts) < ? AND count(*) > 1
  577. """
  578. txn.execute(select_sql, (yesterday,))
  579. rows = txn.fetchall()
  580. if not rows:
  581. return
  582. delete_sql = """
  583. DELETE FROM device_lists_outbound_pokes
  584. WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?
  585. """
  586. txn.executemany(
  587. delete_sql,
  588. (
  589. (yesterday, row[0], row[1], row[2])
  590. for row in rows
  591. )
  592. )
  593. # Since we've deleted unsent deltas, we need to remove the entry
  594. # of last successful sent so that the prev_ids are correctly set.
  595. sql = """
  596. DELETE FROM device_lists_outbound_last_success
  597. WHERE destination = ? AND user_id = ?
  598. """
  599. txn.executemany(sql, ((row[0], row[1]) for row in rows))
  600. logger.info("Pruned %d device list outbound pokes", txn.rowcount)
  601. return self.runInteraction(
  602. "_prune_old_outbound_device_pokes", _prune_txn
  603. )