test_device.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import synapse.api.errors
  17. import synapse.handlers.device
  18. import synapse.storage
  19. from tests import unittest
  20. user1 = "@boris:aaa"
  21. user2 = "@theresa:bbb"
  22. class DeviceTestCase(unittest.HomeserverTestCase):
  23. def make_homeserver(self, reactor, clock):
  24. hs = self.setup_test_homeserver("server", http_client=None)
  25. self.handler = hs.get_device_handler()
  26. self.store = hs.get_datastore()
  27. return hs
  28. def prepare(self, reactor, clock, hs):
  29. # These tests assume that it starts 1000 seconds in.
  30. self.reactor.advance(1000)
  31. def test_device_is_created_if_doesnt_exist(self):
  32. res = self.get_success(
  33. self.handler.check_device_registered(
  34. user_id="@boris:foo",
  35. device_id="fco",
  36. initial_device_display_name="display name",
  37. )
  38. )
  39. self.assertEqual(res, "fco")
  40. dev = self.get_success(self.handler.store.get_device("@boris:foo", "fco"))
  41. self.assertEqual(dev["display_name"], "display name")
  42. def test_device_is_preserved_if_exists(self):
  43. res1 = self.get_success(
  44. self.handler.check_device_registered(
  45. user_id="@boris:foo",
  46. device_id="fco",
  47. initial_device_display_name="display name",
  48. )
  49. )
  50. self.assertEqual(res1, "fco")
  51. res2 = self.get_success(
  52. self.handler.check_device_registered(
  53. user_id="@boris:foo",
  54. device_id="fco",
  55. initial_device_display_name="new display name",
  56. )
  57. )
  58. self.assertEqual(res2, "fco")
  59. dev = self.get_success(self.handler.store.get_device("@boris:foo", "fco"))
  60. self.assertEqual(dev["display_name"], "display name")
  61. def test_device_id_is_made_up_if_unspecified(self):
  62. device_id = self.get_success(
  63. self.handler.check_device_registered(
  64. user_id="@theresa:foo",
  65. device_id=None,
  66. initial_device_display_name="display",
  67. )
  68. )
  69. dev = self.get_success(self.handler.store.get_device("@theresa:foo", device_id))
  70. self.assertEqual(dev["display_name"], "display")
  71. def test_get_devices_by_user(self):
  72. self._record_users()
  73. res = self.get_success(self.handler.get_devices_by_user(user1))
  74. self.assertEqual(3, len(res))
  75. device_map = {d["device_id"]: d for d in res}
  76. self.assertDictContainsSubset(
  77. {
  78. "user_id": user1,
  79. "device_id": "xyz",
  80. "display_name": "display 0",
  81. "last_seen_ip": None,
  82. "last_seen_ts": None,
  83. },
  84. device_map["xyz"],
  85. )
  86. self.assertDictContainsSubset(
  87. {
  88. "user_id": user1,
  89. "device_id": "fco",
  90. "display_name": "display 1",
  91. "last_seen_ip": "ip1",
  92. "last_seen_ts": 1000000,
  93. },
  94. device_map["fco"],
  95. )
  96. self.assertDictContainsSubset(
  97. {
  98. "user_id": user1,
  99. "device_id": "abc",
  100. "display_name": "display 2",
  101. "last_seen_ip": "ip3",
  102. "last_seen_ts": 3000000,
  103. },
  104. device_map["abc"],
  105. )
  106. def test_get_device(self):
  107. self._record_users()
  108. res = self.get_success(self.handler.get_device(user1, "abc"))
  109. self.assertDictContainsSubset(
  110. {
  111. "user_id": user1,
  112. "device_id": "abc",
  113. "display_name": "display 2",
  114. "last_seen_ip": "ip3",
  115. "last_seen_ts": 3000000,
  116. },
  117. res,
  118. )
  119. def test_delete_device(self):
  120. self._record_users()
  121. # delete the device
  122. self.get_success(self.handler.delete_device(user1, "abc"))
  123. # check the device was deleted
  124. res = self.handler.get_device(user1, "abc")
  125. self.pump()
  126. self.assertIsInstance(
  127. self.failureResultOf(res).value, synapse.api.errors.NotFoundError
  128. )
  129. # we'd like to check the access token was invalidated, but that's a
  130. # bit of a PITA.
  131. def test_update_device(self):
  132. self._record_users()
  133. update = {"display_name": "new display"}
  134. self.get_success(self.handler.update_device(user1, "abc", update))
  135. res = self.get_success(self.handler.get_device(user1, "abc"))
  136. self.assertEqual(res["display_name"], "new display")
  137. def test_update_device_too_long_display_name(self):
  138. """Update a device with a display name that is invalid (too long)."""
  139. self._record_users()
  140. # Request to update a device display name with a new value that is longer than allowed.
  141. update = {
  142. "display_name": "a"
  143. * (synapse.handlers.device.MAX_DEVICE_DISPLAY_NAME_LEN + 1)
  144. }
  145. self.get_failure(
  146. self.handler.update_device(user1, "abc", update),
  147. synapse.api.errors.SynapseError,
  148. )
  149. # Ensure the display name was not updated.
  150. res = self.get_success(self.handler.get_device(user1, "abc"))
  151. self.assertEqual(res["display_name"], "display 2")
  152. def test_update_unknown_device(self):
  153. update = {"display_name": "new_display"}
  154. res = self.handler.update_device("user_id", "unknown_device_id", update)
  155. self.pump()
  156. self.assertIsInstance(
  157. self.failureResultOf(res).value, synapse.api.errors.NotFoundError
  158. )
  159. def _record_users(self):
  160. # check this works for both devices which have a recorded client_ip,
  161. # and those which don't.
  162. self._record_user(user1, "xyz", "display 0")
  163. self._record_user(user1, "fco", "display 1", "token1", "ip1")
  164. self._record_user(user1, "abc", "display 2", "token2", "ip2")
  165. self._record_user(user1, "abc", "display 2", "token3", "ip3")
  166. self._record_user(user2, "def", "dispkay", "token4", "ip4")
  167. self.reactor.advance(10000)
  168. def _record_user(
  169. self, user_id, device_id, display_name, access_token=None, ip=None
  170. ):
  171. device_id = self.get_success(
  172. self.handler.check_device_registered(
  173. user_id=user_id,
  174. device_id=device_id,
  175. initial_device_display_name=display_name,
  176. )
  177. )
  178. if ip is not None:
  179. self.get_success(
  180. self.store.insert_client_ip(
  181. user_id, access_token, ip, "user_agent", device_id
  182. )
  183. )
  184. self.reactor.advance(1000)