test_e2e_keys.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import mock
  16. from twisted.internet import defer
  17. import synapse.api.errors
  18. import synapse.handlers.e2e_keys
  19. import synapse.storage
  20. from synapse.api import errors
  21. from tests import unittest, utils
  22. class E2eKeysHandlerTestCase(unittest.TestCase):
  23. def __init__(self, *args, **kwargs):
  24. super(E2eKeysHandlerTestCase, self).__init__(*args, **kwargs)
  25. self.hs = None # type: synapse.server.HomeServer
  26. self.handler = None # type: synapse.handlers.e2e_keys.E2eKeysHandler
  27. @defer.inlineCallbacks
  28. def setUp(self):
  29. self.hs = yield utils.setup_test_homeserver(
  30. self.addCleanup, handlers=None, federation_client=mock.Mock()
  31. )
  32. self.handler = synapse.handlers.e2e_keys.E2eKeysHandler(self.hs)
  33. @defer.inlineCallbacks
  34. def test_query_local_devices_no_devices(self):
  35. """If the user has no devices, we expect an empty list.
  36. """
  37. local_user = "@boris:" + self.hs.hostname
  38. res = yield self.handler.query_local_devices({local_user: None})
  39. self.assertDictEqual(res, {local_user: {}})
  40. @defer.inlineCallbacks
  41. def test_reupload_one_time_keys(self):
  42. """we should be able to re-upload the same keys"""
  43. local_user = "@boris:" + self.hs.hostname
  44. device_id = "xyz"
  45. keys = {
  46. "alg1:k1": "key1",
  47. "alg2:k2": {"key": "key2", "signatures": {"k1": "sig1"}},
  48. "alg2:k3": {"key": "key3"},
  49. }
  50. res = yield self.handler.upload_keys_for_user(
  51. local_user, device_id, {"one_time_keys": keys}
  52. )
  53. self.assertDictEqual(res, {"one_time_key_counts": {"alg1": 1, "alg2": 2}})
  54. # we should be able to change the signature without a problem
  55. keys["alg2:k2"]["signatures"]["k1"] = "sig2"
  56. res = yield self.handler.upload_keys_for_user(
  57. local_user, device_id, {"one_time_keys": keys}
  58. )
  59. self.assertDictEqual(res, {"one_time_key_counts": {"alg1": 1, "alg2": 2}})
  60. @defer.inlineCallbacks
  61. def test_change_one_time_keys(self):
  62. """attempts to change one-time-keys should be rejected"""
  63. local_user = "@boris:" + self.hs.hostname
  64. device_id = "xyz"
  65. keys = {
  66. "alg1:k1": "key1",
  67. "alg2:k2": {"key": "key2", "signatures": {"k1": "sig1"}},
  68. "alg2:k3": {"key": "key3"},
  69. }
  70. res = yield self.handler.upload_keys_for_user(
  71. local_user, device_id, {"one_time_keys": keys}
  72. )
  73. self.assertDictEqual(res, {"one_time_key_counts": {"alg1": 1, "alg2": 2}})
  74. try:
  75. yield self.handler.upload_keys_for_user(
  76. local_user, device_id, {"one_time_keys": {"alg1:k1": "key2"}}
  77. )
  78. self.fail("No error when changing string key")
  79. except errors.SynapseError:
  80. pass
  81. try:
  82. yield self.handler.upload_keys_for_user(
  83. local_user, device_id, {"one_time_keys": {"alg2:k3": "key2"}}
  84. )
  85. self.fail("No error when replacing dict key with string")
  86. except errors.SynapseError:
  87. pass
  88. try:
  89. yield self.handler.upload_keys_for_user(
  90. local_user, device_id, {"one_time_keys": {"alg1:k1": {"key": "key"}}}
  91. )
  92. self.fail("No error when replacing string key with dict")
  93. except errors.SynapseError:
  94. pass
  95. try:
  96. yield self.handler.upload_keys_for_user(
  97. local_user,
  98. device_id,
  99. {
  100. "one_time_keys": {
  101. "alg2:k2": {"key": "key3", "signatures": {"k1": "sig1"}}
  102. }
  103. },
  104. )
  105. self.fail("No error when replacing dict key")
  106. except errors.SynapseError:
  107. pass
  108. @defer.inlineCallbacks
  109. def test_claim_one_time_key(self):
  110. local_user = "@boris:" + self.hs.hostname
  111. device_id = "xyz"
  112. keys = {"alg1:k1": "key1"}
  113. res = yield self.handler.upload_keys_for_user(
  114. local_user, device_id, {"one_time_keys": keys}
  115. )
  116. self.assertDictEqual(res, {"one_time_key_counts": {"alg1": 1}})
  117. res2 = yield self.handler.claim_one_time_keys(
  118. {"one_time_keys": {local_user: {device_id: "alg1"}}}, timeout=None
  119. )
  120. self.assertEqual(
  121. res2,
  122. {
  123. "failures": {},
  124. "one_time_keys": {local_user: {device_id: {"alg1:k1": "key1"}}},
  125. },
  126. )