# test_e2e_keys.py (5.6 KB)
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import mock
  16. from synapse.api import errors
  17. from twisted.internet import defer
  18. import synapse.api.errors
  19. import synapse.handlers.e2e_keys
  20. import synapse.storage
  21. from tests import unittest, utils
  class E2eKeysHandlerTestCase(unittest.TestCase):
      """Tests for E2eKeysHandler, run against a homeserver from the test utils.

      The cases cover: querying the devices of a local user, uploading and
      re-uploading one-time keys, rejecting uploads that change an existing
      one-time key, and claiming an uploaded one-time key.
      """

      def __init__(self, *args, **kwargs):
          super(E2eKeysHandlerTestCase, self).__init__(*args, **kwargs)
          # Placeholders only; both attributes are assigned in setUp().
          self.hs = None  # type: synapse.server.HomeServer
          self.handler = None  # type: synapse.handlers.e2e_keys.E2eKeysHandler

      @defer.inlineCallbacks
      def setUp(self):
          """Build the test homeserver, then the handler under test on top of it."""
          homeserver = yield utils.setup_test_homeserver(
              handlers=None,
              replication_layer=mock.Mock(),
          )
          self.hs = homeserver
          self.handler = synapse.handlers.e2e_keys.E2eKeysHandler(homeserver)

      def _upload_one_time_keys(self, user_id, device_id, one_time_keys):
          """Upload ``one_time_keys`` for the given user/device.

          Returns whatever ``upload_keys_for_user`` returns, so callers can
          ``yield`` on it from inside an ``inlineCallbacks`` test.
          """
          return self.handler.upload_keys_for_user(
              user_id, device_id, {"one_time_keys": one_time_keys},
          )

      @defer.inlineCallbacks
      def test_query_local_devices_no_devices(self):
          """Querying a local user without devices yields an empty entry for them."""
          user_id = "@boris:" + self.hs.hostname

          devices = yield self.handler.query_local_devices({user_id: None})

          self.assertDictEqual(devices, {user_id: {}})

      @defer.inlineCallbacks
      def test_reupload_one_time_keys(self):
          """Uploading the same one-time keys a second time must succeed."""
          user_id = "@boris:" + self.hs.hostname
          device_id = "xyz"
          one_time_keys = {
              "alg1:k1": "key1",
              "alg2:k2": {
                  "key": "key2",
                  "signatures": {"k1": "sig1"}
              },
              "alg2:k3": {
                  "key": "key3",
              },
          }

          first_response = yield self._upload_one_time_keys(
              user_id, device_id, one_time_keys,
          )
          self.assertDictEqual(first_response, {
              "one_time_key_counts": {"alg1": 1, "alg2": 2}
          })

          # Only the signature differs on the second upload; the key material
          # is unchanged, so this is expected to be accepted as well.
          one_time_keys["alg2:k2"]["signatures"]["k1"] = "sig2"
          second_response = yield self._upload_one_time_keys(
              user_id, device_id, one_time_keys,
          )
          self.assertDictEqual(second_response, {
              "one_time_key_counts": {"alg1": 1, "alg2": 2}
          })

      @defer.inlineCallbacks
      def test_change_one_time_keys(self):
          """Uploads that alter an already-stored one-time key must be refused."""
          user_id = "@boris:" + self.hs.hostname
          device_id = "xyz"
          original_keys = {
              "alg1:k1": "key1",
              "alg2:k2": {
                  "key": "key2",
                  "signatures": {"k1": "sig1"}
              },
              "alg2:k3": {
                  "key": "key3",
              },
          }

          response = yield self._upload_one_time_keys(
              user_id, device_id, original_keys,
          )
          self.assertDictEqual(response, {
              "one_time_key_counts": {"alg1": 1, "alg2": 2}
          })

          # Each entry: (conflicting upload, failure message if it is accepted).
          # They are attempted in this order, one after another.
          conflicting_uploads = (
              (
                  {"alg1:k1": "key2"},
                  "No error when changing string key",
              ),
              (
                  {"alg2:k3": "key2"},
                  "No error when replacing dict key with string",
              ),
              (
                  {"alg1:k1": {"key": "key"}},
                  "No error when replacing string key with dict",
              ),
              (
                  {
                      "alg2:k2": {
                          "key": "key3",
                          "signatures": {"k1": "sig1"},
                      }
                  },
                  "No error when replacing dict key",
              ),
          )

          for changed_keys, failure_message in conflicting_uploads:
              try:
                  yield self._upload_one_time_keys(
                      user_id, device_id, changed_keys,
                  )
              except errors.SynapseError:
                  # Rejection is the expected outcome.
                  pass
              else:
                  self.fail(failure_message)

      # NOTE(review): unittest.DEBUG comes from tests.unittest, which is not
      # visible here; it looks like a debugging aid left enabled on this one
      # test -- confirm it is still wanted.
      @unittest.DEBUG
      @defer.inlineCallbacks
      def test_claim_one_time_key(self):
          """An uploaded one-time key can be claimed by algorithm name."""
          user_id = "@boris:" + self.hs.hostname
          device_id = "xyz"
          one_time_keys = {
              "alg1:k1": "key1",
          }

          upload_response = yield self._upload_one_time_keys(
              user_id, device_id, one_time_keys,
          )
          self.assertDictEqual(upload_response, {
              "one_time_key_counts": {"alg1": 1}
          })

          claim_query = {
              "one_time_keys": {
                  user_id: {
                      device_id: "alg1"
                  }
              }
          }
          claimed = yield self.handler.claim_one_time_keys(
              claim_query, timeout=None
          )

          self.assertEqual(claimed, {
              "failures": {},
              "one_time_keys": {
                  user_id: {
                      device_id: {
                          "alg1:k1": "key1"
                      }
                  }
              }
          })