test_registration.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. # Copyright 2014-2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from synapse.api.constants import UserTypes
  15. from synapse.api.errors import ThreepidValidationError
  16. from tests.unittest import HomeserverTestCase
  17. class RegistrationStoreTestCase(HomeserverTestCase):
  18. def prepare(self, reactor, clock, hs):
  19. self.store = hs.get_datastore()
  20. self.user_id = "@my-user:test"
  21. self.tokens = ["AbCdEfGhIjKlMnOpQrStUvWxYz", "BcDeFgHiJkLmNoPqRsTuVwXyZa"]
  22. self.pwhash = "{xx1}123456789"
  23. self.device_id = "akgjhdjklgshg"
  24. def test_register(self):
  25. self.get_success(self.store.register_user(self.user_id, self.pwhash))
  26. self.assertEquals(
  27. {
  28. # TODO(paul): Surely this field should be 'user_id', not 'name'
  29. "name": self.user_id,
  30. "password_hash": self.pwhash,
  31. "admin": 0,
  32. "is_guest": 0,
  33. "consent_version": None,
  34. "consent_server_notice_sent": None,
  35. "appservice_id": None,
  36. "creation_ts": 0,
  37. "user_type": None,
  38. "deactivated": 0,
  39. "shadow_banned": 0,
  40. },
  41. (self.get_success(self.store.get_user_by_id(self.user_id))),
  42. )
  43. def test_add_tokens(self):
  44. self.get_success(self.store.register_user(self.user_id, self.pwhash))
  45. self.get_success(
  46. self.store.add_access_token_to_user(
  47. self.user_id, self.tokens[1], self.device_id, valid_until_ms=None
  48. )
  49. )
  50. result = self.get_success(self.store.get_user_by_access_token(self.tokens[1]))
  51. self.assertEqual(result.user_id, self.user_id)
  52. self.assertEqual(result.device_id, self.device_id)
  53. self.assertIsNotNone(result.token_id)
  54. def test_user_delete_access_tokens(self):
  55. # add some tokens
  56. self.get_success(self.store.register_user(self.user_id, self.pwhash))
  57. self.get_success(
  58. self.store.add_access_token_to_user(
  59. self.user_id, self.tokens[0], device_id=None, valid_until_ms=None
  60. )
  61. )
  62. self.get_success(
  63. self.store.add_access_token_to_user(
  64. self.user_id, self.tokens[1], self.device_id, valid_until_ms=None
  65. )
  66. )
  67. # now delete some
  68. self.get_success(
  69. self.store.user_delete_access_tokens(self.user_id, device_id=self.device_id)
  70. )
  71. # check they were deleted
  72. user = self.get_success(self.store.get_user_by_access_token(self.tokens[1]))
  73. self.assertIsNone(user, "access token was not deleted by device_id")
  74. # check the one not associated with the device was not deleted
  75. user = self.get_success(self.store.get_user_by_access_token(self.tokens[0]))
  76. self.assertEqual(self.user_id, user.user_id)
  77. # now delete the rest
  78. self.get_success(self.store.user_delete_access_tokens(self.user_id))
  79. user = self.get_success(self.store.get_user_by_access_token(self.tokens[0]))
  80. self.assertIsNone(user, "access token was not deleted without device_id")
  81. def test_is_support_user(self):
  82. TEST_USER = "@test:test"
  83. SUPPORT_USER = "@support:test"
  84. res = self.get_success(self.store.is_support_user(None))
  85. self.assertFalse(res)
  86. self.get_success(
  87. self.store.register_user(user_id=TEST_USER, password_hash=None)
  88. )
  89. res = self.get_success(self.store.is_support_user(TEST_USER))
  90. self.assertFalse(res)
  91. self.get_success(
  92. self.store.register_user(
  93. user_id=SUPPORT_USER, password_hash=None, user_type=UserTypes.SUPPORT
  94. )
  95. )
  96. res = self.get_success(self.store.is_support_user(SUPPORT_USER))
  97. self.assertTrue(res)
  98. def test_3pid_inhibit_invalid_validation_session_error(self):
  99. """Tests that enabling the configuration option to inhibit 3PID errors on
  100. /requestToken also inhibits validation errors caused by an unknown session ID.
  101. """
  102. # Check that, with the config setting set to false (the default value), a
  103. # validation error is caused by the unknown session ID.
  104. e = self.get_failure(
  105. self.store.validate_threepid_session(
  106. "fake_sid",
  107. "fake_client_secret",
  108. "fake_token",
  109. 0,
  110. ),
  111. ThreepidValidationError,
  112. )
  113. self.assertEquals(e.value.msg, "Unknown session_id", e)
  114. # Set the config setting to true.
  115. self.store._ignore_unknown_session_error = True
  116. # Check that now the validation error is caused by the token not matching.
  117. e = self.get_failure(
  118. self.store.validate_threepid_session(
  119. "fake_sid",
  120. "fake_client_secret",
  121. "fake_token",
  122. 0,
  123. ),
  124. ThreepidValidationError,
  125. )
  126. self.assertEquals(e.value.msg, "Validation token not found or has expired", e)