123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615 |
# Copyright 2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
- from unittest import mock
- from signedjson import key as key, sign as sign
- from synapse.api.constants import RoomEncryptionAlgorithms
- from synapse.api.errors import Codes, SynapseError
- from tests import unittest
class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
    """Tests for the handler returned by ``hs.get_e2e_keys_handler()``.

    The cases cover one-time keys, fallback keys, cross-signing keys and
    uploaded signatures.
    """

    def make_homeserver(self, reactor, clock):
        """Create the test homeserver, passing a mock as its federation client."""
        federation_client = mock.Mock()
        return self.setup_test_homeserver(federation_client=federation_client)

    def prepare(self, reactor, clock, hs):
        """Keep references to the handler under test and to the datastore."""
        e2e_keys_handler = hs.get_e2e_keys_handler()
        self.handler = e2e_keys_handler
        self.store = self.hs.get_datastore()

    def test_query_local_devices_no_devices(self):
        """A user without devices gets an empty device mapping back."""
        user_id = "@boris:" + self.hs.hostname

        query = self.handler.query_local_devices({user_id: None})
        devices = self.get_success(query)

        self.assertDictEqual(devices, {user_id: {}})

    def test_reupload_one_time_keys(self):
        """Uploading the same one-time keys a second time is accepted."""
        user_id = "@boris:" + self.hs.hostname
        device_id = "xyz"
        expected_counts = {"one_time_key_counts": {"alg1": 1, "alg2": 2}}
        one_time_keys = {
            "alg1:k1": "key1",
            "alg2:k2": {"key": "key2", "signatures": {"k1": "sig1"}},
            "alg2:k3": {"key": "key3"},
        }

        def upload():
            # Always sends the current contents of ``one_time_keys``.
            request = self.handler.upload_keys_for_user(
                user_id, device_id, {"one_time_keys": one_time_keys}
            )
            return self.get_success(request)

        self.assertDictEqual(upload(), expected_counts)

        # Altering only the signature of an existing key must not be a problem.
        one_time_keys["alg2:k2"]["signatures"]["k1"] = "sig2"
        self.assertDictEqual(upload(), expected_counts)

    def test_change_one_time_keys(self):
        """Re-uploading a one-time key with different content is rejected."""
        user_id = "@boris:" + self.hs.hostname
        device_id = "xyz"
        one_time_keys = {
            "alg1:k1": "key1",
            "alg2:k2": {"key": "key2", "signatures": {"k1": "sig1"}},
            "alg2:k3": {"key": "key3"},
        }

        counts = self.get_success(
            self.handler.upload_keys_for_user(
                user_id, device_id, {"one_time_keys": one_time_keys}
            )
        )
        self.assertDictEqual(
            counts, {"one_time_key_counts": {"alg1": 1, "alg2": 2}}
        )

        conflicting_uploads = [
            # a string key replaced by a different string
            {"alg1:k1": "key2"},
            # a dict key replaced by a string
            {"alg2:k3": "key2"},
            # a string key replaced by a dict
            {"alg1:k1": {"key": "key"}},
            # a dict key replaced by a dict holding a different key
            {"alg2:k2": {"key": "key3", "signatures": {"k1": "sig1"}}},
        ]
        for replacement in conflicting_uploads:
            attempt = self.handler.upload_keys_for_user(
                user_id, device_id, {"one_time_keys": replacement}
            )
            self.get_failure(attempt, SynapseError)

    def test_claim_one_time_key(self):
        """An uploaded one-time key is handed back when it is claimed."""
        user_id = "@boris:" + self.hs.hostname
        device_id = "xyz"
        one_time_keys = {"alg1:k1": "key1"}

        upload = self.handler.upload_keys_for_user(
            user_id, device_id, {"one_time_keys": one_time_keys}
        )
        counts = self.get_success(upload)
        self.assertDictEqual(counts, {"one_time_key_counts": {"alg1": 1}})

        claim = self.handler.claim_one_time_keys(
            {"one_time_keys": {user_id: {device_id: "alg1"}}}, timeout=None
        )
        claimed = self.get_success(claim)
        expected = {
            "failures": {},
            "one_time_keys": {user_id: {device_id: {"alg1:k1": "key1"}}},
        }
        self.assertEqual(claimed, expected)

    def test_fallback_key(self):
        """The fallback key is served whenever no one-time key can be claimed."""
        user_id = "@boris:" + self.hs.hostname
        device_id = "xyz"
        fallback_key = {"alg1:k1": "key1"}
        one_time_key = {"alg1:k2": "key2"}

        def unused_fallback_types():
            return self.get_success(
                self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
            )

        def claim_alg1():
            claim = {"one_time_keys": {user_id: {device_id: "alg1"}}}
            return self.get_success(
                self.handler.claim_one_time_keys(claim, timeout=None)
            )

        def claim_response(claimed_key):
            return {
                "failures": {},
                "one_time_keys": {user_id: {device_id: claimed_key}},
            }

        # Nothing has been uploaded, so no unused fallback key exists yet.
        self.assertEqual(unused_fallback_types(), [])

        self.get_success(
            self.handler.upload_keys_for_user(
                user_id,
                device_id,
                {"org.matrix.msc2732.fallback_keys": fallback_key},
            )
        )

        # The freshly uploaded alg1 fallback key is reported as unused.
        self.assertEqual(unused_fallback_types(), ["alg1"])

        # With no one-time keys available, a claim returns the fallback key.
        self.assertEqual(claim_alg1(), claim_response(fallback_key))

        # Having been handed out once, the fallback key is no longer unused.
        self.assertEqual(unused_fallback_types(), [])

        # A further claim still yields the very same fallback key.
        self.assertEqual(claim_alg1(), claim_response(fallback_key))

        # Once a one-time key is uploaded, the next claim consumes it and the
        # claim after that falls back to the fallback key again.
        self.get_success(
            self.handler.upload_keys_for_user(
                user_id, device_id, {"one_time_keys": one_time_key}
            )
        )
        self.assertEqual(claim_alg1(), claim_response(one_time_key))
        self.assertEqual(claim_alg1(), claim_response(fallback_key))

    def test_replace_master_key(self):
        """After a second master key upload, queries return only the new key."""
        user_id = "@boris:" + self.hs.hostname

        def master_key_upload(pubkey):
            return {
                "master_key": {
                    "user_id": user_id,
                    "usage": ["master"],
                    "keys": {"ed25519:" + pubkey: pubkey},
                }
            }

        # private key: 2lonYOM6xYKdEsO+6KrC766xBcHnYnim1x/4LFGF8B0
        original_keys = master_key_upload(
            "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk"
        )
        self.get_success(
            self.handler.upload_signing_keys_for_user(user_id, original_keys)
        )

        # private key: 4TL4AjRYwDVwD3pqQzcor+ez/euOB1/q78aTJ+czDNs
        replacement_keys = master_key_upload(
            "Hq6gL+utB4ET+UvD5ci0kgAwsX6qP/zvf8v6OInU5iw"
        )
        self.get_success(
            self.handler.upload_signing_keys_for_user(user_id, replacement_keys)
        )

        query = {"device_keys": {user_id: []}}
        devices = self.get_success(self.handler.query_devices(query, 0, user_id))
        self.assertDictEqual(
            devices["master_keys"], {user_id: replacement_keys["master_key"]}
        )

    def test_reupload_signatures(self):
        """Uploading an already-stored signature again does not fail."""
        user_id = "@boris:" + self.hs.hostname

        # private key: HvQBbU+hc2Zr+JP1sE0XwBe1pfZZEYtJNPJLZJtS+F8
        master_pubkey = "EmkqvokUn8p+vQAGZitOk4PWjp7Ukp3txV2TbMPEiBQ"
        # private key: 2lonYOM6xYKdEsO+6KrC766xBcHnYnim1x/4LFGF8B0
        self_signing_pubkey = "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk"

        cross_signing_keys = {
            "master_key": {
                "user_id": user_id,
                "usage": ["master"],
                "keys": {"ed25519:" + master_pubkey: master_pubkey},
            },
            "self_signing_key": {
                "user_id": user_id,
                "usage": ["self_signing"],
                "keys": {"ed25519:" + self_signing_pubkey: self_signing_pubkey},
            },
        }

        # The self-signing key is signed with the master key before upload.
        master_signing_key = key.decode_signing_key_base64(
            "ed25519",
            master_pubkey,
            "HvQBbU+hc2Zr+JP1sE0XwBe1pfZZEYtJNPJLZJtS+F8",
        )
        sign.sign_json(
            cross_signing_keys["self_signing_key"], user_id, master_signing_key
        )

        self_signing_key = key.decode_signing_key_base64(
            "ed25519",
            self_signing_pubkey,
            "2lonYOM6xYKdEsO+6KrC766xBcHnYnim1x/4LFGF8B0",
        )

        self.get_success(
            self.handler.upload_signing_keys_for_user(user_id, cross_signing_keys)
        )

        def placeholder_device_key(device_id):
            # Device key carrying a dummy signature from the device itself.
            return {
                "user_id": user_id,
                "device_id": device_id,
                "algorithms": [
                    "m.olm.curve25519-aes-sha2",
                    RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2,
                ],
                "keys": {
                    "ed25519:" + device_id: "base64+ed25519+key",
                    "curve25519:" + device_id: "base64+curve25519+key",
                },
                "signatures": {
                    user_id: {"ed25519:" + device_id: "base64+signature"}
                },
            }

        # Two device keys are uploaded; the self-signing key signs them later.
        abc_key = placeholder_device_key("abc")
        def_key = placeholder_device_key("def")
        for device_id, device_key in (("abc", abc_key), ("def", def_key)):
            self.get_success(
                self.handler.upload_keys_for_user(
                    user_id, device_id, {"device_keys": device_key}
                )
            )

        # Sign the first device key and upload that signature on its own.
        abc_key.pop("signatures")
        sign.sign_json(abc_key, user_id, self_signing_key)
        self.get_success(
            self.handler.upload_signatures_for_device_keys(
                user_id, {user_id: {"abc": abc_key}}
            )
        )

        # Sign the second device key, then upload both. The first one is a
        # repeat, as the server already holds a valid signature for it.
        def_key.pop("signatures")
        sign.sign_json(def_key, user_id, self_signing_key)
        self.get_success(
            self.handler.upload_signatures_for_device_keys(
                user_id, {user_id: {"abc": abc_key, "def": def_key}}
            )
        )

        # Put the devices' own placeholder signatures back so the local copies
        # can be compared against what the server returns.
        abc_key["signatures"][user_id]["ed25519:abc"] = "base64+signature"
        def_key["signatures"][user_id]["ed25519:def"] = "base64+signature"

        devices = self.get_success(
            self.handler.query_devices({"device_keys": {user_id: []}}, 0, user_id)
        )
        stored = devices["device_keys"][user_id]
        for device_id in ("abc", "def"):
            del stored[device_id]["unsigned"]
        self.assertDictEqual(stored["abc"], abc_key)
        self.assertDictEqual(stored["def"], def_key)

    def test_self_signing_key_doesnt_show_up_as_device(self):
        """A signing key is not listed among the user's devices."""
        user_id = "@boris:" + self.hs.hostname

        # private key: 2lonYOM6xYKdEsO+6KrC766xBcHnYnim1x/4LFGF8B0
        master_pubkey = "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk"
        signing_keys = {
            "master_key": {
                "user_id": user_id,
                "usage": ["master"],
                "keys": {"ed25519:" + master_pubkey: master_pubkey},
            }
        }
        self.get_success(
            self.handler.upload_signing_keys_for_user(user_id, signing_keys)
        )

        # Registering a device whose ID is the master public key must fail.
        registration = self.hs.get_device_handler().check_device_registered(
            user_id=user_id,
            device_id=master_pubkey,
            initial_device_display_name="new display name",
        )
        failure = self.get_failure(registration, SynapseError)
        self.assertEqual(failure.value.code, 400)

        devices = self.get_success(
            self.handler.query_local_devices({user_id: None})
        )
        self.assertDictEqual(devices, {user_id: {}})

    def test_upload_signatures(self):
        """Uploaded signatures are checked: bad ones fail, good ones are stored."""
        user_id = "@boris:" + self.hs.hostname
        device_id = "xyz"

        def decode_key(version, private_key):
            return key.decode_signing_key_base64("ed25519", version, private_key)

        def cross_signing_key(owner, usage, pubkey, **extra):
            body = {
                "user_id": owner,
                "usage": [usage],
                "keys": {"ed25519:" + pubkey: pubkey},
            }
            body.update(extra)
            return body

        def device_key_body(signatures):
            return {
                "user_id": user_id,
                "device_id": device_id,
                "algorithms": [
                    "m.olm.curve25519-aes-sha2",
                    RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2,
                ],
                "keys": {
                    "curve25519:xyz": "curve25519+key",
                    # private key: OMkooTr76ega06xNvXIGPbgvvxAOzmQncN8VObS7aBA
                    "ed25519:xyz": device_pubkey,
                },
                "signatures": signatures,
            }

        def unknown_device(owner, signature_id):
            return {
                "user_id": owner,
                "device_id": "unknown",
                "signatures": {user_id: {signature_id: "something"}},
            }

        def signature(body, signature_id):
            return body["signatures"][user_id][signature_id]

        # First user: a device plus a full set of cross-signing keys. This is
        # the user who will be uploading signatures.

        # private key: OMkooTr76ega06xNvXIGPbgvvxAOzmQncN8VObS7aBA
        device_pubkey = "NnHhnqiMFQkq969szYkooLaBAXW244ZOxgukCvm2ZeY"
        device_key = device_key_body({user_id: {"ed25519:xyz": "something"}})
        device_signing_key = decode_key(
            "xyz", "OMkooTr76ega06xNvXIGPbgvvxAOzmQncN8VObS7aBA"
        )
        self.get_success(
            self.handler.upload_keys_for_user(
                user_id, device_id, {"device_keys": device_key}
            )
        )

        # private key: 2lonYOM6xYKdEsO+6KrC766xBcHnYnim1x/4LFGF8B0
        master_pubkey = "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk"
        master_key = cross_signing_key(user_id, "master", master_pubkey)
        master_signing_key = decode_key(
            master_pubkey, "2lonYOM6xYKdEsO+6KrC766xBcHnYnim1x/4LFGF8B0"
        )

        # private key: 4TL4AjRYwDVwD3pqQzcor+ez/euOB1/q78aTJ+czDNs
        user_signing_pubkey = "Hq6gL+utB4ET+UvD5ci0kgAwsX6qP/zvf8v6OInU5iw"
        user_signing_id = "ed25519:" + user_signing_pubkey
        user_signing_key = cross_signing_key(
            user_id, "user_signing", user_signing_pubkey
        )
        user_signing_signing_key = decode_key(
            user_signing_pubkey, "4TL4AjRYwDVwD3pqQzcor+ez/euOB1/q78aTJ+czDNs"
        )
        sign.sign_json(user_signing_key, user_id, master_signing_key)

        # private key: HvQBbU+hc2Zr+JP1sE0XwBe1pfZZEYtJNPJLZJtS+F8
        self_signing_pubkey = "EmkqvokUn8p+vQAGZitOk4PWjp7Ukp3txV2TbMPEiBQ"
        self_signing_id = "ed25519:" + self_signing_pubkey
        self_signing_key = cross_signing_key(
            user_id, "self_signing", self_signing_pubkey
        )
        self_signing_signing_key = decode_key(
            self_signing_pubkey, "HvQBbU+hc2Zr+JP1sE0XwBe1pfZZEYtJNPJLZJtS+F8"
        )
        sign.sign_json(self_signing_key, user_id, master_signing_key)

        self.get_success(
            self.handler.upload_signing_keys_for_user(
                user_id,
                {
                    "master_key": master_key,
                    "user_signing_key": user_signing_key,
                    "self_signing_key": self_signing_key,
                },
            )
        )

        # Second user: only a master key, which the first user will sign.
        other_user = "@otherboris:" + self.hs.hostname
        # private key: oyw2ZUx0O4GifbfFYM0nQvj9CL0b8B7cyN4FprtK8OI
        other_master_pubkey = "fHZ3NPiKxoLQm5OoZbKa99SYxprOjNs4TwJUKP+twCM"
        other_master_key = cross_signing_key(
            other_user, "master", other_master_pubkey
        )
        self.get_success(
            self.handler.upload_signing_keys_for_user(
                other_user, {"master_key": other_master_key}
            )
        )

        # Every entry below is expected to be rejected, each for its own reason.
        rejected_uploads = {
            user_id: {
                # the signature is invalid -> INVALID_SIGNATURE
                device_id: device_key_body(
                    {user_id: {self_signing_id: "something"}}
                ),
                # the device is unknown -> NOT_FOUND
                "unknown": unknown_device(user_id, self_signing_id),
                # the signature is invalid -> INVALID_SIGNATURE
                master_pubkey: cross_signing_key(
                    user_id,
                    "master",
                    master_pubkey,
                    signatures={
                        user_id: {"ed25519:" + device_pubkey: "something"}
                    },
                ),
            },
            other_user: {
                # the device is not the user's master-signing key -> NOT_FOUND
                "unknown": unknown_device(other_user, user_signing_id),
                # the key differs from what the server has -> UNKNOWN
                other_master_pubkey: cross_signing_key(
                    other_user,
                    "master",
                    other_master_pubkey,
                    something="random",
                    signatures={user_id: {user_signing_id: "something"}},
                ),
            },
        }
        ret = self.get_success(
            self.handler.upload_signatures_for_device_keys(user_id, rejected_uploads)
        )

        failures = ret["failures"]
        expected_errcodes = [
            (user_id, device_id, Codes.INVALID_SIGNATURE),
            (user_id, master_pubkey, Codes.INVALID_SIGNATURE),
            (user_id, "unknown", Codes.NOT_FOUND),
            (other_user, "unknown", Codes.NOT_FOUND),
            (other_user, other_master_pubkey, Codes.UNKNOWN),
        ]
        for owner, key_id, errcode in expected_errcodes:
            self.assertEqual(failures[owner][key_id]["errcode"], errcode)

        # Now produce genuine signatures and upload those instead.
        del device_key["signatures"]
        sign.sign_json(device_key, user_id, self_signing_signing_key)
        sign.sign_json(master_key, user_id, device_signing_key)
        sign.sign_json(other_master_key, user_id, user_signing_signing_key)
        ret = self.get_success(
            self.handler.upload_signatures_for_device_keys(
                user_id,
                {
                    user_id: {device_id: device_key, master_pubkey: master_key},
                    other_user: {other_master_pubkey: other_master_key},
                },
            )
        )
        self.assertEqual(ret["failures"], {})

        # Query the keys/devices back and check the signatures are attached.
        queried = self.get_success(
            self.handler.query_devices(
                {"device_keys": {user_id: [], other_user: []}}, 0, user_id
            )
        )
        self.assertEqual(
            signature(queried["device_keys"][user_id]["xyz"], self_signing_id),
            signature(device_key, self_signing_id),
        )
        self.assertEqual(
            signature(queried["master_keys"][user_id], "ed25519:" + device_id),
            signature(master_key, "ed25519:" + device_id),
        )
        self.assertEqual(
            signature(queried["master_keys"][other_user], user_signing_id),
            signature(other_master_key, user_signing_id),
        )
|