test_registration.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from tests import unittest
  16. from twisted.internet import defer
  17. from tests.utils import setup_test_homeserver
  18. class RegistrationStoreTestCase(unittest.TestCase):
  19. @defer.inlineCallbacks
  20. def setUp(self):
  21. hs = yield setup_test_homeserver()
  22. self.db_pool = hs.get_db_pool()
  23. self.store = hs.get_datastore()
  24. self.user_id = "@my-user:test"
  25. self.tokens = [
  26. "AbCdEfGhIjKlMnOpQrStUvWxYz",
  27. "BcDeFgHiJkLmNoPqRsTuVwXyZa"
  28. ]
  29. self.pwhash = "{xx1}123456789"
  30. self.device_id = "akgjhdjklgshg"
  31. @defer.inlineCallbacks
  32. def test_register(self):
  33. yield self.store.register(self.user_id, self.tokens[0], self.pwhash)
  34. self.assertEquals(
  35. {
  36. # TODO(paul): Surely this field should be 'user_id', not 'name'
  37. "name": self.user_id,
  38. "password_hash": self.pwhash,
  39. "is_guest": 0,
  40. "consent_version": None,
  41. "consent_server_notice_sent": None,
  42. },
  43. (yield self.store.get_user_by_id(self.user_id))
  44. )
  45. result = yield self.store.get_user_by_access_token(self.tokens[0])
  46. self.assertDictContainsSubset(
  47. {
  48. "name": self.user_id,
  49. },
  50. result
  51. )
  52. self.assertTrue("token_id" in result)
  53. @defer.inlineCallbacks
  54. def test_add_tokens(self):
  55. yield self.store.register(self.user_id, self.tokens[0], self.pwhash)
  56. yield self.store.add_access_token_to_user(self.user_id, self.tokens[1],
  57. self.device_id)
  58. result = yield self.store.get_user_by_access_token(self.tokens[1])
  59. self.assertDictContainsSubset(
  60. {
  61. "name": self.user_id,
  62. "device_id": self.device_id,
  63. },
  64. result
  65. )
  66. self.assertTrue("token_id" in result)
  67. @defer.inlineCallbacks
  68. def test_user_delete_access_tokens(self):
  69. # add some tokens
  70. yield self.store.register(self.user_id, self.tokens[0], self.pwhash)
  71. yield self.store.add_access_token_to_user(self.user_id, self.tokens[1],
  72. self.device_id)
  73. # now delete some
  74. yield self.store.user_delete_access_tokens(
  75. self.user_id, device_id=self.device_id,
  76. )
  77. # check they were deleted
  78. user = yield self.store.get_user_by_access_token(self.tokens[1])
  79. self.assertIsNone(user, "access token was not deleted by device_id")
  80. # check the one not associated with the device was not deleted
  81. user = yield self.store.get_user_by_access_token(self.tokens[0])
  82. self.assertEqual(self.user_id, user["name"])
  83. # now delete the rest
  84. yield self.store.user_delete_access_tokens(self.user_id)
  85. user = yield self.store.get_user_by_access_token(self.tokens[0])
  86. self.assertIsNone(user,
  87. "access token was not deleted without device_id")
  88. class TokenGenerator:
  89. def __init__(self):
  90. self._last_issued_token = 0
  91. def generate(self, user_id):
  92. self._last_issued_token += 1
  93. return u"%s-%d" % (user_id, self._last_issued_token,)