12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465 |
- # Copyright 2016 OpenMarket Ltd
- # Copyright 2019 New Vector Ltd
- # Copyright 2019,2020 The Matrix.org Foundation C.I.C.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- from http import HTTPStatus
- from typing import (
- TYPE_CHECKING,
- Any,
- Dict,
- Iterable,
- List,
- Mapping,
- Optional,
- Set,
- Tuple,
- )
- from synapse.api import errors
- from synapse.api.constants import EduTypes, EventTypes
- from synapse.api.errors import (
- Codes,
- FederationDeniedError,
- HttpResponseException,
- InvalidAPICallError,
- RequestSendFailed,
- SynapseError,
- )
- from synapse.logging.opentracing import log_kv, set_tag, trace
- from synapse.metrics.background_process_metrics import (
- run_as_background_process,
- wrap_as_background_process,
- )
- from synapse.types import (
- JsonDict,
- StrCollection,
- StreamKeyType,
- StreamToken,
- UserID,
- get_domain_from_id,
- get_verify_key_from_cross_signing_key,
- )
- from synapse.util import stringutils
- from synapse.util.async_helpers import Linearizer
- from synapse.util.caches.expiringcache import ExpiringCache
- from synapse.util.cancellation import cancellable
- from synapse.util.metrics import measure_func
- from synapse.util.retryutils import NotRetryingDestination
if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)

# Maximum number of characters accepted for a device display name; longer
# names are rejected with M_TOO_LARGE (see DeviceHandler._check_device_name_length).
MAX_DEVICE_DISPLAY_NAME_LEN = 100

# How frequently the background job that deletes stale (long-unaccessed)
# devices runs: once every 24 hours, expressed in milliseconds.
DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000
class DeviceWorkerHandler:
    """Device lookups and device-list change tracking.

    This class contains only the operations that are safe to run on worker
    processes; `DeviceHandler` (the main-process subclass) adds the write
    paths and federation notification.
    """

    # Typed as the worker variant here; `DeviceHandler` overrides this with
    # the full `DeviceListUpdater`.
    device_list_updater: "DeviceListWorkerUpdater"

    def __init__(self, hs: "HomeServer"):
        self.clock = hs.get_clock()
        self.hs = hs
        self.store = hs.get_datastores().main
        self.notifier = hs.get_notifier()
        self.state = hs.get_state_handler()
        self._state_storage = hs.get_storage_controllers().state
        self._auth_handler = hs.get_auth_handler()
        self.server_name = hs.hostname
        self._msc3852_enabled = hs.config.experimental.msc3852_enabled

        self.device_list_updater = DeviceListWorkerUpdater(hs)

    @trace
    async def get_devices_by_user(self, user_id: str) -> List[JsonDict]:
        """
        Retrieve the given user's devices, annotated with last-seen client
        IP/user-agent information.

        Args:
            user_id: The user ID to query for devices.

        Returns:
            info on each device
        """
        set_tag("user_id", user_id)
        device_map = await self.store.get_devices_by_user(user_id)

        # Fetch last-seen info for *all* of the user's devices in one query.
        ips = await self.store.get_last_client_ip_by_device(user_id, device_id=None)

        devices = list(device_map.values())
        for device in devices:
            _update_device_from_client_ips(device, ips)

        log_kv(device_map)
        return devices

    async def get_dehydrated_device(
        self, user_id: str
    ) -> Optional[Tuple[str, JsonDict]]:
        """Retrieve the information for a dehydrated device.

        Args:
            user_id: the user whose dehydrated device we are looking for
        Returns:
            a tuple whose first item is the device ID, and the second item is
            the dehydrated device information
        """
        return await self.store.get_dehydrated_device(user_id)

    @trace
    async def get_device(self, user_id: str, device_id: str) -> JsonDict:
        """Retrieve the given device, annotated with last-seen client info.

        Args:
            user_id: The user to get the device from
            device_id: The device to fetch.

        Returns:
            info on the device
        Raises:
            errors.NotFoundError: if the device was not found
        """
        device = await self.store.get_device(user_id, device_id)
        if device is None:
            raise errors.NotFoundError()

        ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
        _update_device_from_client_ips(device, ips)

        set_tag("device", str(device))
        set_tag("ips", str(ips))

        return device

    @cancellable
    async def get_device_changes_in_shared_rooms(
        self, user_id: str, room_ids: StrCollection, from_token: StreamToken
    ) -> Set[str]:
        """Get the set of users whose devices have changed who share a room with
        the given user.
        """
        changed_users = await self.store.get_device_list_changes_in_rooms(
            room_ids, from_token.device_list_key
        )

        if changed_users is not None:
            # We also check if the given user has changed their device. If
            # they're in no rooms then the above query won't include them.
            changed = await self.store.get_users_whose_devices_changed(
                from_token.device_list_key, [user_id]
            )
            changed_users.update(changed)
            return changed_users

        # If the DB returned None then the `from_token` is too old, so we fall
        # back on looking for device updates for all users.

        users_who_share_room = await self.store.get_users_who_share_room_with_user(
            user_id
        )

        tracked_users = set(users_who_share_room)

        # Always tell the user about their own devices
        tracked_users.add(user_id)

        changed = await self.store.get_users_whose_devices_changed(
            from_token.device_list_key, tracked_users
        )

        return changed

    @trace
    @measure_func("device.get_user_ids_changed")
    @cancellable
    async def get_user_ids_changed(
        self, user_id: str, from_token: StreamToken
    ) -> JsonDict:
        """Get list of users that have had the devices updated, or have newly
        joined a room, that `user_id` may be interested in.

        Returns a dict with two keys: "changed" (users whose device lists may
        have changed since `from_token`) and "left" (users we may no longer
        share a room with).
        """

        set_tag("user_id", user_id)
        set_tag("from_token", str(from_token))

        now_room_key = self.store.get_room_max_token()

        room_ids = await self.store.get_rooms_for_user(user_id)

        changed = await self.get_device_changes_in_shared_rooms(
            user_id, room_ids, from_token
        )

        # Then work out if any users have since joined
        rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)

        member_events = await self.store.get_membership_changes_for_user(
            user_id, from_token.room_key, now_room_key
        )
        rooms_changed.update(event.room_id for event in member_events)

        stream_ordering = from_token.room_key.stream

        possibly_changed = set(changed)
        possibly_left = set()
        for room_id in rooms_changed:
            current_state_ids = await self._state_storage.get_current_state_ids(
                room_id, await_full_state=False
            )

            # The user may have left the room
            # TODO: Check if they actually did or if we were just invited.
            if room_id not in room_ids:
                for etype, state_key in current_state_ids.keys():
                    if etype != EventTypes.Member:
                        continue
                    possibly_left.add(state_key)
                continue

            # Fetch the current state at the time.
            try:
                event_ids = await self.store.get_forward_extremities_for_room_at_stream_ordering(
                    room_id, stream_ordering=stream_ordering
                )
            except errors.StoreError:
                # we have purged the stream_ordering index since the stream
                # ordering: treat it the same as a new room
                event_ids = []

            # special-case for an empty prev state: include all members
            # in the changed list
            if not event_ids:
                log_kv(
                    {"event": "encountered empty previous state", "room_id": room_id}
                )
                for etype, state_key in current_state_ids.keys():
                    if etype != EventTypes.Member:
                        continue
                    possibly_changed.add(state_key)
                continue

            current_member_id = current_state_ids.get((EventTypes.Member, user_id))
            if not current_member_id:
                continue

            # mapping from event_id -> state_dict
            prev_state_ids = await self._state_storage.get_state_ids_for_events(
                event_ids,
                await_full_state=False,
            )

            # Check if we've joined the room? If so we just blindly add all the users to
            # the "possibly changed" users.
            for state_dict in prev_state_ids.values():
                member_event = state_dict.get((EventTypes.Member, user_id), None)
                if not member_event or member_event != current_member_id:
                    for etype, state_key in current_state_ids.keys():
                        if etype != EventTypes.Member:
                            continue
                        possibly_changed.add(state_key)
                    break

            # If there has been any change in membership, include them in the
            # possibly changed list. We'll check if they are joined below,
            # and we're not toooo worried about spuriously adding users.
            for key, event_id in current_state_ids.items():
                etype, state_key = key
                if etype != EventTypes.Member:
                    continue

                # check if this member has changed since any of the extremities
                # at the stream_ordering, and add them to the list if so.
                for state_dict in prev_state_ids.values():
                    prev_event_id = state_dict.get(key, None)
                    if not prev_event_id or prev_event_id != event_id:
                        if state_key != user_id:
                            possibly_changed.add(state_key)
                        break

        if possibly_changed or possibly_left:
            possibly_joined = possibly_changed
            possibly_left = possibly_changed | possibly_left

            # Double check if we still share rooms with the given user.
            users_rooms = await self.store.get_rooms_for_users(possibly_left)
            for changed_user_id, entries in users_rooms.items():
                if any(rid in room_ids for rid in entries):
                    possibly_left.discard(changed_user_id)
                else:
                    possibly_joined.discard(changed_user_id)

        else:
            possibly_joined = set()
            possibly_left = set()

        result = {"changed": list(possibly_joined), "left": list(possibly_left)}

        log_kv(result)

        return result

    async def on_federation_query_user_devices(self, user_id: str) -> JsonDict:
        """Answer a federation query for a local user's device list.

        Returns the user's devices at the current stream ID, together with the
        user's master and self-signing cross-signing keys (if any).
        """
        stream_id, devices = await self.store.get_e2e_device_keys_for_federation_query(
            user_id
        )
        master_key = await self.store.get_e2e_cross_signing_key(user_id, "master")
        self_signing_key = await self.store.get_e2e_cross_signing_key(
            user_id, "self_signing"
        )

        return {
            "user_id": user_id,
            "stream_id": stream_id,
            "devices": devices,
            "master_key": master_key,
            "self_signing_key": self_signing_key,
        }

    async def handle_room_un_partial_stated(self, room_id: str) -> None:
        """Handles sending appropriate device list updates in a room that has
        gone from partial to full state.
        """

        # TODO(faster_joins): worker mode support
        #   https://github.com/matrix-org/synapse/issues/12994
        logger.error(
            "Trying handling device list state for partial join: not supported on workers."
        )
class DeviceHandler(DeviceWorkerHandler):
    """Main-process device handler.

    Extends the worker handler with the write paths: registering and deleting
    devices, dehydration/rehydration, and pushing device-list changes out to
    local clients (via the notifier) and remote servers (via federation).
    """

    device_list_updater: "DeviceListUpdater"

    def __init__(self, hs: "HomeServer"):
        super().__init__(hs)

        self.federation_sender = hs.get_federation_sender()
        self._account_data_handler = hs.get_account_data_handler()
        self._storage_controllers = hs.get_storage_controllers()

        self.device_list_updater = DeviceListUpdater(hs, self)

        federation_registry = hs.get_federation_registry()

        # Incoming m.device_list_update EDUs are routed to the device list
        # updater.
        federation_registry.register_edu_handler(
            EduTypes.DEVICE_LIST_UPDATE,
            self.device_list_updater.incoming_device_list_update,
        )

        # Whether `_handle_new_device_update_async` is currently processing.
        self._handle_new_device_update_is_processing = False

        # If a new device update may have happened while the loop was
        # processing.
        self._handle_new_device_update_new_data = False

        # On start up check if there are any updates pending.
        hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)

        self._delete_stale_devices_after = hs.config.server.delete_stale_devices_after

        # Ideally we would run this on a worker and condition this on the
        # "run_background_tasks_on" setting, but this would mean making the notification
        # of device list changes over federation work on workers, which is nontrivial.
        if self._delete_stale_devices_after is not None:
            self.clock.looping_call(
                run_as_background_process,
                DELETE_STALE_DEVICES_INTERVAL_MS,
                "delete_stale_devices",
                self._delete_stale_devices,
            )

    def _check_device_name_length(self, name: Optional[str]) -> None:
        """
        Checks whether a device name is longer than the maximum allowed length.

        Args:
            name: The name of the device.

        Raises:
            SynapseError: if the device name is too long.
        """
        if name and len(name) > MAX_DEVICE_DISPLAY_NAME_LEN:
            raise SynapseError(
                400,
                "Device display name is too long (max %i)"
                % (MAX_DEVICE_DISPLAY_NAME_LEN,),
                errcode=Codes.TOO_LARGE,
            )

    async def check_device_registered(
        self,
        user_id: str,
        device_id: Optional[str],
        initial_device_display_name: Optional[str] = None,
        auth_provider_id: Optional[str] = None,
        auth_provider_session_id: Optional[str] = None,
    ) -> str:
        """
        If the given device has not been registered, register it with the
        supplied display name.

        If no device_id is supplied, we make one up.

        Args:
            user_id: @user:id
            device_id: device id supplied by client
            initial_device_display_name: device display name from client
            auth_provider_id: The SSO IdP the user used, if any.
            auth_provider_session_id: The session ID (sid) got from the SSO IdP.
        Returns:
            device id (generated if none was supplied)
        """
        self._check_device_name_length(initial_device_display_name)

        if device_id is not None:
            new_device = await self.store.store_device(
                user_id=user_id,
                device_id=device_id,
                initial_device_display_name=initial_device_display_name,
                auth_provider_id=auth_provider_id,
                auth_provider_session_id=auth_provider_session_id,
            )
            # Only notify if the device was actually newly created.
            if new_device:
                await self.notify_device_update(user_id, [device_id])
            return device_id

        # if the device id is not specified, we'll autogen one, but loop a few
        # times in case of a clash.
        attempts = 0
        while attempts < 5:
            new_device_id = stringutils.random_string(10).upper()
            new_device = await self.store.store_device(
                user_id=user_id,
                device_id=new_device_id,
                initial_device_display_name=initial_device_display_name,
                auth_provider_id=auth_provider_id,
                auth_provider_session_id=auth_provider_session_id,
            )
            if new_device:
                await self.notify_device_update(user_id, [new_device_id])
                return new_device_id
            attempts += 1

        raise errors.StoreError(500, "Couldn't generate a device ID.")

    async def _delete_stale_devices(self) -> None:
        """Background task that deletes devices which haven't been accessed for more than
        a configured time period.
        """
        # We should only be running this job if the config option is defined.
        assert self._delete_stale_devices_after is not None
        now_ms = self.clock.time_msec()
        since_ms = now_ms - self._delete_stale_devices_after
        devices = await self.store.get_local_devices_not_accessed_since(since_ms)

        for user_id, user_devices in devices.items():
            await self.delete_devices(user_id, user_devices)

    @trace
    async def delete_all_devices_for_user(
        self, user_id: str, except_device_id: Optional[str] = None
    ) -> None:
        """Delete all of the user's devices

        Args:
            user_id: The user to remove all devices from
            except_device_id: optional device id which should not be deleted
        """
        device_map = await self.store.get_devices_by_user(user_id)
        device_ids = list(device_map)
        if except_device_id is not None:
            device_ids = [d for d in device_ids if d != except_device_id]
        await self.delete_devices(user_id, device_ids)

    async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
        """Delete several devices

        Args:
            user_id: The user to delete devices from.
            device_ids: The list of device IDs to delete
        """

        try:
            await self.store.delete_devices(user_id, device_ids)
        except errors.StoreError as e:
            if e.code == 404:
                # no match
                set_tag("error", True)
                set_tag("reason", "User doesn't have that device id.")
            else:
                raise

        # Remove any pushers associated with the deleted devices.
        await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)

        # Delete data specific to each device. Not optimised as it is not
        # considered as part of a critical path.
        for device_id in device_ids:
            await self._auth_handler.delete_access_tokens_for_user(
                user_id, device_id=device_id
            )
            await self.store.delete_e2e_keys_by_device(
                user_id=user_id, device_id=device_id
            )

            if self.hs.config.experimental.msc3890_enabled:
                # Remove any local notification settings for this device in accordance
                # with MSC3890.
                await self._account_data_handler.remove_account_data_for_user(
                    user_id,
                    f"org.matrix.msc3890.local_notification_settings.{device_id}",
                )

        await self.notify_device_update(user_id, device_ids)

    async def update_device(self, user_id: str, device_id: str, content: dict) -> None:
        """Update the given device

        Args:
            user_id: The user to update devices of.
            device_id: The device to update.
            content: body of update request

        Raises:
            errors.NotFoundError: if the device was not found.
            SynapseError: if the new display name is too long.
        """

        # Reject a new displayname which is too long.
        new_display_name = content.get("display_name")

        self._check_device_name_length(new_display_name)

        try:
            await self.store.update_device(
                user_id, device_id, new_display_name=new_display_name
            )
            await self.notify_device_update(user_id, [device_id])
        except errors.StoreError as e:
            if e.code == 404:
                raise errors.NotFoundError()
            else:
                raise

    @trace
    @measure_func("notify_device_update")
    async def notify_device_update(
        self, user_id: str, device_ids: StrCollection
    ) -> None:
        """Notify that a user's device(s) has changed. Pokes the notifier, and
        remote servers if the user is local.

        Args:
            user_id: The Matrix ID of the user who's device list has been updated.
            device_ids: The device IDs that have changed.
        """
        if not device_ids:
            # No changes to notify about, so this is a no-op.
            return

        room_ids = await self.store.get_rooms_for_user(user_id)

        position = await self.store.add_device_change_to_streams(
            user_id,
            device_ids,
            room_ids=room_ids,
        )

        if not position:
            # This should only happen if there are no updates, so we bail.
            return

        for device_id in device_ids:
            logger.debug(
                "Notifying about update %r/%r, ID: %r", user_id, device_id, position
            )

        # specify the user ID too since the user should always get their own device list
        # updates, even if they aren't in any rooms.
        self.notifier.on_new_event(
            StreamKeyType.DEVICE_LIST, position, users={user_id}, rooms=room_ids
        )

        # We may need to do some processing asynchronously for local user IDs.
        if self.hs.is_mine_id(user_id):
            self._handle_new_device_update_async()

    async def notify_user_signature_update(
        self, from_user_id: str, user_ids: List[str]
    ) -> None:
        """Notify a user that they have made new signatures of other users.

        Args:
            from_user_id: the user who made the signature
            user_ids: the users IDs that have new signatures
        """

        position = await self.store.add_user_signature_change_to_streams(
            from_user_id, user_ids
        )

        self.notifier.on_new_event(
            StreamKeyType.DEVICE_LIST, position, users=[from_user_id]
        )

    async def store_dehydrated_device(
        self,
        user_id: str,
        device_data: JsonDict,
        initial_device_display_name: Optional[str] = None,
    ) -> str:
        """Store a dehydrated device for a user.  If the user had a previous
        dehydrated device, it is removed.

        Args:
            user_id: the user that we are storing the device for
            device_data: the dehydrated device information
            initial_device_display_name: The display name to use for the device
        Returns:
            device id of the dehydrated device
        """
        device_id = await self.check_device_registered(
            user_id,
            None,
            initial_device_display_name,
        )
        old_device_id = await self.store.store_dehydrated_device(
            user_id, device_id, device_data
        )
        if old_device_id is not None:
            await self.delete_devices(user_id, [old_device_id])

        return device_id

    async def rehydrate_device(
        self, user_id: str, access_token: str, device_id: str
    ) -> dict:
        """Process a rehydration request from the user.

        Args:
            user_id: the user who is rehydrating the device
            access_token: the access token used for the request
            device_id: the ID of the device that will be rehydrated
        Returns:
            a dict containing {"success": True}

        Raises:
            errors.NotFoundError: if the dehydrated device (or the access
                token's old device) cannot be found.
        """
        success = await self.store.remove_dehydrated_device(user_id, device_id)

        if not success:
            raise errors.NotFoundError()

        # If the dehydrated device was successfully deleted (the device ID
        # matched the stored dehydrated device), then modify the access
        # token to use the dehydrated device's ID and copy the old device
        # display name to the dehydrated device, and destroy the old device
        # ID
        old_device_id = await self.store.set_device_for_access_token(
            access_token, device_id
        )
        old_device = await self.store.get_device(user_id, old_device_id)
        if old_device is None:
            raise errors.NotFoundError()
        await self.store.update_device(user_id, device_id, old_device["display_name"])

        # can't call self.delete_device because that will clobber the
        # access token so call the storage layer directly
        await self.store.delete_devices(user_id, [old_device_id])
        await self.store.delete_e2e_keys_by_device(
            user_id=user_id, device_id=old_device_id
        )

        # tell everyone that the old device is gone and that the dehydrated
        # device has a new display name
        await self.notify_device_update(user_id, [old_device_id, device_id])

        return {"success": True}

    @wrap_as_background_process("_handle_new_device_update_async")
    async def _handle_new_device_update_async(self) -> None:
        """Called when we have a new local device list update that we need to
        send out over federation.

        This happens in the background so as not to block the original request
        that generated the device update.

        Only one instance of this loop runs at a time; re-entrant calls just
        set `_handle_new_device_update_new_data` so the running loop knows to
        do another pass.
        """
        if self._handle_new_device_update_is_processing:
            self._handle_new_device_update_new_data = True
            return

        self._handle_new_device_update_is_processing = True

        # The stream ID we processed previous iteration (if any), and the set of
        # hosts we've already poked about for this update. This is so that we
        # don't poke the same remote server about the same update repeatedly.
        current_stream_id = None
        hosts_already_sent_to: Set[str] = set()

        try:
            stream_id, room_id = await self.store.get_device_change_last_converted_pos()

            while True:
                self._handle_new_device_update_new_data = False
                max_stream_id = self.store.get_device_stream_token()
                rows = await self.store.get_uncoverted_outbound_room_pokes(
                    stream_id, room_id
                )
                if not rows:
                    # If the DB returned nothing then there is nothing left to
                    # do, *unless* a new device list update happened during the
                    # DB query.

                    # Advance `(stream_id, room_id)`.
                    # `max_stream_id` comes from *before* the query for unconverted
                    # rows, which means that any unconverted rows must have a larger
                    # stream ID.
                    if max_stream_id > stream_id:
                        stream_id, room_id = max_stream_id, ""
                        await self.store.set_device_change_last_converted_pos(
                            stream_id, room_id
                        )
                    else:
                        assert max_stream_id == stream_id
                        # Avoid moving `room_id` backwards.
                        pass

                    if self._handle_new_device_update_new_data:
                        continue
                    else:
                        return

                for user_id, device_id, room_id, stream_id, opentracing_context in rows:
                    hosts = set()

                    # Ignore any users that aren't ours
                    if self.hs.is_mine_id(user_id):
                        hosts = set(
                            await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
                                room_id
                            )
                        )
                        hosts.discard(self.server_name)
                        # For rooms with partial state, `hosts` is merely an
                        # approximation. When we transition to a full state room, we
                        # will have to send out device list updates to any servers we
                        # missed.

                    # Check if we've already sent this update to some hosts
                    if current_stream_id == stream_id:
                        hosts -= hosts_already_sent_to

                    await self.store.add_device_list_outbound_pokes(
                        user_id=user_id,
                        device_id=device_id,
                        room_id=room_id,
                        hosts=hosts,
                        context=opentracing_context,
                    )

                    # Notify replication that we've updated the device list stream.
                    self.notifier.notify_replication()

                    if hosts:
                        logger.info(
                            "Sending device list update notif for %r to: %r",
                            user_id,
                            hosts,
                        )
                        for host in hosts:
                            self.federation_sender.send_device_messages(
                                host, immediate=False
                            )
                            # TODO: when called, this isn't in a logging context.
                            # This leads to log spam, sentry event spam, and massive
                            # memory usage.
                            # See https://github.com/matrix-org/synapse/issues/12552.
                            # log_kv(
                            #     {"message": "sent device update to host", "host": host}
                            # )

                    if current_stream_id != stream_id:
                        # Clear the set of hosts we've already sent to as we're
                        # processing a new update.
                        hosts_already_sent_to.clear()

                    hosts_already_sent_to.update(hosts)
                    current_stream_id = stream_id

                # Advance `(stream_id, room_id)`.
                _, _, room_id, stream_id, _ = rows[-1]
                await self.store.set_device_change_last_converted_pos(
                    stream_id, room_id
                )

        finally:
            self._handle_new_device_update_is_processing = False

    async def handle_room_un_partial_stated(self, room_id: str) -> None:
        """Handles sending appropriate device list updates in a room that has
        gone from partial to full state.
        """

        # We defer to the device list updater to handle pending remote device
        # list updates.
        await self.device_list_updater.handle_room_un_partial_stated(room_id)

        # Replay local updates.
        (
            join_event_id,
            device_lists_stream_id,
        ) = await self.store.get_join_event_id_and_device_lists_stream_id_for_partial_state(
            room_id
        )

        # Get the local device list changes that have happened in the room since
        # we started joining. If there are no updates there's nothing left to do.
        changes = await self.store.get_device_list_changes_in_room(
            room_id, device_lists_stream_id
        )
        local_changes = {(u, d) for u, d in changes if self.hs.is_mine_id(u)}
        if not local_changes:
            return

        # Note: We have persisted the full state at this point, we just haven't
        # cleared the `partial_room` flag.
        join_state_ids = await self._state_storage.get_state_ids_for_event(
            join_event_id, await_full_state=False
        )
        current_state_ids = await self.store.get_partial_current_state_ids(room_id)

        # Now we need to work out all servers that might have been in the room
        # at any point during our join.

        # First we look for any membership states that have changed between the
        # initial join and now...
        all_keys = set(join_state_ids)
        all_keys.update(current_state_ids)

        potentially_changed_hosts = set()
        for etype, state_key in all_keys:
            if etype != EventTypes.Member:
                continue

            prev = join_state_ids.get((etype, state_key))
            current = current_state_ids.get((etype, state_key))

            if prev != current:
                potentially_changed_hosts.add(get_domain_from_id(state_key))

        # ... then we add all the hosts that are currently joined to the room...
        current_hosts_in_room = await self.store.get_current_hosts_in_room(room_id)
        potentially_changed_hosts.update(current_hosts_in_room)

        # ... and finally we remove any hosts that we were told about, as we
        # will have sent device list updates to those hosts when they happened.
        known_hosts_at_join = await self.store.get_partial_state_servers_at_join(
            room_id
        )
        assert known_hosts_at_join is not None
        potentially_changed_hosts.difference_update(known_hosts_at_join)

        potentially_changed_hosts.discard(self.server_name)

        if not potentially_changed_hosts:
            # Nothing to do.
            return

        logger.info(
            "Found %d changed hosts to send device list updates to",
            len(potentially_changed_hosts),
        )

        for user_id, device_id in local_changes:
            await self.store.add_device_list_outbound_pokes(
                user_id=user_id,
                device_id=device_id,
                room_id=room_id,
                hosts=potentially_changed_hosts,
                context=None,
            )

        # Notify things that device lists need to be sent out.
        self.notifier.notify_replication()
        for host in potentially_changed_hosts:
            self.federation_sender.send_device_messages(host, immediate=False)
def _update_device_from_client_ips(
    device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
) -> None:
    """Annotate a device dict, in place, with last-seen client IP information.

    Looks up the device's ``(user_id, device_id)`` pair in `client_ips` and
    copies the user agent, last-seen timestamp and IP address onto `device`.
    If there is no entry for the device, the three keys are set to ``None``.
    """
    seen_info = client_ips.get((device["user_id"], device["device_id"]), {})
    device["last_seen_user_agent"] = seen_info.get("user_agent")
    device["last_seen_ts"] = seen_info.get("last_seen")
    device["last_seen_ip"] = seen_info.get("ip")
class DeviceListWorkerUpdater:
    "Handles incoming device list updates from federation and contacts the main process over replication"

    def __init__(self, hs: "HomeServer"):
        # Imported here to avoid a circular import at module load time.
        from synapse.replication.http.devices import (
            ReplicationMultiUserDevicesResyncRestServlet,
            ReplicationUserDevicesResyncRestServlet,
        )

        self._user_device_resync_client = (
            ReplicationUserDevicesResyncRestServlet.make_client(hs)
        )
        self._multi_user_device_resync_client = (
            ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
        )

    async def multi_user_device_resync(
        self, user_ids: List[str], mark_failed_as_stale: bool = True
    ) -> Dict[str, Optional[JsonDict]]:
        """
        Like `user_device_resync` but operates on multiple users **from the same origin**
        at once.

        Returns:
            Dict from User ID to the same Dict as `user_device_resync`.
        """
        # mark_failed_as_stale is not sent. Ensure this doesn't break expectations.
        assert mark_failed_as_stale

        if not user_ids:
            # Shortcut empty requests
            return {}

        try:
            return await self._multi_user_device_resync_client(user_ids=user_ids)
        except SynapseError as err:
            # Anything other than "the main process doesn't know this endpoint"
            # is a genuine error and must propagate.
            if err.code != HTTPStatus.NOT_FOUND or err.errcode != Codes.UNRECOGNIZED:
                raise

        # The main process is too old for the batched endpoint: fall back to
        # issuing one single-user resync request per user.
        return {
            user_id: await self._user_device_resync_client(user_id=user_id)
            for user_id in user_ids
        }

    async def user_device_resync(
        self, user_id: str, mark_failed_as_stale: bool = True
    ) -> Optional[JsonDict]:
        """Fetches all devices for a user and updates the device cache with them.

        Args:
            user_id: The user's id whose device_list will be updated.
            mark_failed_as_stale: Whether to mark the user's device list as stale
                if the attempt to resync failed.
        Returns:
            A dict with device info as under the "devices" in the result of this
            request:
            https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
            None when we weren't able to fetch the device info for some reason,
            e.g. due to a connection problem.
        """
        resync_results = await self.multi_user_device_resync([user_id])
        return resync_results[user_id]
class DeviceListUpdater(DeviceListWorkerUpdater):
    "Handles incoming device list updates from federation and updates the DB"

    def __init__(self, hs: "HomeServer", device_handler: DeviceHandler):
        self.store = hs.get_datastores().main
        self.federation = hs.get_federation_client()
        self.clock = hs.get_clock()
        self.device_handler = device_handler
        self._notifier = hs.get_notifier()

        self._remote_edu_linearizer = Linearizer(name="remote_device_list")

        # user_id -> list of updates waiting to be handled.
        self._pending_updates: Dict[
            str, List[Tuple[str, str, Iterable[str], JsonDict]]
        ] = {}

        # Recently seen stream ids. We don't bother keeping these in the DB,
        # but they're useful to have them about to reduce the number of spurious
        # resyncs.
        self._seen_updates: ExpiringCache[str, Set[str]] = ExpiringCache(
            cache_name="device_update_edu",
            clock=self.clock,
            max_len=10000,
            expiry_ms=30 * 60 * 1000,
            iterable=True,
        )

        # Attempt to resync out of sync device lists every 30s.
        self._resync_retry_in_progress = False
        self.clock.looping_call(
            run_as_background_process,
            30 * 1000,
            func=self._maybe_retry_device_resync,
            desc="_maybe_retry_device_resync",
        )

    @trace
    async def incoming_device_list_update(
        self, origin: str, edu_content: JsonDict
    ) -> None:
        """Called on incoming device list update from federation. Responsible
        for parsing the EDU and adding to pending updates list.
        """

        set_tag("origin", origin)
        set_tag("edu_content", str(edu_content))
        user_id = edu_content.pop("user_id")
        device_id = edu_content.pop("device_id")
        stream_id = str(edu_content.pop("stream_id"))  # They may come as ints
        prev_ids = edu_content.pop("prev_id", [])
        if not isinstance(prev_ids, list):
            raise SynapseError(
                400, "Device list update had an invalid 'prev_ids' field"
            )
        prev_ids = [str(p) for p in prev_ids]  # They may come as ints

        if get_domain_from_id(user_id) != origin:
            # TODO: Raise?
            logger.warning(
                "Got device list update edu for %r/%r from %r",
                user_id,
                device_id,
                origin,
            )

            set_tag("error", True)
            log_kv(
                {
                    "message": "Got a device list update edu from a user and "
                    "device which does not match the origin of the request.",
                    "user_id": user_id,
                    "device_id": device_id,
                }
            )
            return

        # Check if we are partially joining any rooms. If so we need to store
        # all device list updates so that we can handle them correctly once we
        # know who is in the room.
        # TODO(faster_joins): this fetches and processes a bunch of data that we don't
        # use. Could be replaced by a tighter query e.g.
        #   SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
        partial_rooms = await self.store.get_partial_state_room_resync_info()
        if partial_rooms:
            await self.store.add_remote_device_list_to_pending(
                user_id,
                device_id,
            )
            self._notifier.notify_replication()

        room_ids = await self.store.get_rooms_for_user(user_id)
        if not room_ids:
            # We don't share any rooms with this user. Ignore update, as we
            # probably won't get any further updates.
            set_tag("error", True)
            log_kv(
                {
                    "message": "Got an update from a user for which "
                    "we don't share any rooms",
                    "other user_id": user_id,
                }
            )
            logger.warning(
                "Got device list update edu for %r/%r, but don't share a room",
                user_id,
                device_id,
            )
            return

        logger.debug("Received device list update for %r/%r", user_id, device_id)

        self._pending_updates.setdefault(user_id, []).append(
            (device_id, stream_id, prev_ids, edu_content)
        )

        await self._handle_device_updates(user_id)

    @measure_func("_incoming_device_list_update")
    async def _handle_device_updates(self, user_id: str) -> None:
        "Actually handle pending updates."

        async with self._remote_edu_linearizer.queue(user_id):
            pending_updates = self._pending_updates.pop(user_id, [])
            if not pending_updates:
                # This can happen since we batch updates
                return

            for device_id, stream_id, prev_ids, _ in pending_updates:
                logger.debug(
                    "Handling update %r/%r, ID: %r, prev: %r ",
                    user_id,
                    device_id,
                    stream_id,
                    prev_ids,
                )

            # Given a list of updates we check if we need to resync. This
            # happens if we've missed updates.
            resync = await self._need_to_do_resync(user_id, pending_updates)

            if logger.isEnabledFor(logging.INFO):
                logger.info(
                    "Received device list update for %s, requiring resync: %s. Devices: %s",
                    user_id,
                    resync,
                    ", ".join(u[0] for u in pending_updates),
                )

            if resync:
                await self.user_device_resync(user_id)
            else:
                # Simply update the single device, since we know that is the only
                # change (because of the single prev_id matching the current cache)
                for device_id, stream_id, _, content in pending_updates:
                    await self.store.update_remote_device_list_cache_entry(
                        user_id, device_id, content, stream_id
                    )

                await self.device_handler.notify_device_update(
                    user_id, [device_id for device_id, _, _, _ in pending_updates]
                )

                self._seen_updates.setdefault(user_id, set()).update(
                    stream_id for _, stream_id, _, _ in pending_updates
                )

    async def _need_to_do_resync(
        self, user_id: str, updates: Iterable[Tuple[str, str, Iterable[str], JsonDict]]
    ) -> bool:
        """Given a list of updates for a user figure out if we need to do a full
        resync, or whether we have enough data that we can just apply the delta.
        """
        seen_updates: Set[str] = self._seen_updates.get(user_id, set())

        extremity = await self.store.get_device_list_last_stream_id_for_remote(user_id)

        logger.debug("Current extremity for %r: %r", user_id, extremity)

        stream_id_in_updates = set()  # stream_ids in updates list
        for _, stream_id, prev_ids, _ in updates:
            if not prev_ids:
                # We always do a resync if there are no previous IDs
                return True

            for prev_id in prev_ids:
                if prev_id == extremity:
                    continue
                elif prev_id in seen_updates:
                    continue
                elif prev_id in stream_id_in_updates:
                    continue
                else:
                    return True

            stream_id_in_updates.add(stream_id)

        return False

    @trace
    async def _maybe_retry_device_resync(self) -> None:
        """Retry to resync device lists that are out of sync, except if another retry is
        in progress.
        """
        if self._resync_retry_in_progress:
            return

        try:
            # Prevent another call of this function to retry resyncing device lists so
            # we don't send too many requests.
            self._resync_retry_in_progress = True
            # Get all of the users that need resyncing.
            need_resync = await self.store.get_user_ids_requiring_device_list_resync()
            # Iterate over the set of user IDs.
            for user_id in need_resync:
                try:
                    # Try to resync the current user's devices list.
                    result = await self.user_device_resync(
                        user_id=user_id,
                        mark_failed_as_stale=False,
                    )

                    # user_device_resync only returns a result if it managed to
                    # successfully resync and update the database. Updating the table
                    # of users requiring resync isn't necessary here as
                    # user_device_resync already does it (through
                    # self.store.update_remote_device_list_cache).
                    if result:
                        logger.debug(
                            "Successfully resynced the device list for %s",
                            user_id,
                        )
                except Exception as e:
                    # If there was an issue resyncing this user, e.g. if the remote
                    # server sent a malformed result, just log the error instead of
                    # aborting all the subsequent resyncs.
                    logger.debug(
                        "Could not resync the device list for %s: %s",
                        user_id,
                        e,
                    )
        finally:
            # Allow future calls to retry resyncing out of sync device lists.
            self._resync_retry_in_progress = False

    async def multi_user_device_resync(
        self, user_ids: List[str], mark_failed_as_stale: bool = True
    ) -> Dict[str, Optional[JsonDict]]:
        """
        Like `user_device_resync` but operates on multiple users **from the same origin**
        at once.

        Returns:
            Dict from User ID to the same Dict as `user_device_resync`.
        """
        if not user_ids:
            return {}

        origins = {UserID.from_string(user_id).domain for user_id in user_ids}

        if len(origins) != 1:
            raise InvalidAPICallError(f"Only one origin permitted, got {origins!r}")

        result = {}
        failed = set()
        # TODO(Perf): Actually batch these up
        for user_id in user_ids:
            user_result, user_failed = await self._user_device_resync_returning_failed(
                user_id
            )
            result[user_id] = user_result
            if user_failed:
                failed.add(user_id)

        if mark_failed_as_stale:
            await self.store.mark_remote_users_device_caches_as_stale(failed)

        return result

    async def user_device_resync(
        self, user_id: str, mark_failed_as_stale: bool = True
    ) -> Optional[JsonDict]:
        """Fetches all devices for a user and updates the device cache with them.

        Args:
            user_id: The user's id whose device_list will be updated.
            mark_failed_as_stale: Whether to mark the user's device list as stale
                if the attempt to resync failed.
        Returns:
            A dict with device info as under the "devices" in the result of this
            request:
            https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
            None when we weren't able to fetch the device info for some reason,
            e.g. due to a connection problem.
        """
        result, failed = await self._user_device_resync_returning_failed(user_id)

        if failed and mark_failed_as_stale:
            # Mark the remote user's device list as stale so we know we need to retry
            # it later.
            await self.store.mark_remote_users_device_caches_as_stale((user_id,))

        return result

    async def _user_device_resync_returning_failed(
        self, user_id: str
    ) -> Tuple[Optional[JsonDict], bool]:
        """Fetches all devices for a user and updates the device cache with them.

        Args:
            user_id: The user's id whose device_list will be updated.
        Returns:
            - A dict with device info as under the "devices" in the result of this
              request:
              https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
              None when we weren't able to fetch the device info for some reason,
              e.g. due to a connection problem.
            - True iff the resync failed and the device list should be marked as stale.
        """
        logger.debug("Attempting to resync the device list for %s", user_id)
        log_kv({"message": "Doing resync to update device list."})
        # Fetch all devices for the user.
        origin = get_domain_from_id(user_id)
        try:
            result = await self.federation.query_user_devices(origin, user_id)
        except NotRetryingDestination:
            return None, True
        except (RequestSendFailed, HttpResponseException) as e:
            logger.warning(
                "Failed to handle device list update for %s: %s",
                user_id,
                e,
            )

            # We abort on exceptions rather than accepting the update
            # as otherwise synapse will 'forget' that its device list
            # is out of date. If we bail then we will retry the resync
            # next time we get a device list update for this user_id.
            # This makes it more likely that the device lists will
            # eventually become consistent.
            return None, True
        except FederationDeniedError as e:
            set_tag("error", True)
            log_kv({"reason": "FederationDeniedError"})
            logger.info(e)
            return None, False
        except Exception as e:
            set_tag("error", True)
            log_kv(
                {"message": "Exception raised by federation request", "exception": e}
            )
            logger.exception("Failed to handle device list update for %s", user_id)

            return None, True
        log_kv({"result": result})
        stream_id = result["stream_id"]
        devices = result["devices"]

        # Get the master key and the self-signing key for this user if provided in the
        # response (None if not in the response).
        # The response will not contain the user signing key, as this key is only used by
        # its owner, thus it doesn't make sense to send it over federation.
        master_key = result.get("master_key")
        self_signing_key = result.get("self_signing_key")

        ignore_devices = False
        # If the remote server has more than ~1000 devices for this user
        # we assume that something is going horribly wrong (e.g. a bot
        # that logs in and creates a new device every time it tries to
        # send a message).  Maintaining lots of devices per user in the
        # cache can cause serious performance issues as if this request
        # takes more than 60s to complete, internal replication from the
        # inbound federation worker to the synapse master may time out
        # causing the inbound federation to fail and causing the remote
        # server to retry, causing a DoS.  So in this scenario we give
        # up on storing the total list of devices and only handle the
        # delta instead.
        if len(devices) > 1000:
            logger.warning(
                "Ignoring device list snapshot for %s as it has >1K devs (%d)",
                user_id,
                len(devices),
            )
            devices = []
            ignore_devices = True
        else:
            prev_stream_id = await self.store.get_device_list_last_stream_id_for_remote(
                user_id
            )
            cached_devices = await self.store.get_cached_devices_for_user(user_id)

            # To ensure that a user with no devices is cached, we skip the resync only
            # if we have a stream_id from previously writing a cache entry.
            if prev_stream_id is not None and cached_devices == {
                d["device_id"]: d for d in devices
            }:
                # NOTE: use the module-level `logger` (not the root logger via
                # `logging.info`) so this message carries this module's name and
                # honours its log-level configuration.
                logger.info(
                    "Skipping device list resync for %s, as our cache matches already",
                    user_id,
                )
                devices = []
                ignore_devices = True

        for device in devices:
            logger.debug(
                "Handling resync update %r/%r, ID: %r",
                user_id,
                device["device_id"],
                stream_id,
            )

        if not ignore_devices:
            await self.store.update_remote_device_list_cache(
                user_id, devices, stream_id
            )

        # mark the cache as valid, whether or not we actually processed any device
        # list updates.
        await self.store.mark_remote_user_device_cache_as_valid(user_id)

        device_ids = [device["device_id"] for device in devices]

        # Handle cross-signing keys.
        cross_signing_device_ids = await self.process_cross_signing_key_update(
            user_id,
            master_key,
            self_signing_key,
        )
        device_ids = device_ids + cross_signing_device_ids

        if device_ids:
            await self.device_handler.notify_device_update(user_id, device_ids)

        # We clobber the seen updates since we've re-synced from a given
        # point.
        self._seen_updates[user_id] = {stream_id}

        return result, False

    async def process_cross_signing_key_update(
        self,
        user_id: str,
        master_key: Optional[JsonDict],
        self_signing_key: Optional[JsonDict],
    ) -> List[str]:
        """Process the given new master and self-signing key for the given remote user.

        Args:
            user_id: The ID of the user these keys are for.
            master_key: The dict of the cross-signing master key as returned by the
                remote server.
            self_signing_key: The dict of the cross-signing self-signing key as returned
                by the remote server.

        Return:
            The device IDs for the given keys.
        """
        device_ids = []

        current_keys_map = await self.store.get_e2e_cross_signing_keys_bulk([user_id])
        current_keys = current_keys_map.get(user_id) or {}

        if master_key and master_key != current_keys.get("master"):
            await self.store.set_e2e_cross_signing_key(user_id, "master", master_key)
            _, verify_key = get_verify_key_from_cross_signing_key(master_key)
            # verify_key is a VerifyKey from signedjson, which uses
            # .version to denote the portion of the key ID after the
            # algorithm and colon, which is the device ID
            device_ids.append(verify_key.version)
        if self_signing_key and self_signing_key != current_keys.get("self_signing"):
            await self.store.set_e2e_cross_signing_key(
                user_id, "self_signing", self_signing_key
            )
            _, verify_key = get_verify_key_from_cross_signing_key(self_signing_key)
            device_ids.append(verify_key.version)

        return device_ids

    async def handle_room_un_partial_stated(self, room_id: str) -> None:
        """Handles sending appropriate device list updates in a room that has
        gone from partial to full state.
        """

        pending_updates = (
            await self.store.get_pending_remote_device_list_updates_for_room(room_id)
        )

        for user_id, device_id in pending_updates:
            logger.info(
                "Got pending device list update in room %s: %s / %s",
                room_id,
                user_id,
                device_id,
            )
            position = await self.store.add_device_change_to_streams(
                user_id,
                [device_id],
                room_ids=[room_id],
            )

            if not position:
                # This should only happen if there are no updates, which
                # shouldn't happen when we've passed in a non-empty set of
                # device IDs.
                continue

            self.device_handler.notifier.on_new_event(
                StreamKeyType.DEVICE_LIST, position, rooms=[room_id]
            )
|