device.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from six import iteritems, itervalues
  17. from twisted.internet import defer
  18. from synapse.api import errors
  19. from synapse.api.constants import EventTypes
  20. from synapse.api.errors import (
  21. FederationDeniedError,
  22. HttpResponseException,
  23. RequestSendFailed,
  24. )
  25. from synapse.types import RoomStreamToken, get_domain_from_id
  26. from synapse.util import stringutils
  27. from synapse.util.async_helpers import Linearizer
  28. from synapse.util.caches.expiringcache import ExpiringCache
  29. from synapse.util.metrics import measure_func
  30. from synapse.util.retryutils import NotRetryingDestination
  31. from ._base import BaseHandler
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
  33. class DeviceWorkerHandler(BaseHandler):
  34. def __init__(self, hs):
  35. super(DeviceWorkerHandler, self).__init__(hs)
  36. self.hs = hs
  37. self.state = hs.get_state_handler()
  38. self._auth_handler = hs.get_auth_handler()
  39. @defer.inlineCallbacks
  40. def get_devices_by_user(self, user_id):
  41. """
  42. Retrieve the given user's devices
  43. Args:
  44. user_id (str):
  45. Returns:
  46. defer.Deferred: list[dict[str, X]]: info on each device
  47. """
  48. device_map = yield self.store.get_devices_by_user(user_id)
  49. ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
  50. devices = list(device_map.values())
  51. for device in devices:
  52. _update_device_from_client_ips(device, ips)
  53. return devices
  54. @defer.inlineCallbacks
  55. def get_device(self, user_id, device_id):
  56. """ Retrieve the given device
  57. Args:
  58. user_id (str):
  59. device_id (str):
  60. Returns:
  61. defer.Deferred: dict[str, X]: info on the device
  62. Raises:
  63. errors.NotFoundError: if the device was not found
  64. """
  65. try:
  66. device = yield self.store.get_device(user_id, device_id)
  67. except errors.StoreError:
  68. raise errors.NotFoundError
  69. ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
  70. _update_device_from_client_ips(device, ips)
  71. return device
  72. @measure_func("device.get_user_ids_changed")
  73. @defer.inlineCallbacks
  74. def get_user_ids_changed(self, user_id, from_token):
  75. """Get list of users that have had the devices updated, or have newly
  76. joined a room, that `user_id` may be interested in.
  77. Args:
  78. user_id (str)
  79. from_token (StreamToken)
  80. """
  81. now_room_key = yield self.store.get_room_events_max_id()
  82. room_ids = yield self.store.get_rooms_for_user(user_id)
  83. # First we check if any devices have changed for users that we share
  84. # rooms with.
  85. users_who_share_room = yield self.store.get_users_who_share_room_with_user(
  86. user_id
  87. )
  88. changed = yield self.store.get_users_whose_devices_changed(
  89. from_token.device_list_key, users_who_share_room
  90. )
  91. # Then work out if any users have since joined
  92. rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
  93. member_events = yield self.store.get_membership_changes_for_user(
  94. user_id, from_token.room_key, now_room_key
  95. )
  96. rooms_changed.update(event.room_id for event in member_events)
  97. stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key).stream
  98. possibly_changed = set(changed)
  99. possibly_left = set()
  100. for room_id in rooms_changed:
  101. current_state_ids = yield self.store.get_current_state_ids(room_id)
  102. # The user may have left the room
  103. # TODO: Check if they actually did or if we were just invited.
  104. if room_id not in room_ids:
  105. for key, event_id in iteritems(current_state_ids):
  106. etype, state_key = key
  107. if etype != EventTypes.Member:
  108. continue
  109. possibly_left.add(state_key)
  110. continue
  111. # Fetch the current state at the time.
  112. try:
  113. event_ids = yield self.store.get_forward_extremeties_for_room(
  114. room_id, stream_ordering=stream_ordering
  115. )
  116. except errors.StoreError:
  117. # we have purged the stream_ordering index since the stream
  118. # ordering: treat it the same as a new room
  119. event_ids = []
  120. # special-case for an empty prev state: include all members
  121. # in the changed list
  122. if not event_ids:
  123. for key, event_id in iteritems(current_state_ids):
  124. etype, state_key = key
  125. if etype != EventTypes.Member:
  126. continue
  127. possibly_changed.add(state_key)
  128. continue
  129. current_member_id = current_state_ids.get((EventTypes.Member, user_id))
  130. if not current_member_id:
  131. continue
  132. # mapping from event_id -> state_dict
  133. prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
  134. # Check if we've joined the room? If so we just blindly add all the users to
  135. # the "possibly changed" users.
  136. for state_dict in itervalues(prev_state_ids):
  137. member_event = state_dict.get((EventTypes.Member, user_id), None)
  138. if not member_event or member_event != current_member_id:
  139. for key, event_id in iteritems(current_state_ids):
  140. etype, state_key = key
  141. if etype != EventTypes.Member:
  142. continue
  143. possibly_changed.add(state_key)
  144. break
  145. # If there has been any change in membership, include them in the
  146. # possibly changed list. We'll check if they are joined below,
  147. # and we're not toooo worried about spuriously adding users.
  148. for key, event_id in iteritems(current_state_ids):
  149. etype, state_key = key
  150. if etype != EventTypes.Member:
  151. continue
  152. # check if this member has changed since any of the extremities
  153. # at the stream_ordering, and add them to the list if so.
  154. for state_dict in itervalues(prev_state_ids):
  155. prev_event_id = state_dict.get(key, None)
  156. if not prev_event_id or prev_event_id != event_id:
  157. if state_key != user_id:
  158. possibly_changed.add(state_key)
  159. break
  160. if possibly_changed or possibly_left:
  161. # Take the intersection of the users whose devices may have changed
  162. # and those that actually still share a room with the user
  163. possibly_joined = possibly_changed & users_who_share_room
  164. possibly_left = (possibly_changed | possibly_left) - users_who_share_room
  165. else:
  166. possibly_joined = []
  167. possibly_left = []
  168. return {"changed": list(possibly_joined), "left": list(possibly_left)}
  169. class DeviceHandler(DeviceWorkerHandler):
  170. def __init__(self, hs):
  171. super(DeviceHandler, self).__init__(hs)
  172. self.federation_sender = hs.get_federation_sender()
  173. self._edu_updater = DeviceListEduUpdater(hs, self)
  174. federation_registry = hs.get_federation_registry()
  175. federation_registry.register_edu_handler(
  176. "m.device_list_update", self._edu_updater.incoming_device_list_update
  177. )
  178. federation_registry.register_query_handler(
  179. "user_devices", self.on_federation_query_user_devices
  180. )
  181. hs.get_distributor().observe("user_left_room", self.user_left_room)
  182. @defer.inlineCallbacks
  183. def check_device_registered(
  184. self, user_id, device_id, initial_device_display_name=None
  185. ):
  186. """
  187. If the given device has not been registered, register it with the
  188. supplied display name.
  189. If no device_id is supplied, we make one up.
  190. Args:
  191. user_id (str): @user:id
  192. device_id (str | None): device id supplied by client
  193. initial_device_display_name (str | None): device display name from
  194. client
  195. Returns:
  196. str: device id (generated if none was supplied)
  197. """
  198. if device_id is not None:
  199. new_device = yield self.store.store_device(
  200. user_id=user_id,
  201. device_id=device_id,
  202. initial_device_display_name=initial_device_display_name,
  203. )
  204. if new_device:
  205. yield self.notify_device_update(user_id, [device_id])
  206. return device_id
  207. # if the device id is not specified, we'll autogen one, but loop a few
  208. # times in case of a clash.
  209. attempts = 0
  210. while attempts < 5:
  211. device_id = stringutils.random_string(10).upper()
  212. new_device = yield self.store.store_device(
  213. user_id=user_id,
  214. device_id=device_id,
  215. initial_device_display_name=initial_device_display_name,
  216. )
  217. if new_device:
  218. yield self.notify_device_update(user_id, [device_id])
  219. return device_id
  220. attempts += 1
  221. raise errors.StoreError(500, "Couldn't generate a device ID.")
  222. @defer.inlineCallbacks
  223. def delete_device(self, user_id, device_id):
  224. """ Delete the given device
  225. Args:
  226. user_id (str):
  227. device_id (str):
  228. Returns:
  229. defer.Deferred:
  230. """
  231. try:
  232. yield self.store.delete_device(user_id, device_id)
  233. except errors.StoreError as e:
  234. if e.code == 404:
  235. # no match
  236. pass
  237. else:
  238. raise
  239. yield self._auth_handler.delete_access_tokens_for_user(
  240. user_id, device_id=device_id
  241. )
  242. yield self.store.delete_e2e_keys_by_device(user_id=user_id, device_id=device_id)
  243. yield self.notify_device_update(user_id, [device_id])
  244. @defer.inlineCallbacks
  245. def delete_all_devices_for_user(self, user_id, except_device_id=None):
  246. """Delete all of the user's devices
  247. Args:
  248. user_id (str):
  249. except_device_id (str|None): optional device id which should not
  250. be deleted
  251. Returns:
  252. defer.Deferred:
  253. """
  254. device_map = yield self.store.get_devices_by_user(user_id)
  255. device_ids = list(device_map)
  256. if except_device_id is not None:
  257. device_ids = [d for d in device_ids if d != except_device_id]
  258. yield self.delete_devices(user_id, device_ids)
  259. @defer.inlineCallbacks
  260. def delete_devices(self, user_id, device_ids):
  261. """ Delete several devices
  262. Args:
  263. user_id (str):
  264. device_ids (List[str]): The list of device IDs to delete
  265. Returns:
  266. defer.Deferred:
  267. """
  268. try:
  269. yield self.store.delete_devices(user_id, device_ids)
  270. except errors.StoreError as e:
  271. if e.code == 404:
  272. # no match
  273. pass
  274. else:
  275. raise
  276. # Delete access tokens and e2e keys for each device. Not optimised as it is not
  277. # considered as part of a critical path.
  278. for device_id in device_ids:
  279. yield self._auth_handler.delete_access_tokens_for_user(
  280. user_id, device_id=device_id
  281. )
  282. yield self.store.delete_e2e_keys_by_device(
  283. user_id=user_id, device_id=device_id
  284. )
  285. yield self.notify_device_update(user_id, device_ids)
  286. @defer.inlineCallbacks
  287. def update_device(self, user_id, device_id, content):
  288. """ Update the given device
  289. Args:
  290. user_id (str):
  291. device_id (str):
  292. content (dict): body of update request
  293. Returns:
  294. defer.Deferred:
  295. """
  296. try:
  297. yield self.store.update_device(
  298. user_id, device_id, new_display_name=content.get("display_name")
  299. )
  300. yield self.notify_device_update(user_id, [device_id])
  301. except errors.StoreError as e:
  302. if e.code == 404:
  303. raise errors.NotFoundError()
  304. else:
  305. raise
  306. @measure_func("notify_device_update")
  307. @defer.inlineCallbacks
  308. def notify_device_update(self, user_id, device_ids):
  309. """Notify that a user's device(s) has changed. Pokes the notifier, and
  310. remote servers if the user is local.
  311. """
  312. users_who_share_room = yield self.store.get_users_who_share_room_with_user(
  313. user_id
  314. )
  315. hosts = set()
  316. if self.hs.is_mine_id(user_id):
  317. hosts.update(get_domain_from_id(u) for u in users_who_share_room)
  318. hosts.discard(self.server_name)
  319. position = yield self.store.add_device_change_to_streams(
  320. user_id, device_ids, list(hosts)
  321. )
  322. for device_id in device_ids:
  323. logger.debug(
  324. "Notifying about update %r/%r, ID: %r", user_id, device_id, position
  325. )
  326. room_ids = yield self.store.get_rooms_for_user(user_id)
  327. yield self.notifier.on_new_event("device_list_key", position, rooms=room_ids)
  328. if hosts:
  329. logger.info(
  330. "Sending device list update notif for %r to: %r", user_id, hosts
  331. )
  332. for host in hosts:
  333. self.federation_sender.send_device_messages(host)
  334. @defer.inlineCallbacks
  335. def on_federation_query_user_devices(self, user_id):
  336. stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
  337. return {"user_id": user_id, "stream_id": stream_id, "devices": devices}
  338. @defer.inlineCallbacks
  339. def user_left_room(self, user, room_id):
  340. user_id = user.to_string()
  341. room_ids = yield self.store.get_rooms_for_user(user_id)
  342. if not room_ids:
  343. # We no longer share rooms with this user, so we'll no longer
  344. # receive device updates. Mark this in DB.
  345. yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
  346. def _update_device_from_client_ips(device, client_ips):
  347. ip = client_ips.get((device["user_id"], device["device_id"]), {})
  348. device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")})
  349. class DeviceListEduUpdater(object):
  350. "Handles incoming device list updates from federation and updates the DB"
  351. def __init__(self, hs, device_handler):
  352. self.store = hs.get_datastore()
  353. self.federation = hs.get_federation_client()
  354. self.clock = hs.get_clock()
  355. self.device_handler = device_handler
  356. self._remote_edu_linearizer = Linearizer(name="remote_device_list")
  357. # user_id -> list of updates waiting to be handled.
  358. self._pending_updates = {}
  359. # Recently seen stream ids. We don't bother keeping these in the DB,
  360. # but they're useful to have them about to reduce the number of spurious
  361. # resyncs.
  362. self._seen_updates = ExpiringCache(
  363. cache_name="device_update_edu",
  364. clock=self.clock,
  365. max_len=10000,
  366. expiry_ms=30 * 60 * 1000,
  367. iterable=True,
  368. )
  369. @defer.inlineCallbacks
  370. def incoming_device_list_update(self, origin, edu_content):
  371. """Called on incoming device list update from federation. Responsible
  372. for parsing the EDU and adding to pending updates list.
  373. """
  374. user_id = edu_content.pop("user_id")
  375. device_id = edu_content.pop("device_id")
  376. stream_id = str(edu_content.pop("stream_id")) # They may come as ints
  377. prev_ids = edu_content.pop("prev_id", [])
  378. prev_ids = [str(p) for p in prev_ids] # They may come as ints
  379. if get_domain_from_id(user_id) != origin:
  380. # TODO: Raise?
  381. logger.warning(
  382. "Got device list update edu for %r/%r from %r",
  383. user_id,
  384. device_id,
  385. origin,
  386. )
  387. return
  388. room_ids = yield self.store.get_rooms_for_user(user_id)
  389. if not room_ids:
  390. # We don't share any rooms with this user. Ignore update, as we
  391. # probably won't get any further updates.
  392. logger.warning(
  393. "Got device list update edu for %r/%r, but don't share a room",
  394. user_id,
  395. device_id,
  396. )
  397. return
  398. logger.debug("Received device list update for %r/%r", user_id, device_id)
  399. self._pending_updates.setdefault(user_id, []).append(
  400. (device_id, stream_id, prev_ids, edu_content)
  401. )
  402. yield self._handle_device_updates(user_id)
  403. @measure_func("_incoming_device_list_update")
  404. @defer.inlineCallbacks
  405. def _handle_device_updates(self, user_id):
  406. "Actually handle pending updates."
  407. with (yield self._remote_edu_linearizer.queue(user_id)):
  408. pending_updates = self._pending_updates.pop(user_id, [])
  409. if not pending_updates:
  410. # This can happen since we batch updates
  411. return
  412. for device_id, stream_id, prev_ids, content in pending_updates:
  413. logger.debug(
  414. "Handling update %r/%r, ID: %r, prev: %r ",
  415. user_id,
  416. device_id,
  417. stream_id,
  418. prev_ids,
  419. )
  420. # Given a list of updates we check if we need to resync. This
  421. # happens if we've missed updates.
  422. resync = yield self._need_to_do_resync(user_id, pending_updates)
  423. logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
  424. if resync:
  425. # Fetch all devices for the user.
  426. origin = get_domain_from_id(user_id)
  427. try:
  428. result = yield self.federation.query_user_devices(origin, user_id)
  429. except (
  430. NotRetryingDestination,
  431. RequestSendFailed,
  432. HttpResponseException,
  433. ):
  434. # TODO: Remember that we are now out of sync and try again
  435. # later
  436. logger.warn("Failed to handle device list update for %s", user_id)
  437. # We abort on exceptions rather than accepting the update
  438. # as otherwise synapse will 'forget' that its device list
  439. # is out of date. If we bail then we will retry the resync
  440. # next time we get a device list update for this user_id.
  441. # This makes it more likely that the device lists will
  442. # eventually become consistent.
  443. return
  444. except FederationDeniedError as e:
  445. logger.info(e)
  446. return
  447. except Exception:
  448. # TODO: Remember that we are now out of sync and try again
  449. # later
  450. logger.exception(
  451. "Failed to handle device list update for %s", user_id
  452. )
  453. return
  454. stream_id = result["stream_id"]
  455. devices = result["devices"]
  456. # If the remote server has more than ~1000 devices for this user
  457. # we assume that something is going horribly wrong (e.g. a bot
  458. # that logs in and creates a new device every time it tries to
  459. # send a message). Maintaining lots of devices per user in the
  460. # cache can cause serious performance issues as if this request
  461. # takes more than 60s to complete, internal replication from the
  462. # inbound federation worker to the synapse master may time out
  463. # causing the inbound federation to fail and causing the remote
  464. # server to retry, causing a DoS. So in this scenario we give
  465. # up on storing the total list of devices and only handle the
  466. # delta instead.
  467. if len(devices) > 1000:
  468. logger.warn(
  469. "Ignoring device list snapshot for %s as it has >1K devs (%d)",
  470. user_id,
  471. len(devices),
  472. )
  473. devices = []
  474. for device in devices:
  475. logger.debug(
  476. "Handling resync update %r/%r, ID: %r",
  477. user_id,
  478. device["device_id"],
  479. stream_id,
  480. )
  481. yield self.store.update_remote_device_list_cache(
  482. user_id, devices, stream_id
  483. )
  484. device_ids = [device["device_id"] for device in devices]
  485. yield self.device_handler.notify_device_update(user_id, device_ids)
  486. # We clobber the seen updates since we've re-synced from a given
  487. # point.
  488. self._seen_updates[user_id] = set([stream_id])
  489. else:
  490. # Simply update the single device, since we know that is the only
  491. # change (because of the single prev_id matching the current cache)
  492. for device_id, stream_id, prev_ids, content in pending_updates:
  493. yield self.store.update_remote_device_list_cache_entry(
  494. user_id, device_id, content, stream_id
  495. )
  496. yield self.device_handler.notify_device_update(
  497. user_id, [device_id for device_id, _, _, _ in pending_updates]
  498. )
  499. self._seen_updates.setdefault(user_id, set()).update(
  500. stream_id for _, stream_id, _, _ in pending_updates
  501. )
  502. @defer.inlineCallbacks
  503. def _need_to_do_resync(self, user_id, updates):
  504. """Given a list of updates for a user figure out if we need to do a full
  505. resync, or whether we have enough data that we can just apply the delta.
  506. """
  507. seen_updates = self._seen_updates.get(user_id, set())
  508. extremity = yield self.store.get_device_list_last_stream_id_for_remote(user_id)
  509. logger.debug("Current extremity for %r: %r", user_id, extremity)
  510. stream_id_in_updates = set() # stream_ids in updates list
  511. for _, stream_id, prev_ids, _ in updates:
  512. if not prev_ids:
  513. # We always do a resync if there are no previous IDs
  514. return True
  515. for prev_id in prev_ids:
  516. if prev_id == extremity:
  517. continue
  518. elif prev_id in seen_updates:
  519. continue
  520. elif prev_id in stream_id_in_updates:
  521. continue
  522. else:
  523. return True
  524. stream_id_in_updates.add(stream_id)
  525. return False