device.py 56 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465
  1. # Copyright 2016 OpenMarket Ltd
  2. # Copyright 2019 New Vector Ltd
  3. # Copyright 2019,2020 The Matrix.org Foundation C.I.C.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. from http import HTTPStatus
  18. from typing import (
  19. TYPE_CHECKING,
  20. Any,
  21. Dict,
  22. Iterable,
  23. List,
  24. Mapping,
  25. Optional,
  26. Set,
  27. Tuple,
  28. )
  29. from synapse.api import errors
  30. from synapse.api.constants import EduTypes, EventTypes
  31. from synapse.api.errors import (
  32. Codes,
  33. FederationDeniedError,
  34. HttpResponseException,
  35. InvalidAPICallError,
  36. RequestSendFailed,
  37. SynapseError,
  38. )
  39. from synapse.logging.opentracing import log_kv, set_tag, trace
  40. from synapse.metrics.background_process_metrics import (
  41. run_as_background_process,
  42. wrap_as_background_process,
  43. )
  44. from synapse.types import (
  45. JsonDict,
  46. StrCollection,
  47. StreamKeyType,
  48. StreamToken,
  49. UserID,
  50. get_domain_from_id,
  51. get_verify_key_from_cross_signing_key,
  52. )
  53. from synapse.util import stringutils
  54. from synapse.util.async_helpers import Linearizer
  55. from synapse.util.caches.expiringcache import ExpiringCache
  56. from synapse.util.cancellation import cancellable
  57. from synapse.util.metrics import measure_func
  58. from synapse.util.retryutils import NotRetryingDestination
  59. if TYPE_CHECKING:
  60. from synapse.server import HomeServer
  61. logger = logging.getLogger(__name__)
  62. MAX_DEVICE_DISPLAY_NAME_LEN = 100
  63. DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000
  64. class DeviceWorkerHandler:
  65. device_list_updater: "DeviceListWorkerUpdater"
  66. def __init__(self, hs: "HomeServer"):
  67. self.clock = hs.get_clock()
  68. self.hs = hs
  69. self.store = hs.get_datastores().main
  70. self.notifier = hs.get_notifier()
  71. self.state = hs.get_state_handler()
  72. self._state_storage = hs.get_storage_controllers().state
  73. self._auth_handler = hs.get_auth_handler()
  74. self.server_name = hs.hostname
  75. self._msc3852_enabled = hs.config.experimental.msc3852_enabled
  76. self.device_list_updater = DeviceListWorkerUpdater(hs)
  77. @trace
  78. async def get_devices_by_user(self, user_id: str) -> List[JsonDict]:
  79. """
  80. Retrieve the given user's devices
  81. Args:
  82. user_id: The user ID to query for devices.
  83. Returns:
  84. info on each device
  85. """
  86. set_tag("user_id", user_id)
  87. device_map = await self.store.get_devices_by_user(user_id)
  88. ips = await self.store.get_last_client_ip_by_device(user_id, device_id=None)
  89. devices = list(device_map.values())
  90. for device in devices:
  91. _update_device_from_client_ips(device, ips)
  92. log_kv(device_map)
  93. return devices
  94. async def get_dehydrated_device(
  95. self, user_id: str
  96. ) -> Optional[Tuple[str, JsonDict]]:
  97. """Retrieve the information for a dehydrated device.
  98. Args:
  99. user_id: the user whose dehydrated device we are looking for
  100. Returns:
  101. a tuple whose first item is the device ID, and the second item is
  102. the dehydrated device information
  103. """
  104. return await self.store.get_dehydrated_device(user_id)
  105. @trace
  106. async def get_device(self, user_id: str, device_id: str) -> JsonDict:
  107. """Retrieve the given device
  108. Args:
  109. user_id: The user to get the device from
  110. device_id: The device to fetch.
  111. Returns:
  112. info on the device
  113. Raises:
  114. errors.NotFoundError: if the device was not found
  115. """
  116. device = await self.store.get_device(user_id, device_id)
  117. if device is None:
  118. raise errors.NotFoundError()
  119. ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
  120. _update_device_from_client_ips(device, ips)
  121. set_tag("device", str(device))
  122. set_tag("ips", str(ips))
  123. return device
  124. @cancellable
  125. async def get_device_changes_in_shared_rooms(
  126. self, user_id: str, room_ids: StrCollection, from_token: StreamToken
  127. ) -> Set[str]:
  128. """Get the set of users whose devices have changed who share a room with
  129. the given user.
  130. """
  131. changed_users = await self.store.get_device_list_changes_in_rooms(
  132. room_ids, from_token.device_list_key
  133. )
  134. if changed_users is not None:
  135. # We also check if the given user has changed their device. If
  136. # they're in no rooms then the above query won't include them.
  137. changed = await self.store.get_users_whose_devices_changed(
  138. from_token.device_list_key, [user_id]
  139. )
  140. changed_users.update(changed)
  141. return changed_users
  142. # If the DB returned None then the `from_token` is too old, so we fall
  143. # back on looking for device updates for all users.
  144. users_who_share_room = await self.store.get_users_who_share_room_with_user(
  145. user_id
  146. )
  147. tracked_users = set(users_who_share_room)
  148. # Always tell the user about their own devices
  149. tracked_users.add(user_id)
  150. changed = await self.store.get_users_whose_devices_changed(
  151. from_token.device_list_key, tracked_users
  152. )
  153. return changed
  154. @trace
  155. @measure_func("device.get_user_ids_changed")
  156. @cancellable
  157. async def get_user_ids_changed(
  158. self, user_id: str, from_token: StreamToken
  159. ) -> JsonDict:
  160. """Get list of users that have had the devices updated, or have newly
  161. joined a room, that `user_id` may be interested in.
  162. """
  163. set_tag("user_id", user_id)
  164. set_tag("from_token", str(from_token))
  165. now_room_key = self.store.get_room_max_token()
  166. room_ids = await self.store.get_rooms_for_user(user_id)
  167. changed = await self.get_device_changes_in_shared_rooms(
  168. user_id, room_ids, from_token
  169. )
  170. # Then work out if any users have since joined
  171. rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
  172. member_events = await self.store.get_membership_changes_for_user(
  173. user_id, from_token.room_key, now_room_key
  174. )
  175. rooms_changed.update(event.room_id for event in member_events)
  176. stream_ordering = from_token.room_key.stream
  177. possibly_changed = set(changed)
  178. possibly_left = set()
  179. for room_id in rooms_changed:
  180. current_state_ids = await self._state_storage.get_current_state_ids(
  181. room_id, await_full_state=False
  182. )
  183. # The user may have left the room
  184. # TODO: Check if they actually did or if we were just invited.
  185. if room_id not in room_ids:
  186. for etype, state_key in current_state_ids.keys():
  187. if etype != EventTypes.Member:
  188. continue
  189. possibly_left.add(state_key)
  190. continue
  191. # Fetch the current state at the time.
  192. try:
  193. event_ids = await self.store.get_forward_extremities_for_room_at_stream_ordering(
  194. room_id, stream_ordering=stream_ordering
  195. )
  196. except errors.StoreError:
  197. # we have purged the stream_ordering index since the stream
  198. # ordering: treat it the same as a new room
  199. event_ids = []
  200. # special-case for an empty prev state: include all members
  201. # in the changed list
  202. if not event_ids:
  203. log_kv(
  204. {"event": "encountered empty previous state", "room_id": room_id}
  205. )
  206. for etype, state_key in current_state_ids.keys():
  207. if etype != EventTypes.Member:
  208. continue
  209. possibly_changed.add(state_key)
  210. continue
  211. current_member_id = current_state_ids.get((EventTypes.Member, user_id))
  212. if not current_member_id:
  213. continue
  214. # mapping from event_id -> state_dict
  215. prev_state_ids = await self._state_storage.get_state_ids_for_events(
  216. event_ids,
  217. await_full_state=False,
  218. )
  219. # Check if we've joined the room? If so we just blindly add all the users to
  220. # the "possibly changed" users.
  221. for state_dict in prev_state_ids.values():
  222. member_event = state_dict.get((EventTypes.Member, user_id), None)
  223. if not member_event or member_event != current_member_id:
  224. for etype, state_key in current_state_ids.keys():
  225. if etype != EventTypes.Member:
  226. continue
  227. possibly_changed.add(state_key)
  228. break
  229. # If there has been any change in membership, include them in the
  230. # possibly changed list. We'll check if they are joined below,
  231. # and we're not toooo worried about spuriously adding users.
  232. for key, event_id in current_state_ids.items():
  233. etype, state_key = key
  234. if etype != EventTypes.Member:
  235. continue
  236. # check if this member has changed since any of the extremities
  237. # at the stream_ordering, and add them to the list if so.
  238. for state_dict in prev_state_ids.values():
  239. prev_event_id = state_dict.get(key, None)
  240. if not prev_event_id or prev_event_id != event_id:
  241. if state_key != user_id:
  242. possibly_changed.add(state_key)
  243. break
  244. if possibly_changed or possibly_left:
  245. possibly_joined = possibly_changed
  246. possibly_left = possibly_changed | possibly_left
  247. # Double check if we still share rooms with the given user.
  248. users_rooms = await self.store.get_rooms_for_users(possibly_left)
  249. for changed_user_id, entries in users_rooms.items():
  250. if any(rid in room_ids for rid in entries):
  251. possibly_left.discard(changed_user_id)
  252. else:
  253. possibly_joined.discard(changed_user_id)
  254. else:
  255. possibly_joined = set()
  256. possibly_left = set()
  257. result = {"changed": list(possibly_joined), "left": list(possibly_left)}
  258. log_kv(result)
  259. return result
  260. async def on_federation_query_user_devices(self, user_id: str) -> JsonDict:
  261. stream_id, devices = await self.store.get_e2e_device_keys_for_federation_query(
  262. user_id
  263. )
  264. master_key = await self.store.get_e2e_cross_signing_key(user_id, "master")
  265. self_signing_key = await self.store.get_e2e_cross_signing_key(
  266. user_id, "self_signing"
  267. )
  268. return {
  269. "user_id": user_id,
  270. "stream_id": stream_id,
  271. "devices": devices,
  272. "master_key": master_key,
  273. "self_signing_key": self_signing_key,
  274. }
  275. async def handle_room_un_partial_stated(self, room_id: str) -> None:
  276. """Handles sending appropriate device list updates in a room that has
  277. gone from partial to full state.
  278. """
  279. # TODO(faster_joins): worker mode support
  280. # https://github.com/matrix-org/synapse/issues/12994
  281. logger.error(
  282. "Trying handling device list state for partial join: not supported on workers."
  283. )
  284. class DeviceHandler(DeviceWorkerHandler):
  285. device_list_updater: "DeviceListUpdater"
  286. def __init__(self, hs: "HomeServer"):
  287. super().__init__(hs)
  288. self.federation_sender = hs.get_federation_sender()
  289. self._account_data_handler = hs.get_account_data_handler()
  290. self._storage_controllers = hs.get_storage_controllers()
  291. self.device_list_updater = DeviceListUpdater(hs, self)
  292. federation_registry = hs.get_federation_registry()
  293. federation_registry.register_edu_handler(
  294. EduTypes.DEVICE_LIST_UPDATE,
  295. self.device_list_updater.incoming_device_list_update,
  296. )
  297. # Whether `_handle_new_device_update_async` is currently processing.
  298. self._handle_new_device_update_is_processing = False
  299. # If a new device update may have happened while the loop was
  300. # processing.
  301. self._handle_new_device_update_new_data = False
  302. # On start up check if there are any updates pending.
  303. hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)
  304. self._delete_stale_devices_after = hs.config.server.delete_stale_devices_after
  305. # Ideally we would run this on a worker and condition this on the
  306. # "run_background_tasks_on" setting, but this would mean making the notification
  307. # of device list changes over federation work on workers, which is nontrivial.
  308. if self._delete_stale_devices_after is not None:
  309. self.clock.looping_call(
  310. run_as_background_process,
  311. DELETE_STALE_DEVICES_INTERVAL_MS,
  312. "delete_stale_devices",
  313. self._delete_stale_devices,
  314. )
  315. def _check_device_name_length(self, name: Optional[str]) -> None:
  316. """
  317. Checks whether a device name is longer than the maximum allowed length.
  318. Args:
  319. name: The name of the device.
  320. Raises:
  321. SynapseError: if the device name is too long.
  322. """
  323. if name and len(name) > MAX_DEVICE_DISPLAY_NAME_LEN:
  324. raise SynapseError(
  325. 400,
  326. "Device display name is too long (max %i)"
  327. % (MAX_DEVICE_DISPLAY_NAME_LEN,),
  328. errcode=Codes.TOO_LARGE,
  329. )
  330. async def check_device_registered(
  331. self,
  332. user_id: str,
  333. device_id: Optional[str],
  334. initial_device_display_name: Optional[str] = None,
  335. auth_provider_id: Optional[str] = None,
  336. auth_provider_session_id: Optional[str] = None,
  337. ) -> str:
  338. """
  339. If the given device has not been registered, register it with the
  340. supplied display name.
  341. If no device_id is supplied, we make one up.
  342. Args:
  343. user_id: @user:id
  344. device_id: device id supplied by client
  345. initial_device_display_name: device display name from client
  346. auth_provider_id: The SSO IdP the user used, if any.
  347. auth_provider_session_id: The session ID (sid) got from the SSO IdP.
  348. Returns:
  349. device id (generated if none was supplied)
  350. """
  351. self._check_device_name_length(initial_device_display_name)
  352. if device_id is not None:
  353. new_device = await self.store.store_device(
  354. user_id=user_id,
  355. device_id=device_id,
  356. initial_device_display_name=initial_device_display_name,
  357. auth_provider_id=auth_provider_id,
  358. auth_provider_session_id=auth_provider_session_id,
  359. )
  360. if new_device:
  361. await self.notify_device_update(user_id, [device_id])
  362. return device_id
  363. # if the device id is not specified, we'll autogen one, but loop a few
  364. # times in case of a clash.
  365. attempts = 0
  366. while attempts < 5:
  367. new_device_id = stringutils.random_string(10).upper()
  368. new_device = await self.store.store_device(
  369. user_id=user_id,
  370. device_id=new_device_id,
  371. initial_device_display_name=initial_device_display_name,
  372. auth_provider_id=auth_provider_id,
  373. auth_provider_session_id=auth_provider_session_id,
  374. )
  375. if new_device:
  376. await self.notify_device_update(user_id, [new_device_id])
  377. return new_device_id
  378. attempts += 1
  379. raise errors.StoreError(500, "Couldn't generate a device ID.")
  380. async def _delete_stale_devices(self) -> None:
  381. """Background task that deletes devices which haven't been accessed for more than
  382. a configured time period.
  383. """
  384. # We should only be running this job if the config option is defined.
  385. assert self._delete_stale_devices_after is not None
  386. now_ms = self.clock.time_msec()
  387. since_ms = now_ms - self._delete_stale_devices_after
  388. devices = await self.store.get_local_devices_not_accessed_since(since_ms)
  389. for user_id, user_devices in devices.items():
  390. await self.delete_devices(user_id, user_devices)
  391. @trace
  392. async def delete_all_devices_for_user(
  393. self, user_id: str, except_device_id: Optional[str] = None
  394. ) -> None:
  395. """Delete all of the user's devices
  396. Args:
  397. user_id: The user to remove all devices from
  398. except_device_id: optional device id which should not be deleted
  399. """
  400. device_map = await self.store.get_devices_by_user(user_id)
  401. device_ids = list(device_map)
  402. if except_device_id is not None:
  403. device_ids = [d for d in device_ids if d != except_device_id]
  404. await self.delete_devices(user_id, device_ids)
  405. async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
  406. """Delete several devices
  407. Args:
  408. user_id: The user to delete devices from.
  409. device_ids: The list of device IDs to delete
  410. """
  411. try:
  412. await self.store.delete_devices(user_id, device_ids)
  413. except errors.StoreError as e:
  414. if e.code == 404:
  415. # no match
  416. set_tag("error", True)
  417. set_tag("reason", "User doesn't have that device id.")
  418. else:
  419. raise
  420. await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)
  421. # Delete data specific to each device. Not optimised as it is not
  422. # considered as part of a critical path.
  423. for device_id in device_ids:
  424. await self._auth_handler.delete_access_tokens_for_user(
  425. user_id, device_id=device_id
  426. )
  427. await self.store.delete_e2e_keys_by_device(
  428. user_id=user_id, device_id=device_id
  429. )
  430. if self.hs.config.experimental.msc3890_enabled:
  431. # Remove any local notification settings for this device in accordance
  432. # with MSC3890.
  433. await self._account_data_handler.remove_account_data_for_user(
  434. user_id,
  435. f"org.matrix.msc3890.local_notification_settings.{device_id}",
  436. )
  437. await self.notify_device_update(user_id, device_ids)
  438. async def update_device(self, user_id: str, device_id: str, content: dict) -> None:
  439. """Update the given device
  440. Args:
  441. user_id: The user to update devices of.
  442. device_id: The device to update.
  443. content: body of update request
  444. """
  445. # Reject a new displayname which is too long.
  446. new_display_name = content.get("display_name")
  447. self._check_device_name_length(new_display_name)
  448. try:
  449. await self.store.update_device(
  450. user_id, device_id, new_display_name=new_display_name
  451. )
  452. await self.notify_device_update(user_id, [device_id])
  453. except errors.StoreError as e:
  454. if e.code == 404:
  455. raise errors.NotFoundError()
  456. else:
  457. raise
  458. @trace
  459. @measure_func("notify_device_update")
  460. async def notify_device_update(
  461. self, user_id: str, device_ids: StrCollection
  462. ) -> None:
  463. """Notify that a user's device(s) has changed. Pokes the notifier, and
  464. remote servers if the user is local.
  465. Args:
  466. user_id: The Matrix ID of the user who's device list has been updated.
  467. device_ids: The device IDs that have changed.
  468. """
  469. if not device_ids:
  470. # No changes to notify about, so this is a no-op.
  471. return
  472. room_ids = await self.store.get_rooms_for_user(user_id)
  473. position = await self.store.add_device_change_to_streams(
  474. user_id,
  475. device_ids,
  476. room_ids=room_ids,
  477. )
  478. if not position:
  479. # This should only happen if there are no updates, so we bail.
  480. return
  481. for device_id in device_ids:
  482. logger.debug(
  483. "Notifying about update %r/%r, ID: %r", user_id, device_id, position
  484. )
  485. # specify the user ID too since the user should always get their own device list
  486. # updates, even if they aren't in any rooms.
  487. self.notifier.on_new_event(
  488. StreamKeyType.DEVICE_LIST, position, users={user_id}, rooms=room_ids
  489. )
  490. # We may need to do some processing asynchronously for local user IDs.
  491. if self.hs.is_mine_id(user_id):
  492. self._handle_new_device_update_async()
  493. async def notify_user_signature_update(
  494. self, from_user_id: str, user_ids: List[str]
  495. ) -> None:
  496. """Notify a user that they have made new signatures of other users.
  497. Args:
  498. from_user_id: the user who made the signature
  499. user_ids: the users IDs that have new signatures
  500. """
  501. position = await self.store.add_user_signature_change_to_streams(
  502. from_user_id, user_ids
  503. )
  504. self.notifier.on_new_event(
  505. StreamKeyType.DEVICE_LIST, position, users=[from_user_id]
  506. )
  507. async def store_dehydrated_device(
  508. self,
  509. user_id: str,
  510. device_data: JsonDict,
  511. initial_device_display_name: Optional[str] = None,
  512. ) -> str:
  513. """Store a dehydrated device for a user. If the user had a previous
  514. dehydrated device, it is removed.
  515. Args:
  516. user_id: the user that we are storing the device for
  517. device_data: the dehydrated device information
  518. initial_device_display_name: The display name to use for the device
  519. Returns:
  520. device id of the dehydrated device
  521. """
  522. device_id = await self.check_device_registered(
  523. user_id,
  524. None,
  525. initial_device_display_name,
  526. )
  527. old_device_id = await self.store.store_dehydrated_device(
  528. user_id, device_id, device_data
  529. )
  530. if old_device_id is not None:
  531. await self.delete_devices(user_id, [old_device_id])
  532. return device_id
  533. async def rehydrate_device(
  534. self, user_id: str, access_token: str, device_id: str
  535. ) -> dict:
  536. """Process a rehydration request from the user.
  537. Args:
  538. user_id: the user who is rehydrating the device
  539. access_token: the access token used for the request
  540. device_id: the ID of the device that will be rehydrated
  541. Returns:
  542. a dict containing {"success": True}
  543. """
  544. success = await self.store.remove_dehydrated_device(user_id, device_id)
  545. if not success:
  546. raise errors.NotFoundError()
  547. # If the dehydrated device was successfully deleted (the device ID
  548. # matched the stored dehydrated device), then modify the access
  549. # token to use the dehydrated device's ID and copy the old device
  550. # display name to the dehydrated device, and destroy the old device
  551. # ID
  552. old_device_id = await self.store.set_device_for_access_token(
  553. access_token, device_id
  554. )
  555. old_device = await self.store.get_device(user_id, old_device_id)
  556. if old_device is None:
  557. raise errors.NotFoundError()
  558. await self.store.update_device(user_id, device_id, old_device["display_name"])
  559. # can't call self.delete_device because that will clobber the
  560. # access token so call the storage layer directly
  561. await self.store.delete_devices(user_id, [old_device_id])
  562. await self.store.delete_e2e_keys_by_device(
  563. user_id=user_id, device_id=old_device_id
  564. )
  565. # tell everyone that the old device is gone and that the dehydrated
  566. # device has a new display name
  567. await self.notify_device_update(user_id, [old_device_id, device_id])
  568. return {"success": True}
  569. @wrap_as_background_process("_handle_new_device_update_async")
  570. async def _handle_new_device_update_async(self) -> None:
  571. """Called when we have a new local device list update that we need to
  572. send out over federation.
  573. This happens in the background so as not to block the original request
  574. that generated the device update.
  575. """
  576. if self._handle_new_device_update_is_processing:
  577. self._handle_new_device_update_new_data = True
  578. return
  579. self._handle_new_device_update_is_processing = True
  580. # The stream ID we processed previous iteration (if any), and the set of
  581. # hosts we've already poked about for this update. This is so that we
  582. # don't poke the same remote server about the same update repeatedly.
  583. current_stream_id = None
  584. hosts_already_sent_to: Set[str] = set()
  585. try:
  586. stream_id, room_id = await self.store.get_device_change_last_converted_pos()
  587. while True:
  588. self._handle_new_device_update_new_data = False
  589. max_stream_id = self.store.get_device_stream_token()
  590. rows = await self.store.get_uncoverted_outbound_room_pokes(
  591. stream_id, room_id
  592. )
  593. if not rows:
  594. # If the DB returned nothing then there is nothing left to
  595. # do, *unless* a new device list update happened during the
  596. # DB query.
  597. # Advance `(stream_id, room_id)`.
  598. # `max_stream_id` comes from *before* the query for unconverted
  599. # rows, which means that any unconverted rows must have a larger
  600. # stream ID.
  601. if max_stream_id > stream_id:
  602. stream_id, room_id = max_stream_id, ""
  603. await self.store.set_device_change_last_converted_pos(
  604. stream_id, room_id
  605. )
  606. else:
  607. assert max_stream_id == stream_id
  608. # Avoid moving `room_id` backwards.
  609. pass
  610. if self._handle_new_device_update_new_data:
  611. continue
  612. else:
  613. return
  614. for user_id, device_id, room_id, stream_id, opentracing_context in rows:
  615. hosts = set()
  616. # Ignore any users that aren't ours
  617. if self.hs.is_mine_id(user_id):
  618. hosts = set(
  619. await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
  620. room_id
  621. )
  622. )
  623. hosts.discard(self.server_name)
  624. # For rooms with partial state, `hosts` is merely an
  625. # approximation. When we transition to a full state room, we
  626. # will have to send out device list updates to any servers we
  627. # missed.
  628. # Check if we've already sent this update to some hosts
  629. if current_stream_id == stream_id:
  630. hosts -= hosts_already_sent_to
  631. await self.store.add_device_list_outbound_pokes(
  632. user_id=user_id,
  633. device_id=device_id,
  634. room_id=room_id,
  635. hosts=hosts,
  636. context=opentracing_context,
  637. )
  638. # Notify replication that we've updated the device list stream.
  639. self.notifier.notify_replication()
  640. if hosts:
  641. logger.info(
  642. "Sending device list update notif for %r to: %r",
  643. user_id,
  644. hosts,
  645. )
  646. for host in hosts:
  647. self.federation_sender.send_device_messages(
  648. host, immediate=False
  649. )
  650. # TODO: when called, this isn't in a logging context.
  651. # This leads to log spam, sentry event spam, and massive
  652. # memory usage.
  653. # See https://github.com/matrix-org/synapse/issues/12552.
  654. # log_kv(
  655. # {"message": "sent device update to host", "host": host}
  656. # )
  657. if current_stream_id != stream_id:
  658. # Clear the set of hosts we've already sent to as we're
  659. # processing a new update.
  660. hosts_already_sent_to.clear()
  661. hosts_already_sent_to.update(hosts)
  662. current_stream_id = stream_id
  663. # Advance `(stream_id, room_id)`.
  664. _, _, room_id, stream_id, _ = rows[-1]
  665. await self.store.set_device_change_last_converted_pos(
  666. stream_id, room_id
  667. )
  668. finally:
  669. self._handle_new_device_update_is_processing = False
  670. async def handle_room_un_partial_stated(self, room_id: str) -> None:
  671. """Handles sending appropriate device list updates in a room that has
  672. gone from partial to full state.
  673. """
  674. # We defer to the device list updater to handle pending remote device
  675. # list updates.
  676. await self.device_list_updater.handle_room_un_partial_stated(room_id)
  677. # Replay local updates.
  678. (
  679. join_event_id,
  680. device_lists_stream_id,
  681. ) = await self.store.get_join_event_id_and_device_lists_stream_id_for_partial_state(
  682. room_id
  683. )
  684. # Get the local device list changes that have happened in the room since
  685. # we started joining. If there are no updates there's nothing left to do.
  686. changes = await self.store.get_device_list_changes_in_room(
  687. room_id, device_lists_stream_id
  688. )
  689. local_changes = {(u, d) for u, d in changes if self.hs.is_mine_id(u)}
  690. if not local_changes:
  691. return
  692. # Note: We have persisted the full state at this point, we just haven't
  693. # cleared the `partial_room` flag.
  694. join_state_ids = await self._state_storage.get_state_ids_for_event(
  695. join_event_id, await_full_state=False
  696. )
  697. current_state_ids = await self.store.get_partial_current_state_ids(room_id)
  698. # Now we need to work out all servers that might have been in the room
  699. # at any point during our join.
  700. # First we look for any membership states that have changed between the
  701. # initial join and now...
  702. all_keys = set(join_state_ids)
  703. all_keys.update(current_state_ids)
  704. potentially_changed_hosts = set()
  705. for etype, state_key in all_keys:
  706. if etype != EventTypes.Member:
  707. continue
  708. prev = join_state_ids.get((etype, state_key))
  709. current = current_state_ids.get((etype, state_key))
  710. if prev != current:
  711. potentially_changed_hosts.add(get_domain_from_id(state_key))
  712. # ... then we add all the hosts that are currently joined to the room...
  713. current_hosts_in_room = await self.store.get_current_hosts_in_room(room_id)
  714. potentially_changed_hosts.update(current_hosts_in_room)
  715. # ... and finally we remove any hosts that we were told about, as we
  716. # will have sent device list updates to those hosts when they happened.
  717. known_hosts_at_join = await self.store.get_partial_state_servers_at_join(
  718. room_id
  719. )
  720. assert known_hosts_at_join is not None
  721. potentially_changed_hosts.difference_update(known_hosts_at_join)
  722. potentially_changed_hosts.discard(self.server_name)
  723. if not potentially_changed_hosts:
  724. # Nothing to do.
  725. return
  726. logger.info(
  727. "Found %d changed hosts to send device list updates to",
  728. len(potentially_changed_hosts),
  729. )
  730. for user_id, device_id in local_changes:
  731. await self.store.add_device_list_outbound_pokes(
  732. user_id=user_id,
  733. device_id=device_id,
  734. room_id=room_id,
  735. hosts=potentially_changed_hosts,
  736. context=None,
  737. )
  738. # Notify things that device lists need to be sent out.
  739. self.notifier.notify_replication()
  740. for host in potentially_changed_hosts:
  741. self.federation_sender.send_device_messages(host, immediate=False)
  742. def _update_device_from_client_ips(
  743. device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
  744. ) -> None:
  745. ip = client_ips.get((device["user_id"], device["device_id"]), {})
  746. device.update(
  747. {
  748. "last_seen_user_agent": ip.get("user_agent"),
  749. "last_seen_ts": ip.get("last_seen"),
  750. "last_seen_ip": ip.get("ip"),
  751. }
  752. )
  753. class DeviceListWorkerUpdater:
  754. "Handles incoming device list updates from federation and contacts the main process over replication"
  755. def __init__(self, hs: "HomeServer"):
  756. from synapse.replication.http.devices import (
  757. ReplicationMultiUserDevicesResyncRestServlet,
  758. ReplicationUserDevicesResyncRestServlet,
  759. )
  760. self._user_device_resync_client = (
  761. ReplicationUserDevicesResyncRestServlet.make_client(hs)
  762. )
  763. self._multi_user_device_resync_client = (
  764. ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
  765. )
  766. async def multi_user_device_resync(
  767. self, user_ids: List[str], mark_failed_as_stale: bool = True
  768. ) -> Dict[str, Optional[JsonDict]]:
  769. """
  770. Like `user_device_resync` but operates on multiple users **from the same origin**
  771. at once.
  772. Returns:
  773. Dict from User ID to the same Dict as `user_device_resync`.
  774. """
  775. # mark_failed_as_stale is not sent. Ensure this doesn't break expectations.
  776. assert mark_failed_as_stale
  777. if not user_ids:
  778. # Shortcut empty requests
  779. return {}
  780. try:
  781. return await self._multi_user_device_resync_client(user_ids=user_ids)
  782. except SynapseError as err:
  783. if not (
  784. err.code == HTTPStatus.NOT_FOUND and err.errcode == Codes.UNRECOGNIZED
  785. ):
  786. raise
  787. # Fall back to single requests
  788. result: Dict[str, Optional[JsonDict]] = {}
  789. for user_id in user_ids:
  790. result[user_id] = await self._user_device_resync_client(user_id=user_id)
  791. return result
  792. async def user_device_resync(
  793. self, user_id: str, mark_failed_as_stale: bool = True
  794. ) -> Optional[JsonDict]:
  795. """Fetches all devices for a user and updates the device cache with them.
  796. Args:
  797. user_id: The user's id whose device_list will be updated.
  798. mark_failed_as_stale: Whether to mark the user's device list as stale
  799. if the attempt to resync failed.
  800. Returns:
  801. A dict with device info as under the "devices" in the result of this
  802. request:
  803. https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
  804. None when we weren't able to fetch the device info for some reason,
  805. e.g. due to a connection problem.
  806. """
  807. return (await self.multi_user_device_resync([user_id]))[user_id]
  808. class DeviceListUpdater(DeviceListWorkerUpdater):
  809. "Handles incoming device list updates from federation and updates the DB"
  810. def __init__(self, hs: "HomeServer", device_handler: DeviceHandler):
  811. self.store = hs.get_datastores().main
  812. self.federation = hs.get_federation_client()
  813. self.clock = hs.get_clock()
  814. self.device_handler = device_handler
  815. self._notifier = hs.get_notifier()
  816. self._remote_edu_linearizer = Linearizer(name="remote_device_list")
  817. # user_id -> list of updates waiting to be handled.
  818. self._pending_updates: Dict[
  819. str, List[Tuple[str, str, Iterable[str], JsonDict]]
  820. ] = {}
  821. # Recently seen stream ids. We don't bother keeping these in the DB,
  822. # but they're useful to have them about to reduce the number of spurious
  823. # resyncs.
  824. self._seen_updates: ExpiringCache[str, Set[str]] = ExpiringCache(
  825. cache_name="device_update_edu",
  826. clock=self.clock,
  827. max_len=10000,
  828. expiry_ms=30 * 60 * 1000,
  829. iterable=True,
  830. )
  831. # Attempt to resync out of sync device lists every 30s.
  832. self._resync_retry_in_progress = False
  833. self.clock.looping_call(
  834. run_as_background_process,
  835. 30 * 1000,
  836. func=self._maybe_retry_device_resync,
  837. desc="_maybe_retry_device_resync",
  838. )
  839. @trace
  840. async def incoming_device_list_update(
  841. self, origin: str, edu_content: JsonDict
  842. ) -> None:
  843. """Called on incoming device list update from federation. Responsible
  844. for parsing the EDU and adding to pending updates list.
  845. """
  846. set_tag("origin", origin)
  847. set_tag("edu_content", str(edu_content))
  848. user_id = edu_content.pop("user_id")
  849. device_id = edu_content.pop("device_id")
  850. stream_id = str(edu_content.pop("stream_id")) # They may come as ints
  851. prev_ids = edu_content.pop("prev_id", [])
  852. if not isinstance(prev_ids, list):
  853. raise SynapseError(
  854. 400, "Device list update had an invalid 'prev_ids' field"
  855. )
  856. prev_ids = [str(p) for p in prev_ids] # They may come as ints
  857. if get_domain_from_id(user_id) != origin:
  858. # TODO: Raise?
  859. logger.warning(
  860. "Got device list update edu for %r/%r from %r",
  861. user_id,
  862. device_id,
  863. origin,
  864. )
  865. set_tag("error", True)
  866. log_kv(
  867. {
  868. "message": "Got a device list update edu from a user and "
  869. "device which does not match the origin of the request.",
  870. "user_id": user_id,
  871. "device_id": device_id,
  872. }
  873. )
  874. return
  875. # Check if we are partially joining any rooms. If so we need to store
  876. # all device list updates so that we can handle them correctly once we
  877. # know who is in the room.
  878. # TODO(faster_joins): this fetches and processes a bunch of data that we don't
  879. # use. Could be replaced by a tighter query e.g.
  880. # SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
  881. partial_rooms = await self.store.get_partial_state_room_resync_info()
  882. if partial_rooms:
  883. await self.store.add_remote_device_list_to_pending(
  884. user_id,
  885. device_id,
  886. )
  887. self._notifier.notify_replication()
  888. room_ids = await self.store.get_rooms_for_user(user_id)
  889. if not room_ids:
  890. # We don't share any rooms with this user. Ignore update, as we
  891. # probably won't get any further updates.
  892. set_tag("error", True)
  893. log_kv(
  894. {
  895. "message": "Got an update from a user for which "
  896. "we don't share any rooms",
  897. "other user_id": user_id,
  898. }
  899. )
  900. logger.warning(
  901. "Got device list update edu for %r/%r, but don't share a room",
  902. user_id,
  903. device_id,
  904. )
  905. return
  906. logger.debug("Received device list update for %r/%r", user_id, device_id)
  907. self._pending_updates.setdefault(user_id, []).append(
  908. (device_id, stream_id, prev_ids, edu_content)
  909. )
  910. await self._handle_device_updates(user_id)
  911. @measure_func("_incoming_device_list_update")
  912. async def _handle_device_updates(self, user_id: str) -> None:
  913. "Actually handle pending updates."
  914. async with self._remote_edu_linearizer.queue(user_id):
  915. pending_updates = self._pending_updates.pop(user_id, [])
  916. if not pending_updates:
  917. # This can happen since we batch updates
  918. return
  919. for device_id, stream_id, prev_ids, _ in pending_updates:
  920. logger.debug(
  921. "Handling update %r/%r, ID: %r, prev: %r ",
  922. user_id,
  923. device_id,
  924. stream_id,
  925. prev_ids,
  926. )
  927. # Given a list of updates we check if we need to resync. This
  928. # happens if we've missed updates.
  929. resync = await self._need_to_do_resync(user_id, pending_updates)
  930. if logger.isEnabledFor(logging.INFO):
  931. logger.info(
  932. "Received device list update for %s, requiring resync: %s. Devices: %s",
  933. user_id,
  934. resync,
  935. ", ".join(u[0] for u in pending_updates),
  936. )
  937. if resync:
  938. await self.user_device_resync(user_id)
  939. else:
  940. # Simply update the single device, since we know that is the only
  941. # change (because of the single prev_id matching the current cache)
  942. for device_id, stream_id, _, content in pending_updates:
  943. await self.store.update_remote_device_list_cache_entry(
  944. user_id, device_id, content, stream_id
  945. )
  946. await self.device_handler.notify_device_update(
  947. user_id, [device_id for device_id, _, _, _ in pending_updates]
  948. )
  949. self._seen_updates.setdefault(user_id, set()).update(
  950. stream_id for _, stream_id, _, _ in pending_updates
  951. )
  952. async def _need_to_do_resync(
  953. self, user_id: str, updates: Iterable[Tuple[str, str, Iterable[str], JsonDict]]
  954. ) -> bool:
  955. """Given a list of updates for a user figure out if we need to do a full
  956. resync, or whether we have enough data that we can just apply the delta.
  957. """
  958. seen_updates: Set[str] = self._seen_updates.get(user_id, set())
  959. extremity = await self.store.get_device_list_last_stream_id_for_remote(user_id)
  960. logger.debug("Current extremity for %r: %r", user_id, extremity)
  961. stream_id_in_updates = set() # stream_ids in updates list
  962. for _, stream_id, prev_ids, _ in updates:
  963. if not prev_ids:
  964. # We always do a resync if there are no previous IDs
  965. return True
  966. for prev_id in prev_ids:
  967. if prev_id == extremity:
  968. continue
  969. elif prev_id in seen_updates:
  970. continue
  971. elif prev_id in stream_id_in_updates:
  972. continue
  973. else:
  974. return True
  975. stream_id_in_updates.add(stream_id)
  976. return False
  977. @trace
  978. async def _maybe_retry_device_resync(self) -> None:
  979. """Retry to resync device lists that are out of sync, except if another retry is
  980. in progress.
  981. """
  982. if self._resync_retry_in_progress:
  983. return
  984. try:
  985. # Prevent another call of this function to retry resyncing device lists so
  986. # we don't send too many requests.
  987. self._resync_retry_in_progress = True
  988. # Get all of the users that need resyncing.
  989. need_resync = await self.store.get_user_ids_requiring_device_list_resync()
  990. # Iterate over the set of user IDs.
  991. for user_id in need_resync:
  992. try:
  993. # Try to resync the current user's devices list.
  994. result = await self.user_device_resync(
  995. user_id=user_id,
  996. mark_failed_as_stale=False,
  997. )
  998. # user_device_resync only returns a result if it managed to
  999. # successfully resync and update the database. Updating the table
  1000. # of users requiring resync isn't necessary here as
  1001. # user_device_resync already does it (through
  1002. # self.store.update_remote_device_list_cache).
  1003. if result:
  1004. logger.debug(
  1005. "Successfully resynced the device list for %s",
  1006. user_id,
  1007. )
  1008. except Exception as e:
  1009. # If there was an issue resyncing this user, e.g. if the remote
  1010. # server sent a malformed result, just log the error instead of
  1011. # aborting all the subsequent resyncs.
  1012. logger.debug(
  1013. "Could not resync the device list for %s: %s",
  1014. user_id,
  1015. e,
  1016. )
  1017. finally:
  1018. # Allow future calls to retry resyncinc out of sync device lists.
  1019. self._resync_retry_in_progress = False
  1020. async def multi_user_device_resync(
  1021. self, user_ids: List[str], mark_failed_as_stale: bool = True
  1022. ) -> Dict[str, Optional[JsonDict]]:
  1023. """
  1024. Like `user_device_resync` but operates on multiple users **from the same origin**
  1025. at once.
  1026. Returns:
  1027. Dict from User ID to the same Dict as `user_device_resync`.
  1028. """
  1029. if not user_ids:
  1030. return {}
  1031. origins = {UserID.from_string(user_id).domain for user_id in user_ids}
  1032. if len(origins) != 1:
  1033. raise InvalidAPICallError(f"Only one origin permitted, got {origins!r}")
  1034. result = {}
  1035. failed = set()
  1036. # TODO(Perf): Actually batch these up
  1037. for user_id in user_ids:
  1038. user_result, user_failed = await self._user_device_resync_returning_failed(
  1039. user_id
  1040. )
  1041. result[user_id] = user_result
  1042. if user_failed:
  1043. failed.add(user_id)
  1044. if mark_failed_as_stale:
  1045. await self.store.mark_remote_users_device_caches_as_stale(failed)
  1046. return result
  1047. async def user_device_resync(
  1048. self, user_id: str, mark_failed_as_stale: bool = True
  1049. ) -> Optional[JsonDict]:
  1050. result, failed = await self._user_device_resync_returning_failed(user_id)
  1051. if failed and mark_failed_as_stale:
  1052. # Mark the remote user's device list as stale so we know we need to retry
  1053. # it later.
  1054. await self.store.mark_remote_users_device_caches_as_stale((user_id,))
  1055. return result
  1056. async def _user_device_resync_returning_failed(
  1057. self, user_id: str
  1058. ) -> Tuple[Optional[JsonDict], bool]:
  1059. """Fetches all devices for a user and updates the device cache with them.
  1060. Args:
  1061. user_id: The user's id whose device_list will be updated.
  1062. Returns:
  1063. - A dict with device info as under the "devices" in the result of this
  1064. request:
  1065. https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
  1066. None when we weren't able to fetch the device info for some reason,
  1067. e.g. due to a connection problem.
  1068. - True iff the resync failed and the device list should be marked as stale.
  1069. """
  1070. logger.debug("Attempting to resync the device list for %s", user_id)
  1071. log_kv({"message": "Doing resync to update device list."})
  1072. # Fetch all devices for the user.
  1073. origin = get_domain_from_id(user_id)
  1074. try:
  1075. result = await self.federation.query_user_devices(origin, user_id)
  1076. except NotRetryingDestination:
  1077. return None, True
  1078. except (RequestSendFailed, HttpResponseException) as e:
  1079. logger.warning(
  1080. "Failed to handle device list update for %s: %s",
  1081. user_id,
  1082. e,
  1083. )
  1084. # We abort on exceptions rather than accepting the update
  1085. # as otherwise synapse will 'forget' that its device list
  1086. # is out of date. If we bail then we will retry the resync
  1087. # next time we get a device list update for this user_id.
  1088. # This makes it more likely that the device lists will
  1089. # eventually become consistent.
  1090. return None, True
  1091. except FederationDeniedError as e:
  1092. set_tag("error", True)
  1093. log_kv({"reason": "FederationDeniedError"})
  1094. logger.info(e)
  1095. return None, False
  1096. except Exception as e:
  1097. set_tag("error", True)
  1098. log_kv(
  1099. {"message": "Exception raised by federation request", "exception": e}
  1100. )
  1101. logger.exception("Failed to handle device list update for %s", user_id)
  1102. return None, True
  1103. log_kv({"result": result})
  1104. stream_id = result["stream_id"]
  1105. devices = result["devices"]
  1106. # Get the master key and the self-signing key for this user if provided in the
  1107. # response (None if not in the response).
  1108. # The response will not contain the user signing key, as this key is only used by
  1109. # its owner, thus it doesn't make sense to send it over federation.
  1110. master_key = result.get("master_key")
  1111. self_signing_key = result.get("self_signing_key")
  1112. ignore_devices = False
  1113. # If the remote server has more than ~1000 devices for this user
  1114. # we assume that something is going horribly wrong (e.g. a bot
  1115. # that logs in and creates a new device every time it tries to
  1116. # send a message). Maintaining lots of devices per user in the
  1117. # cache can cause serious performance issues as if this request
  1118. # takes more than 60s to complete, internal replication from the
  1119. # inbound federation worker to the synapse master may time out
  1120. # causing the inbound federation to fail and causing the remote
  1121. # server to retry, causing a DoS. So in this scenario we give
  1122. # up on storing the total list of devices and only handle the
  1123. # delta instead.
  1124. if len(devices) > 1000:
  1125. logger.warning(
  1126. "Ignoring device list snapshot for %s as it has >1K devs (%d)",
  1127. user_id,
  1128. len(devices),
  1129. )
  1130. devices = []
  1131. ignore_devices = True
  1132. else:
  1133. prev_stream_id = await self.store.get_device_list_last_stream_id_for_remote(
  1134. user_id
  1135. )
  1136. cached_devices = await self.store.get_cached_devices_for_user(user_id)
  1137. # To ensure that a user with no devices is cached, we skip the resync only
  1138. # if we have a stream_id from previously writing a cache entry.
  1139. if prev_stream_id is not None and cached_devices == {
  1140. d["device_id"]: d for d in devices
  1141. }:
  1142. logging.info(
  1143. "Skipping device list resync for %s, as our cache matches already",
  1144. user_id,
  1145. )
  1146. devices = []
  1147. ignore_devices = True
  1148. for device in devices:
  1149. logger.debug(
  1150. "Handling resync update %r/%r, ID: %r",
  1151. user_id,
  1152. device["device_id"],
  1153. stream_id,
  1154. )
  1155. if not ignore_devices:
  1156. await self.store.update_remote_device_list_cache(
  1157. user_id, devices, stream_id
  1158. )
  1159. # mark the cache as valid, whether or not we actually processed any device
  1160. # list updates.
  1161. await self.store.mark_remote_user_device_cache_as_valid(user_id)
  1162. device_ids = [device["device_id"] for device in devices]
  1163. # Handle cross-signing keys.
  1164. cross_signing_device_ids = await self.process_cross_signing_key_update(
  1165. user_id,
  1166. master_key,
  1167. self_signing_key,
  1168. )
  1169. device_ids = device_ids + cross_signing_device_ids
  1170. if device_ids:
  1171. await self.device_handler.notify_device_update(user_id, device_ids)
  1172. # We clobber the seen updates since we've re-synced from a given
  1173. # point.
  1174. self._seen_updates[user_id] = {stream_id}
  1175. return result, False
  1176. async def process_cross_signing_key_update(
  1177. self,
  1178. user_id: str,
  1179. master_key: Optional[JsonDict],
  1180. self_signing_key: Optional[JsonDict],
  1181. ) -> List[str]:
  1182. """Process the given new master and self-signing key for the given remote user.
  1183. Args:
  1184. user_id: The ID of the user these keys are for.
  1185. master_key: The dict of the cross-signing master key as returned by the
  1186. remote server.
  1187. self_signing_key: The dict of the cross-signing self-signing key as returned
  1188. by the remote server.
  1189. Return:
  1190. The device IDs for the given keys.
  1191. """
  1192. device_ids = []
  1193. current_keys_map = await self.store.get_e2e_cross_signing_keys_bulk([user_id])
  1194. current_keys = current_keys_map.get(user_id) or {}
  1195. if master_key and master_key != current_keys.get("master"):
  1196. await self.store.set_e2e_cross_signing_key(user_id, "master", master_key)
  1197. _, verify_key = get_verify_key_from_cross_signing_key(master_key)
  1198. # verify_key is a VerifyKey from signedjson, which uses
  1199. # .version to denote the portion of the key ID after the
  1200. # algorithm and colon, which is the device ID
  1201. device_ids.append(verify_key.version)
  1202. if self_signing_key and self_signing_key != current_keys.get("self_signing"):
  1203. await self.store.set_e2e_cross_signing_key(
  1204. user_id, "self_signing", self_signing_key
  1205. )
  1206. _, verify_key = get_verify_key_from_cross_signing_key(self_signing_key)
  1207. device_ids.append(verify_key.version)
  1208. return device_ids
  1209. async def handle_room_un_partial_stated(self, room_id: str) -> None:
  1210. """Handles sending appropriate device list updates in a room that has
  1211. gone from partial to full state.
  1212. """
  1213. pending_updates = (
  1214. await self.store.get_pending_remote_device_list_updates_for_room(room_id)
  1215. )
  1216. for user_id, device_id in pending_updates:
  1217. logger.info(
  1218. "Got pending device list update in room %s: %s / %s",
  1219. room_id,
  1220. user_id,
  1221. device_id,
  1222. )
  1223. position = await self.store.add_device_change_to_streams(
  1224. user_id,
  1225. [device_id],
  1226. room_ids=[room_id],
  1227. )
  1228. if not position:
  1229. # This should only happen if there are no updates, which
  1230. # shouldn't happen when we've passed in a non-empty set of
  1231. # device IDs.
  1232. continue
  1233. self.device_handler.notifier.on_new_event(
  1234. StreamKeyType.DEVICE_LIST, position, rooms=[room_id]
  1235. )