test_registration.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from tests import unittest
  16. from twisted.internet import defer
  17. from tests.utils import setup_test_homeserver
  18. class RegistrationStoreTestCase(unittest.TestCase):
  19. @defer.inlineCallbacks
  20. def setUp(self):
  21. hs = yield setup_test_homeserver()
  22. self.db_pool = hs.get_db_pool()
  23. self.store = hs.get_datastore()
  24. self.user_id = "@my-user:test"
  25. self.tokens = [
  26. "AbCdEfGhIjKlMnOpQrStUvWxYz",
  27. "BcDeFgHiJkLmNoPqRsTuVwXyZa"
  28. ]
  29. self.pwhash = "{xx1}123456789"
  30. self.device_id = "akgjhdjklgshg"
  31. @defer.inlineCallbacks
  32. def test_register(self):
  33. yield self.store.register(self.user_id, self.tokens[0], self.pwhash)
  34. self.assertEquals(
  35. {
  36. # TODO(paul): Surely this field should be 'user_id', not 'name'
  37. "name": self.user_id,
  38. "password_hash": self.pwhash,
  39. "is_guest": 0,
  40. "consent_version": None,
  41. "consent_server_notice_sent": None,
  42. "appservice_id": None,
  43. },
  44. (yield self.store.get_user_by_id(self.user_id))
  45. )
  46. result = yield self.store.get_user_by_access_token(self.tokens[0])
  47. self.assertDictContainsSubset(
  48. {
  49. "name": self.user_id,
  50. },
  51. result
  52. )
  53. self.assertTrue("token_id" in result)
  54. @defer.inlineCallbacks
  55. def test_add_tokens(self):
  56. yield self.store.register(self.user_id, self.tokens[0], self.pwhash)
  57. yield self.store.add_access_token_to_user(self.user_id, self.tokens[1],
  58. self.device_id)
  59. result = yield self.store.get_user_by_access_token(self.tokens[1])
  60. self.assertDictContainsSubset(
  61. {
  62. "name": self.user_id,
  63. "device_id": self.device_id,
  64. },
  65. result
  66. )
  67. self.assertTrue("token_id" in result)
  68. @defer.inlineCallbacks
  69. def test_user_delete_access_tokens(self):
  70. # add some tokens
  71. yield self.store.register(self.user_id, self.tokens[0], self.pwhash)
  72. yield self.store.add_access_token_to_user(self.user_id, self.tokens[1],
  73. self.device_id)
  74. # now delete some
  75. yield self.store.user_delete_access_tokens(
  76. self.user_id, device_id=self.device_id,
  77. )
  78. # check they were deleted
  79. user = yield self.store.get_user_by_access_token(self.tokens[1])
  80. self.assertIsNone(user, "access token was not deleted by device_id")
  81. # check the one not associated with the device was not deleted
  82. user = yield self.store.get_user_by_access_token(self.tokens[0])
  83. self.assertEqual(self.user_id, user["name"])
  84. # now delete the rest
  85. yield self.store.user_delete_access_tokens(self.user_id)
  86. user = yield self.store.get_user_by_access_token(self.tokens[0])
  87. self.assertIsNone(user,
  88. "access token was not deleted without device_id")
  89. class TokenGenerator:
  90. def __init__(self):
  91. self._last_issued_token = 0
  92. def generate(self, user_id):
  93. self._last_issued_token += 1
  94. return u"%s-%d" % (user_id, self._last_issued_token,)