# test_device.py
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  17. import synapse.api.errors
  18. import synapse.handlers.device
  19. import synapse.storage
  20. from tests import unittest
  21. user1 = "@boris:aaa"
  22. user2 = "@theresa:bbb"
  23. class DeviceTestCase(unittest.HomeserverTestCase):
  24. def make_homeserver(self, reactor, clock):
  25. hs = self.setup_test_homeserver("server", federation_http_client=None)
  26. self.handler = hs.get_device_handler()
  27. self.store = hs.get_datastore()
  28. return hs
  29. def prepare(self, reactor, clock, hs):
  30. # These tests assume that it starts 1000 seconds in.
  31. self.reactor.advance(1000)
  32. def test_device_is_created_with_invalid_name(self):
  33. self.get_failure(
  34. self.handler.check_device_registered(
  35. user_id="@boris:foo",
  36. device_id="foo",
  37. initial_device_display_name="a"
  38. * (synapse.handlers.device.MAX_DEVICE_DISPLAY_NAME_LEN + 1),
  39. ),
  40. synapse.api.errors.SynapseError,
  41. )
  42. def test_device_is_created_if_doesnt_exist(self):
  43. res = self.get_success(
  44. self.handler.check_device_registered(
  45. user_id="@boris:foo",
  46. device_id="fco",
  47. initial_device_display_name="display name",
  48. )
  49. )
  50. self.assertEqual(res, "fco")
  51. dev = self.get_success(self.handler.store.get_device("@boris:foo", "fco"))
  52. self.assertEqual(dev["display_name"], "display name")
  53. def test_device_is_preserved_if_exists(self):
  54. res1 = self.get_success(
  55. self.handler.check_device_registered(
  56. user_id="@boris:foo",
  57. device_id="fco",
  58. initial_device_display_name="display name",
  59. )
  60. )
  61. self.assertEqual(res1, "fco")
  62. res2 = self.get_success(
  63. self.handler.check_device_registered(
  64. user_id="@boris:foo",
  65. device_id="fco",
  66. initial_device_display_name="new display name",
  67. )
  68. )
  69. self.assertEqual(res2, "fco")
  70. dev = self.get_success(self.handler.store.get_device("@boris:foo", "fco"))
  71. self.assertEqual(dev["display_name"], "display name")
  72. def test_device_id_is_made_up_if_unspecified(self):
  73. device_id = self.get_success(
  74. self.handler.check_device_registered(
  75. user_id="@theresa:foo",
  76. device_id=None,
  77. initial_device_display_name="display",
  78. )
  79. )
  80. dev = self.get_success(self.handler.store.get_device("@theresa:foo", device_id))
  81. self.assertEqual(dev["display_name"], "display")
  82. def test_get_devices_by_user(self):
  83. self._record_users()
  84. res = self.get_success(self.handler.get_devices_by_user(user1))
  85. self.assertEqual(3, len(res))
  86. device_map = {d["device_id"]: d for d in res}
  87. self.assertDictContainsSubset(
  88. {
  89. "user_id": user1,
  90. "device_id": "xyz",
  91. "display_name": "display 0",
  92. "last_seen_ip": None,
  93. "last_seen_ts": None,
  94. },
  95. device_map["xyz"],
  96. )
  97. self.assertDictContainsSubset(
  98. {
  99. "user_id": user1,
  100. "device_id": "fco",
  101. "display_name": "display 1",
  102. "last_seen_ip": "ip1",
  103. "last_seen_ts": 1000000,
  104. },
  105. device_map["fco"],
  106. )
  107. self.assertDictContainsSubset(
  108. {
  109. "user_id": user1,
  110. "device_id": "abc",
  111. "display_name": "display 2",
  112. "last_seen_ip": "ip3",
  113. "last_seen_ts": 3000000,
  114. },
  115. device_map["abc"],
  116. )
  117. def test_get_device(self):
  118. self._record_users()
  119. res = self.get_success(self.handler.get_device(user1, "abc"))
  120. self.assertDictContainsSubset(
  121. {
  122. "user_id": user1,
  123. "device_id": "abc",
  124. "display_name": "display 2",
  125. "last_seen_ip": "ip3",
  126. "last_seen_ts": 3000000,
  127. },
  128. res,
  129. )
  130. def test_delete_device(self):
  131. self._record_users()
  132. # delete the device
  133. self.get_success(self.handler.delete_device(user1, "abc"))
  134. # check the device was deleted
  135. self.get_failure(
  136. self.handler.get_device(user1, "abc"), synapse.api.errors.NotFoundError
  137. )
  138. # we'd like to check the access token was invalidated, but that's a
  139. # bit of a PITA.
  140. def test_update_device(self):
  141. self._record_users()
  142. update = {"display_name": "new display"}
  143. self.get_success(self.handler.update_device(user1, "abc", update))
  144. res = self.get_success(self.handler.get_device(user1, "abc"))
  145. self.assertEqual(res["display_name"], "new display")
  146. def test_update_device_too_long_display_name(self):
  147. """Update a device with a display name that is invalid (too long)."""
  148. self._record_users()
  149. # Request to update a device display name with a new value that is longer than allowed.
  150. update = {
  151. "display_name": "a"
  152. * (synapse.handlers.device.MAX_DEVICE_DISPLAY_NAME_LEN + 1)
  153. }
  154. self.get_failure(
  155. self.handler.update_device(user1, "abc", update),
  156. synapse.api.errors.SynapseError,
  157. )
  158. # Ensure the display name was not updated.
  159. res = self.get_success(self.handler.get_device(user1, "abc"))
  160. self.assertEqual(res["display_name"], "display 2")
  161. def test_update_unknown_device(self):
  162. update = {"display_name": "new_display"}
  163. self.get_failure(
  164. self.handler.update_device("user_id", "unknown_device_id", update),
  165. synapse.api.errors.NotFoundError,
  166. )
  167. def _record_users(self):
  168. # check this works for both devices which have a recorded client_ip,
  169. # and those which don't.
  170. self._record_user(user1, "xyz", "display 0")
  171. self._record_user(user1, "fco", "display 1", "token1", "ip1")
  172. self._record_user(user1, "abc", "display 2", "token2", "ip2")
  173. self._record_user(user1, "abc", "display 2", "token3", "ip3")
  174. self._record_user(user2, "def", "dispkay", "token4", "ip4")
  175. self.reactor.advance(10000)
  176. def _record_user(
  177. self, user_id, device_id, display_name, access_token=None, ip=None
  178. ):
  179. device_id = self.get_success(
  180. self.handler.check_device_registered(
  181. user_id=user_id,
  182. device_id=device_id,
  183. initial_device_display_name=display_name,
  184. )
  185. )
  186. if ip is not None:
  187. self.get_success(
  188. self.store.insert_client_ip(
  189. user_id, access_token, ip, "user_agent", device_id
  190. )
  191. )
  192. self.reactor.advance(1000)
  193. class DehydrationTestCase(unittest.HomeserverTestCase):
  194. def make_homeserver(self, reactor, clock):
  195. hs = self.setup_test_homeserver("server", federation_http_client=None)
  196. self.handler = hs.get_device_handler()
  197. self.registration = hs.get_registration_handler()
  198. self.auth = hs.get_auth()
  199. self.store = hs.get_datastore()
  200. return hs
  201. def test_dehydrate_and_rehydrate_device(self):
  202. user_id = "@boris:dehydration"
  203. self.get_success(self.store.register_user(user_id, "foobar"))
  204. # First check if we can store and fetch a dehydrated device
  205. stored_dehydrated_device_id = self.get_success(
  206. self.handler.store_dehydrated_device(
  207. user_id=user_id,
  208. device_data={"device_data": {"foo": "bar"}},
  209. initial_device_display_name="dehydrated device",
  210. )
  211. )
  212. retrieved_device_id, device_data = self.get_success(
  213. self.handler.get_dehydrated_device(user_id=user_id)
  214. )
  215. self.assertEqual(retrieved_device_id, stored_dehydrated_device_id)
  216. self.assertEqual(device_data, {"device_data": {"foo": "bar"}})
  217. # Create a new login for the user and dehydrated the device
  218. device_id, access_token = self.get_success(
  219. self.registration.register_device(
  220. user_id=user_id,
  221. device_id=None,
  222. initial_display_name="new device",
  223. )
  224. )
  225. # Trying to claim a nonexistent device should throw an error
  226. self.get_failure(
  227. self.handler.rehydrate_device(
  228. user_id=user_id,
  229. access_token=access_token,
  230. device_id="not the right device ID",
  231. ),
  232. synapse.api.errors.NotFoundError,
  233. )
  234. # dehydrating the right devices should succeed and change our device ID
  235. # to the dehydrated device's ID
  236. res = self.get_success(
  237. self.handler.rehydrate_device(
  238. user_id=user_id,
  239. access_token=access_token,
  240. device_id=retrieved_device_id,
  241. )
  242. )
  243. self.assertEqual(res, {"success": True})
  244. # make sure that our device ID has changed
  245. user_info = self.get_success(self.auth.get_user_by_access_token(access_token))
  246. self.assertEqual(user_info.device_id, retrieved_device_id)
  247. # make sure the device has the display name that was set from the login
  248. res = self.get_success(self.handler.get_device(user_id, retrieved_device_id))
  249. self.assertEqual(res["display_name"], "new device")
  250. # make sure that the device ID that we were initially assigned no longer exists
  251. self.get_failure(
  252. self.handler.get_device(user_id, device_id),
  253. synapse.api.errors.NotFoundError,
  254. )
  255. # make sure that there's no device available for dehydrating now
  256. ret = self.get_success(self.handler.get_dehydrated_device(user_id=user_id))
  257. self.assertIsNone(ret)