123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- # -*- coding: utf-8 -*-
- # Copyright 2016 OpenMarket Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from synapse.api import errors
- from synapse.api.constants import EventTypes
- from synapse.util import stringutils
- from synapse.util.async import Linearizer
- from synapse.types import get_domain_from_id
- from twisted.internet import defer
- from ._base import BaseHandler
- import logging
- logger = logging.getLogger(__name__)
- class DeviceHandler(BaseHandler):
- def __init__(self, hs):
- super(DeviceHandler, self).__init__(hs)
- self.hs = hs
- self.state = hs.get_state_handler()
- self.federation_sender = hs.get_federation_sender()
- self.federation = hs.get_replication_layer()
- self._remote_edue_linearizer = Linearizer(name="remote_device_list")
- self.federation.register_edu_handler(
- "m.device_list_update", self._incoming_device_list_update,
- )
- self.federation.register_query_handler(
- "user_devices", self.on_federation_query_user_devices,
- )
- hs.get_distributor().observe("user_left_room", self.user_left_room)
- @defer.inlineCallbacks
- def check_device_registered(self, user_id, device_id,
- initial_device_display_name=None):
- """
- If the given device has not been registered, register it with the
- supplied display name.
- If no device_id is supplied, we make one up.
- Args:
- user_id (str): @user:id
- device_id (str | None): device id supplied by client
- initial_device_display_name (str | None): device display name from
- client
- Returns:
- str: device id (generated if none was supplied)
- """
- if device_id is not None:
- new_device = yield self.store.store_device(
- user_id=user_id,
- device_id=device_id,
- initial_device_display_name=initial_device_display_name,
- )
- if new_device:
- yield self.notify_device_update(user_id, [device_id])
- defer.returnValue(device_id)
- # if the device id is not specified, we'll autogen one, but loop a few
- # times in case of a clash.
- attempts = 0
- while attempts < 5:
- device_id = stringutils.random_string(10).upper()
- new_device = yield self.store.store_device(
- user_id=user_id,
- device_id=device_id,
- initial_device_display_name=initial_device_display_name,
- )
- if new_device:
- yield self.notify_device_update(user_id, [device_id])
- defer.returnValue(device_id)
- attempts += 1
- raise errors.StoreError(500, "Couldn't generate a device ID.")
- @defer.inlineCallbacks
- def get_devices_by_user(self, user_id):
- """
- Retrieve the given user's devices
- Args:
- user_id (str):
- Returns:
- defer.Deferred: list[dict[str, X]]: info on each device
- """
- device_map = yield self.store.get_devices_by_user(user_id)
- ips = yield self.store.get_last_client_ip_by_device(
- devices=((user_id, device_id) for device_id in device_map.keys())
- )
- devices = device_map.values()
- for device in devices:
- _update_device_from_client_ips(device, ips)
- defer.returnValue(devices)
- @defer.inlineCallbacks
- def get_device(self, user_id, device_id):
- """ Retrieve the given device
- Args:
- user_id (str):
- device_id (str):
- Returns:
- defer.Deferred: dict[str, X]: info on the device
- Raises:
- errors.NotFoundError: if the device was not found
- """
- try:
- device = yield self.store.get_device(user_id, device_id)
- except errors.StoreError:
- raise errors.NotFoundError
- ips = yield self.store.get_last_client_ip_by_device(
- devices=((user_id, device_id),)
- )
- _update_device_from_client_ips(device, ips)
- defer.returnValue(device)
- @defer.inlineCallbacks
- def delete_device(self, user_id, device_id):
- """ Delete the given device
- Args:
- user_id (str):
- device_id (str):
- Returns:
- defer.Deferred:
- """
- try:
- yield self.store.delete_device(user_id, device_id)
- except errors.StoreError, e:
- if e.code == 404:
- # no match
- pass
- else:
- raise
- yield self.store.user_delete_access_tokens(
- user_id, device_id=device_id,
- delete_refresh_tokens=True,
- )
- yield self.store.delete_e2e_keys_by_device(
- user_id=user_id, device_id=device_id
- )
- yield self.notify_device_update(user_id, [device_id])
- @defer.inlineCallbacks
- def update_device(self, user_id, device_id, content):
- """ Update the given device
- Args:
- user_id (str):
- device_id (str):
- content (dict): body of update request
- Returns:
- defer.Deferred:
- """
- try:
- yield self.store.update_device(
- user_id,
- device_id,
- new_display_name=content.get("display_name")
- )
- yield self.notify_device_update(user_id, [device_id])
- except errors.StoreError, e:
- if e.code == 404:
- raise errors.NotFoundError()
- else:
- raise
- @defer.inlineCallbacks
- def notify_device_update(self, user_id, device_ids):
- """Notify that a user's device(s) has changed. Pokes the notifier, and
- remote servers if the user is local.
- """
- rooms = yield self.store.get_rooms_for_user(user_id)
- room_ids = [r.room_id for r in rooms]
- hosts = set()
- if self.hs.is_mine_id(user_id):
- for room_id in room_ids:
- users = yield self.store.get_users_in_room(room_id)
- hosts.update(get_domain_from_id(u) for u in users)
- hosts.discard(self.server_name)
- position = yield self.store.add_device_change_to_streams(
- user_id, device_ids, list(hosts)
- )
- yield self.notifier.on_new_event(
- "device_list_key", position, rooms=room_ids,
- )
- if hosts:
- logger.info("Sending device list update notif to: %r", hosts)
- for host in hosts:
- self.federation_sender.send_device_messages(host)
- @defer.inlineCallbacks
- def get_user_ids_changed(self, user_id, from_token):
- """Get list of users that have had the devices updated, or have newly
- joined a room, that `user_id` may be interested in.
- Args:
- user_id (str)
- from_token (StreamToken)
- """
- rooms = yield self.store.get_rooms_for_user(user_id)
- room_ids = set(r.room_id for r in rooms)
- # First we check if any devices have changed
- changed = yield self.store.get_user_whose_devices_changed(
- from_token.device_list_key
- )
- # Then work out if any users have since joined
- rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
- possibly_changed = set(changed)
- for room_id in rooms_changed:
- # Fetch (an approximation) of the current state at the time.
- event_rows, token = yield self.store.get_recent_event_ids_for_room(
- room_id, end_token=from_token.room_key, limit=1,
- )
- if event_rows:
- last_event_id = event_rows[-1]["event_id"]
- prev_state_ids = yield self.store.get_state_ids_for_event(last_event_id)
- else:
- prev_state_ids = {}
- current_state_ids = yield self.state.get_current_state_ids(room_id)
- # If there has been any change in membership, include them in the
- # possibly changed list. We'll check if they are joined below,
- # and we're not toooo worried about spuriously adding users.
- for key, event_id in current_state_ids.iteritems():
- etype, state_key = key
- if etype == EventTypes.Member:
- prev_event_id = prev_state_ids.get(key, None)
- if not prev_event_id or prev_event_id != event_id:
- possibly_changed.add(state_key)
- user_ids_changed = set()
- for other_user_id in possibly_changed:
- other_rooms = yield self.store.get_rooms_for_user(other_user_id)
- if room_ids.intersection(e.room_id for e in other_rooms):
- user_ids_changed.add(other_user_id)
- defer.returnValue(user_ids_changed)
- @defer.inlineCallbacks
- def _incoming_device_list_update(self, origin, edu_content):
- user_id = edu_content["user_id"]
- device_id = edu_content["device_id"]
- stream_id = edu_content["stream_id"]
- prev_ids = edu_content.get("prev_id", [])
- if get_domain_from_id(user_id) != origin:
- # TODO: Raise?
- logger.warning("Got device list update edu for %r from %r", user_id, origin)
- return
- rooms = yield self.store.get_rooms_for_user(user_id)
- if not rooms:
- # We don't share any rooms with this user. Ignore update, as we
- # probably won't get any further updates.
- return
- with (yield self._remote_edue_linearizer.queue(user_id)):
- # If the prev id matches whats in our cache table, then we don't need
- # to resync the users device list, otherwise we do.
- resync = True
- if len(prev_ids) == 1:
- extremity = yield self.store.get_device_list_last_stream_id_for_remote(
- user_id
- )
- logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids)
- if str(extremity) == str(prev_ids[0]):
- resync = False
- if resync:
- # Fetch all devices for the user.
- result = yield self.federation.query_user_devices(origin, user_id)
- stream_id = result["stream_id"]
- devices = result["devices"]
- yield self.store.update_remote_device_list_cache(
- user_id, devices, stream_id,
- )
- device_ids = [device["device_id"] for device in devices]
- yield self.notify_device_update(user_id, device_ids)
- else:
- # Simply update the single device, since we know that is the only
- # change (becuase of the single prev_id matching the current cache)
- content = dict(edu_content)
- for key in ("user_id", "device_id", "stream_id", "prev_ids"):
- content.pop(key, None)
- yield self.store.update_remote_device_list_cache_entry(
- user_id, device_id, content, stream_id,
- )
- yield self.notify_device_update(user_id, [device_id])
- @defer.inlineCallbacks
- def on_federation_query_user_devices(self, user_id):
- stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
- defer.returnValue({
- "user_id": user_id,
- "stream_id": stream_id,
- "devices": devices,
- })
- @defer.inlineCallbacks
- def user_left_room(self, user, room_id):
- user_id = user.to_string()
- rooms = yield self.store.get_rooms_for_user(user_id)
- if not rooms:
- # We no longer share rooms with this user, so we'll no longer
- # receive device updates. Mark this in DB.
- yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
- def _update_device_from_client_ips(device, client_ips):
- ip = client_ips.get((device["user_id"], device["device_id"]), {})
- device.update({
- "last_seen_ts": ip.get("last_seen"),
- "last_seen_ip": ip.get("ip"),
- })
|