device.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from synapse.api import errors
  16. from synapse.api.constants import EventTypes
  17. from synapse.util import stringutils
  18. from synapse.util.async import Linearizer
  19. from synapse.types import get_domain_from_id
  20. from twisted.internet import defer
  21. from ._base import BaseHandler
  22. import logging
  23. logger = logging.getLogger(__name__)
  24. class DeviceHandler(BaseHandler):
  25. def __init__(self, hs):
  26. super(DeviceHandler, self).__init__(hs)
  27. self.hs = hs
  28. self.state = hs.get_state_handler()
  29. self.federation_sender = hs.get_federation_sender()
  30. self.federation = hs.get_replication_layer()
  31. self._remote_edue_linearizer = Linearizer(name="remote_device_list")
  32. self.federation.register_edu_handler(
  33. "m.device_list_update", self._incoming_device_list_update,
  34. )
  35. self.federation.register_query_handler(
  36. "user_devices", self.on_federation_query_user_devices,
  37. )
  38. hs.get_distributor().observe("user_left_room", self.user_left_room)
  39. @defer.inlineCallbacks
  40. def check_device_registered(self, user_id, device_id,
  41. initial_device_display_name=None):
  42. """
  43. If the given device has not been registered, register it with the
  44. supplied display name.
  45. If no device_id is supplied, we make one up.
  46. Args:
  47. user_id (str): @user:id
  48. device_id (str | None): device id supplied by client
  49. initial_device_display_name (str | None): device display name from
  50. client
  51. Returns:
  52. str: device id (generated if none was supplied)
  53. """
  54. if device_id is not None:
  55. new_device = yield self.store.store_device(
  56. user_id=user_id,
  57. device_id=device_id,
  58. initial_device_display_name=initial_device_display_name,
  59. )
  60. if new_device:
  61. yield self.notify_device_update(user_id, [device_id])
  62. defer.returnValue(device_id)
  63. # if the device id is not specified, we'll autogen one, but loop a few
  64. # times in case of a clash.
  65. attempts = 0
  66. while attempts < 5:
  67. device_id = stringutils.random_string(10).upper()
  68. new_device = yield self.store.store_device(
  69. user_id=user_id,
  70. device_id=device_id,
  71. initial_device_display_name=initial_device_display_name,
  72. )
  73. if new_device:
  74. yield self.notify_device_update(user_id, [device_id])
  75. defer.returnValue(device_id)
  76. attempts += 1
  77. raise errors.StoreError(500, "Couldn't generate a device ID.")
  78. @defer.inlineCallbacks
  79. def get_devices_by_user(self, user_id):
  80. """
  81. Retrieve the given user's devices
  82. Args:
  83. user_id (str):
  84. Returns:
  85. defer.Deferred: list[dict[str, X]]: info on each device
  86. """
  87. device_map = yield self.store.get_devices_by_user(user_id)
  88. ips = yield self.store.get_last_client_ip_by_device(
  89. devices=((user_id, device_id) for device_id in device_map.keys())
  90. )
  91. devices = device_map.values()
  92. for device in devices:
  93. _update_device_from_client_ips(device, ips)
  94. defer.returnValue(devices)
  95. @defer.inlineCallbacks
  96. def get_device(self, user_id, device_id):
  97. """ Retrieve the given device
  98. Args:
  99. user_id (str):
  100. device_id (str):
  101. Returns:
  102. defer.Deferred: dict[str, X]: info on the device
  103. Raises:
  104. errors.NotFoundError: if the device was not found
  105. """
  106. try:
  107. device = yield self.store.get_device(user_id, device_id)
  108. except errors.StoreError:
  109. raise errors.NotFoundError
  110. ips = yield self.store.get_last_client_ip_by_device(
  111. devices=((user_id, device_id),)
  112. )
  113. _update_device_from_client_ips(device, ips)
  114. defer.returnValue(device)
  115. @defer.inlineCallbacks
  116. def delete_device(self, user_id, device_id):
  117. """ Delete the given device
  118. Args:
  119. user_id (str):
  120. device_id (str):
  121. Returns:
  122. defer.Deferred:
  123. """
  124. try:
  125. yield self.store.delete_device(user_id, device_id)
  126. except errors.StoreError, e:
  127. if e.code == 404:
  128. # no match
  129. pass
  130. else:
  131. raise
  132. yield self.store.user_delete_access_tokens(
  133. user_id, device_id=device_id,
  134. delete_refresh_tokens=True,
  135. )
  136. yield self.store.delete_e2e_keys_by_device(
  137. user_id=user_id, device_id=device_id
  138. )
  139. yield self.notify_device_update(user_id, [device_id])
  140. @defer.inlineCallbacks
  141. def update_device(self, user_id, device_id, content):
  142. """ Update the given device
  143. Args:
  144. user_id (str):
  145. device_id (str):
  146. content (dict): body of update request
  147. Returns:
  148. defer.Deferred:
  149. """
  150. try:
  151. yield self.store.update_device(
  152. user_id,
  153. device_id,
  154. new_display_name=content.get("display_name")
  155. )
  156. yield self.notify_device_update(user_id, [device_id])
  157. except errors.StoreError, e:
  158. if e.code == 404:
  159. raise errors.NotFoundError()
  160. else:
  161. raise
  162. @defer.inlineCallbacks
  163. def notify_device_update(self, user_id, device_ids):
  164. """Notify that a user's device(s) has changed. Pokes the notifier, and
  165. remote servers if the user is local.
  166. """
  167. rooms = yield self.store.get_rooms_for_user(user_id)
  168. room_ids = [r.room_id for r in rooms]
  169. hosts = set()
  170. if self.hs.is_mine_id(user_id):
  171. for room_id in room_ids:
  172. users = yield self.store.get_users_in_room(room_id)
  173. hosts.update(get_domain_from_id(u) for u in users)
  174. hosts.discard(self.server_name)
  175. position = yield self.store.add_device_change_to_streams(
  176. user_id, device_ids, list(hosts)
  177. )
  178. yield self.notifier.on_new_event(
  179. "device_list_key", position, rooms=room_ids,
  180. )
  181. if hosts:
  182. logger.info("Sending device list update notif to: %r", hosts)
  183. for host in hosts:
  184. self.federation_sender.send_device_messages(host)
  185. @defer.inlineCallbacks
  186. def get_user_ids_changed(self, user_id, from_token):
  187. """Get list of users that have had the devices updated, or have newly
  188. joined a room, that `user_id` may be interested in.
  189. Args:
  190. user_id (str)
  191. from_token (StreamToken)
  192. """
  193. rooms = yield self.store.get_rooms_for_user(user_id)
  194. room_ids = set(r.room_id for r in rooms)
  195. # First we check if any devices have changed
  196. changed = yield self.store.get_user_whose_devices_changed(
  197. from_token.device_list_key
  198. )
  199. # Then work out if any users have since joined
  200. rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
  201. possibly_changed = set(changed)
  202. for room_id in rooms_changed:
  203. # Fetch (an approximation) of the current state at the time.
  204. event_rows, token = yield self.store.get_recent_event_ids_for_room(
  205. room_id, end_token=from_token.room_key, limit=1,
  206. )
  207. if event_rows:
  208. last_event_id = event_rows[-1]["event_id"]
  209. prev_state_ids = yield self.store.get_state_ids_for_event(last_event_id)
  210. else:
  211. prev_state_ids = {}
  212. current_state_ids = yield self.state.get_current_state_ids(room_id)
  213. # If there has been any change in membership, include them in the
  214. # possibly changed list. We'll check if they are joined below,
  215. # and we're not toooo worried about spuriously adding users.
  216. for key, event_id in current_state_ids.iteritems():
  217. etype, state_key = key
  218. if etype == EventTypes.Member:
  219. prev_event_id = prev_state_ids.get(key, None)
  220. if not prev_event_id or prev_event_id != event_id:
  221. possibly_changed.add(state_key)
  222. user_ids_changed = set()
  223. for other_user_id in possibly_changed:
  224. other_rooms = yield self.store.get_rooms_for_user(other_user_id)
  225. if room_ids.intersection(e.room_id for e in other_rooms):
  226. user_ids_changed.add(other_user_id)
  227. defer.returnValue(user_ids_changed)
  228. @defer.inlineCallbacks
  229. def _incoming_device_list_update(self, origin, edu_content):
  230. user_id = edu_content["user_id"]
  231. device_id = edu_content["device_id"]
  232. stream_id = edu_content["stream_id"]
  233. prev_ids = edu_content.get("prev_id", [])
  234. if get_domain_from_id(user_id) != origin:
  235. # TODO: Raise?
  236. logger.warning("Got device list update edu for %r from %r", user_id, origin)
  237. return
  238. rooms = yield self.store.get_rooms_for_user(user_id)
  239. if not rooms:
  240. # We don't share any rooms with this user. Ignore update, as we
  241. # probably won't get any further updates.
  242. return
  243. with (yield self._remote_edue_linearizer.queue(user_id)):
  244. # If the prev id matches whats in our cache table, then we don't need
  245. # to resync the users device list, otherwise we do.
  246. resync = True
  247. if len(prev_ids) == 1:
  248. extremity = yield self.store.get_device_list_last_stream_id_for_remote(
  249. user_id
  250. )
  251. logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids)
  252. if str(extremity) == str(prev_ids[0]):
  253. resync = False
  254. if resync:
  255. # Fetch all devices for the user.
  256. result = yield self.federation.query_user_devices(origin, user_id)
  257. stream_id = result["stream_id"]
  258. devices = result["devices"]
  259. yield self.store.update_remote_device_list_cache(
  260. user_id, devices, stream_id,
  261. )
  262. device_ids = [device["device_id"] for device in devices]
  263. yield self.notify_device_update(user_id, device_ids)
  264. else:
  265. # Simply update the single device, since we know that is the only
  266. # change (becuase of the single prev_id matching the current cache)
  267. content = dict(edu_content)
  268. for key in ("user_id", "device_id", "stream_id", "prev_ids"):
  269. content.pop(key, None)
  270. yield self.store.update_remote_device_list_cache_entry(
  271. user_id, device_id, content, stream_id,
  272. )
  273. yield self.notify_device_update(user_id, [device_id])
  274. @defer.inlineCallbacks
  275. def on_federation_query_user_devices(self, user_id):
  276. stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
  277. defer.returnValue({
  278. "user_id": user_id,
  279. "stream_id": stream_id,
  280. "devices": devices,
  281. })
  282. @defer.inlineCallbacks
  283. def user_left_room(self, user, room_id):
  284. user_id = user.to_string()
  285. rooms = yield self.store.get_rooms_for_user(user_id)
  286. if not rooms:
  287. # We no longer share rooms with this user, so we'll no longer
  288. # receive device updates. Mark this in DB.
  289. yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
  290. def _update_device_from_client_ips(device, client_ips):
  291. ip = client_ips.get((device["user_id"], device["device_id"]), {})
  292. device.update({
  293. "last_seen_ts": ip.get("last_seen"),
  294. "last_seen_ip": ip.get("ip"),
  295. })