test_registration.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from tests import unittest
  16. from twisted.internet import defer
  17. from tests.utils import setup_test_homeserver
  18. class RegistrationStoreTestCase(unittest.TestCase):
  19. @defer.inlineCallbacks
  20. def setUp(self):
  21. hs = yield setup_test_homeserver()
  22. self.db_pool = hs.get_db_pool()
  23. self.store = hs.get_datastore()
  24. self.user_id = "@my-user:test"
  25. self.tokens = [
  26. "AbCdEfGhIjKlMnOpQrStUvWxYz",
  27. "BcDeFgHiJkLmNoPqRsTuVwXyZa"
  28. ]
  29. self.pwhash = "{xx1}123456789"
  30. self.device_id = "akgjhdjklgshg"
  31. @defer.inlineCallbacks
  32. def test_register(self):
  33. yield self.store.register(self.user_id, self.tokens[0], self.pwhash)
  34. self.assertEquals(
  35. # TODO(paul): Surely this field should be 'user_id', not 'name'
  36. # Additionally surely it shouldn't come in a 1-element list
  37. {"name": self.user_id, "password_hash": self.pwhash, "is_guest": 0},
  38. (yield self.store.get_user_by_id(self.user_id))
  39. )
  40. result = yield self.store.get_user_by_access_token(self.tokens[0])
  41. self.assertDictContainsSubset(
  42. {
  43. "name": self.user_id,
  44. },
  45. result
  46. )
  47. self.assertTrue("token_id" in result)
  48. @defer.inlineCallbacks
  49. def test_add_tokens(self):
  50. yield self.store.register(self.user_id, self.tokens[0], self.pwhash)
  51. yield self.store.add_access_token_to_user(self.user_id, self.tokens[1],
  52. self.device_id)
  53. result = yield self.store.get_user_by_access_token(self.tokens[1])
  54. self.assertDictContainsSubset(
  55. {
  56. "name": self.user_id,
  57. "device_id": self.device_id,
  58. },
  59. result
  60. )
  61. self.assertTrue("token_id" in result)
  62. @defer.inlineCallbacks
  63. def test_user_delete_access_tokens(self):
  64. # add some tokens
  65. yield self.store.register(self.user_id, self.tokens[0], self.pwhash)
  66. yield self.store.add_access_token_to_user(self.user_id, self.tokens[1],
  67. self.device_id)
  68. # now delete some
  69. yield self.store.user_delete_access_tokens(
  70. self.user_id, device_id=self.device_id,
  71. )
  72. # check they were deleted
  73. user = yield self.store.get_user_by_access_token(self.tokens[1])
  74. self.assertIsNone(user, "access token was not deleted by device_id")
  75. # check the one not associated with the device was not deleted
  76. user = yield self.store.get_user_by_access_token(self.tokens[0])
  77. self.assertEqual(self.user_id, user["name"])
  78. # now delete the rest
  79. yield self.store.user_delete_access_tokens(self.user_id)
  80. user = yield self.store.get_user_by_access_token(self.tokens[0])
  81. self.assertIsNone(user,
  82. "access token was not deleted without device_id")
  83. class TokenGenerator:
  84. def __init__(self):
  85. self._last_issued_token = 0
  86. def generate(self, user_id):
  87. self._last_issued_token += 1
  88. return u"%s-%d" % (user_id, self._last_issued_token,)