12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914 |
- # -*- coding: utf-8 -*-
- # Copyright 2014-2016 OpenMarket Ltd
- # Copyright 2017, 2018 New Vector Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import abc
- import logging
- import urllib
- from collections import defaultdict
- from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
- import attr
- from signedjson.key import (
- decode_verify_key_bytes,
- encode_verify_key_base64,
- is_signing_algorithm_supported,
- )
- from signedjson.sign import (
- SignatureVerifyException,
- encode_canonical_json,
- signature_ids,
- verify_signed_json,
- )
- from unpaddedbase64 import decode_base64
- from twisted.internet import defer
- from synapse.api.errors import (
- Codes,
- HttpResponseException,
- RequestSendFailed,
- SynapseError,
- )
- from synapse.config.key import TrustedKeyServer
- from synapse.logging.context import (
- PreserveLoggingContext,
- make_deferred_yieldable,
- preserve_fn,
- run_in_background,
- )
- from synapse.storage.keys import FetchKeyResult
- from synapse.types import JsonDict
- from synapse.util import unwrapFirstError
- from synapse.util.async_helpers import yieldable_gather_results
- from synapse.util.metrics import Measure
- from synapse.util.retryutils import NotRetryingDestination
- if TYPE_CHECKING:
- from synapse.app.homeserver import HomeServer
- logger = logging.getLogger(__name__)
@attr.s(slots=True, cmp=False)
class VerifyJsonRequest:
    """A single pending signature check on a JSON object.

    Attributes:
        server_name: name of the server whose signature is to be checked.
        json_object: the signed JSON object.
        minimum_valid_until_ts: timestamp at which the signing key is required
            to be valid. (0 implies we don't care)
        request_name: identifier for this request; it is included in log lines.
        key_ids: the key ids which could be used to verify the JSON object.
            This is not a constructor argument: it is filled in from
            signature_ids(json_object, server_name) after construction.
        key_ready (Deferred[str, str, nacl.signing.VerifyKey]):
            Deferred which resolves to a (server_name, key_id, verify_key)
            tuple once a suitable verify key has been fetched. Its callbacks
            are run with no logcontext.

            If no key satisfying the request can be found, the deferred
            errbacks with an M_UNAUTHORIZED SynapseError.
    """

    # NB: the declaration order below is also the positional order of the
    # generated __init__, so it must not be shuffled.
    server_name = attr.ib(type=str)
    json_object = attr.ib(type=JsonDict)
    minimum_valid_until_ts = attr.ib(type=int)
    request_name = attr.ib(type=str)
    key_ids = attr.ib(init=False, type=List[str])
    key_ready = attr.ib(default=attr.Factory(defer.Deferred), type=defer.Deferred)

    def __attrs_post_init__(self) -> None:
        # work out which keys the object claims to have been signed with
        self.key_ids = signature_ids(self.json_object, self.server_name)
class KeyLookupError(ValueError):
    """Raised when a signing key could not be obtained from a remote source."""
class Keyring:
    """Checks signatures on JSON objects, looking up signing keys as needed.

    Keys are obtained by asking each of the configured KeyFetchers in turn
    until every outstanding request has a usable key, or the fetchers have
    all been tried.
    """

    def __init__(
        self, hs: "HomeServer", key_fetchers: "Optional[Iterable[KeyFetcher]]" = None
    ):
        """
        Args:
            hs: the homeserver; supplies the clock and is passed to the
                default fetchers.
            key_fetchers: the fetchers to consult, in order. If omitted, a
                StoreKeyFetcher, a PerspectivesKeyFetcher and a
                ServerKeyFetcher are created (and tried in that order).
        """
        self.clock = hs.get_clock()

        self._key_fetchers = (
            key_fetchers
            if key_fetchers is not None
            else (
                StoreKeyFetcher(hs),
                PerspectivesKeyFetcher(hs),
                ServerKeyFetcher(hs),
            )
        )

        # server name -> Deferred. There is an entry for every server which
        # has a key download in progress; the Deferred fires when that
        # download has finished.
        #
        # These are regular, logcontext-agnostic Deferreds.
        self.key_downloads = {}  # type: Dict[str, defer.Deferred]

    def verify_json_for_server(
        self,
        server_name: str,
        json_object: JsonDict,
        validity_time: int,
        request_name: str,
    ) -> defer.Deferred:
        """Check that a JSON object carries a valid signature from a server

        Args:
            server_name: the server which must have signed the object
            json_object: the object to check
            validity_time: timestamp at which the signing key is required to
                be valid. (0 implies we don't care)
            request_name: an identifier for this json object (eg, an event id),
                used for logging.

        Returns:
            Deferred[None]: completes if the object was correctly signed,
                otherwise errbacks with an error
        """
        single_request = VerifyJsonRequest(
            server_name, json_object, validity_time, request_name
        )
        deferreds = self._verify_objects((single_request,))
        return make_deferred_yieldable(deferreds[0])

    def verify_json_objects_for_server(
        self, server_and_json: Iterable[Tuple[str, dict, int, str]]
    ) -> List[defer.Deferred]:
        """Check the signatures on a batch of json objects, fetching keys in
        bulk where necessary.

        Args:
            server_and_json:
                Iterable of (server_name, json_object, validity_time, request_name)
                tuples.

                validity_time is a timestamp at which the signing key must be
                valid.

                request_name is an identifier for this json object (eg, an
                event id), used for logging.

        Returns:
            List<Deferred[None]>: one deferred per input tuple, indicating
                success or failure of the signature check for that object. The
                deferreds run their callbacks in the sentinel logcontext.
        """
        # deliberately lazy: each request is built just before it is processed
        requests = (
            VerifyJsonRequest(origin, signed_json, valid_at, name)
            for origin, signed_json, valid_at, name in server_and_json
        )
        return self._verify_objects(requests)

    def _verify_objects(
        self, verify_requests: Iterable[VerifyJsonRequest]
    ) -> List[defer.Deferred]:
        """Shared implementation of verify_json_[objects_]for_server

        Args:
            verify_requests: Iterable of verification requests.

        Returns:
            List<Deferred[None]>: one deferred per input item, indicating
                success or failure of the signature check for that object. The
                deferreds run their callbacks in the sentinel logcontext.
        """
        # the VerifyJsonRequests which need a key to be looked up
        awaiting_keys = []  # type: List[VerifyJsonRequest]

        # We want _handle_key_deferred to log to the right context, so we
        # wrap it with preserve_fn (aka run_in_background)
        verify_when_ready = preserve_fn(_handle_key_deferred)

        def begin(verify_request: VerifyJsonRequest) -> defer.Deferred:
            """Queue one request for key lookup and start its verification

            Returns a deferred which completes or fails (in the sentinel
            context) once verification has finished.
            """
            if not verify_request.key_ids:
                # there is no signature from this server at all
                unsigned = SynapseError(
                    400,
                    "Not signed by %s" % (verify_request.server_name,),
                    Codes.UNAUTHORIZED,
                )
                return defer.fail(unsigned)

            logger.debug(
                "Verifying %s for %s with key_ids %s, min_validity %i",
                verify_request.request_name,
                verify_request.server_name,
                verify_request.key_ids,
                verify_request.minimum_valid_until_ts,
            )

            # queue the key request; the lookups are only started once every
            # request has been queued.
            awaiting_keys.append(verify_request)

            # this waits for the key request to complete and then does the
            # verification.
            return verify_when_ready(verify_request)

        results = []
        for verify_request in verify_requests:
            results.append(begin(verify_request))

        if awaiting_keys:
            run_in_background(self._start_key_lookups, awaiting_keys)

        return results

    async def _start_key_lookups(
        self, verify_requests: List[VerifyJsonRequest]
    ) -> None:
        """Kicks off the key fetches for each verify request

        As each fetch completes, the corresponding verify_request.key_ready is
        resolved.

        Args:
            verify_requests: the requests which need keys
        """
        try:
            # server name -> ids of the requests which are still outstanding
            outstanding = {}  # type: Dict[str, Set[int]]
            for verify_request in verify_requests:
                ids_for_server = outstanding.setdefault(
                    verify_request.server_name, set()
                )
                ids_for_server.add(id(verify_request))

            # Let any earlier lookups for these servers finish first.
            await self.wait_for_previous_lookups(outstanding.keys())

            # a Deferred in key_downloads acts as a lock on that server
            for server_name in outstanding:
                self.key_downloads[server_name] = defer.Deferred()
                logger.debug("Got key lookup lock on %s", server_name)

            def lookup_done(res, verify_request):
                name = verify_request.server_name
                ids_left = outstanding[name]
                ids_left.remove(id(verify_request))

                # once the last request for a server has finished, release the
                # lock by resolving (and forgetting) its Deferred.
                if not ids_left:
                    logger.debug("Releasing key lookup lock on %s", name)
                    self.key_downloads.pop(name).callback(None)
                return res

            for verify_request in verify_requests:
                verify_request.key_ready.addBoth(lookup_done, verify_request)

            # Now actually go and fetch the keys.
            self._get_server_verify_keys(verify_requests)
        except Exception:
            logger.exception("Error starting key lookups")

    async def wait_for_previous_lookups(self, server_names: Iterable[str]) -> None:
        """Blocks until no key lookup is in progress for any of the given servers.

        Args:
            server_names: the servers which we want to look up. This is
                iterated once per pass, so it must be re-iterable.

        Returns:
            Resolves once all key lookups for the given servers have
            completed. Follows the synapse rules of logcontext preservation.
        """
        loop_count = 1
        while True:
            busy_servers = [
                name for name in server_names if name in self.key_downloads
            ]
            if not busy_servers:
                return

            in_flight = [self.key_downloads[name] for name in busy_servers]

            logger.info(
                "Waiting for existing lookups for %s to complete [loop %i]",
                busy_servers,
                loop_count,
            )
            with PreserveLoggingContext():
                await defer.DeferredList(in_flight)

            # somebody else may have taken the lock in the meantime, so go
            # round and check again.
            loop_count += 1

    def _get_server_verify_keys(self, verify_requests: List[VerifyJsonRequest]) -> None:
        """Tries to find at least one key for each verify request

        For each verify_request, verify_request.key_ready is called back with
        params (server_name, key_id, VerifyKey) if a key is found, or errbacked
        with a SynapseError if none of the keys are found.

        The work is started in the background; this method does not wait for it.

        Args:
            verify_requests: list of verify requests
        """
        remaining_requests = {rq for rq in verify_requests if not rq.key_ready.called}

        async def do_iterations():
            try:
                with Measure(self.clock, "get_server_verify_keys"):
                    for fetcher in self._key_fetchers:
                        if not remaining_requests:
                            return
                        await self._attempt_key_fetches_with_fetcher(
                            fetcher, remaining_requests
                        )

                    # anything still here was not satisfied by any fetcher
                    while remaining_requests:
                        unsatisfied = remaining_requests.pop()
                        rq_str = (
                            "VerifyJsonRequest(server=%s, key_ids=%s, min_valid=%i)"
                            % (
                                unsatisfied.server_name,
                                unsatisfied.key_ids,
                                unsatisfied.minimum_valid_until_ts,
                            )
                        )
                        failure = SynapseError(
                            401,
                            "Failed to find any key to satisfy %s" % (rq_str,),
                            Codes.UNAUTHORIZED,
                        )

                        # If we run the errback immediately, it may cancel our
                        # loggingcontext while we are still in it, so instead we
                        # schedule it for the next time round the reactor.
                        #
                        # (this also ensures that we don't get a stack overflow
                        # if we have a massive queue of lookups waiting for this
                        # server).
                        self.clock.call_later(
                            0, unsatisfied.key_ready.errback, failure
                        )
            except Exception as err:
                # we don't really expect to get here, because any errors should
                # already have been caught and logged. But if we do, let's log
                # the error and make sure that all of the deferreds are
                # resolved.
                logger.error("Unexpected error in _get_server_verify_keys: %s", err)
                with PreserveLoggingContext():
                    for leftover in remaining_requests:
                        if not leftover.key_ready.called:
                            leftover.key_ready.errback(err)

        run_in_background(do_iterations)

    async def _attempt_key_fetches_with_fetcher(
        self, fetcher: "KeyFetcher", remaining_requests: Set[VerifyJsonRequest]
    ):
        """Use a key fetcher to attempt to satisfy some key requests

        Args:
            fetcher: fetcher to use to fetch the keys
            remaining_requests: outstanding key requests.
                Any successfully-completed requests will be removed from the
                set.
        """
        # What to ask the fetcher for.
        # server_name -> key_id -> min_valid_ts
        wanted = defaultdict(dict)  # type: Dict[str, Dict[str, int]]

        for verify_request in remaining_requests:
            # any completed requests should already have been removed
            assert not verify_request.key_ready.called
            wanted_for_server = wanted[verify_request.server_name]
            needed_until = verify_request.minimum_valid_until_ts
            for key_id in verify_request.key_ids:
                # A key wanted by several requests only needs fetching once,
                # but it must be requested with the greatest
                # min_valid_until_ts of those requests, so that all of them
                # can be satisfied.
                wanted_for_server[key_id] = max(
                    wanted_for_server.get(key_id, -1), needed_until
                )

        results = await fetcher.get_keys(wanted)

        satisfied = []
        for verify_request in remaining_requests:
            server_name = verify_request.server_name

            # see if any of the keys we got this time are sufficient to
            # complete this VerifyJsonRequest.
            fetched_for_server = results.get(server_name, {})
            for key_id in verify_request.key_ids:
                candidate = fetched_for_server.get(key_id)

                # skip keys we got no result for, and keys which were not
                # valid at the required time
                if (
                    not candidate
                    or candidate.valid_until_ts
                    < verify_request.minimum_valid_until_ts
                ):
                    continue

                # we have a valid key for this request. If we run the callback
                # immediately, it may cancel our loggingcontext while we are
                # still in it, so instead we schedule it for the next time
                # round the reactor.
                #
                # (this also ensures that we don't get a stack overflow if we
                # had a massive queue of lookups waiting for this server).
                logger.debug(
                    "Found key %s:%s for %s",
                    server_name,
                    key_id,
                    verify_request.request_name,
                )
                self.clock.call_later(
                    0,
                    verify_request.key_ready.callback,
                    (server_name, key_id, candidate.verify_key),
                )
                satisfied.append(verify_request)
                break

        remaining_requests.difference_update(satisfied)
class KeyFetcher(metaclass=abc.ABCMeta):
    """Abstract interface for a source of server signing keys."""

    @abc.abstractmethod
    async def get_keys(
        self, keys_to_fetch: Dict[str, Dict[str, int]]
    ) -> Dict[str, Dict[str, FetchKeyResult]]:
        """Fetch the requested keys. Must be overridden by implementations.

        Args:
            keys_to_fetch:
                the keys to be fetched. server_name -> key_id -> min_valid_ts

        Returns:
            Map from server_name -> key_id -> FetchKeyResult
        """
        raise NotImplementedError
class StoreKeyFetcher(KeyFetcher):
    """KeyFetcher impl which fetches keys from our data store"""

    def __init__(self, hs: "HomeServer"):
        self.store = hs.get_datastore()

    async def get_keys(
        self, keys_to_fetch: Dict[str, Dict[str, int]]
    ) -> Dict[str, Dict[str, FetchKeyResult]]:
        """see KeyFetcher.get_keys"""
        # flatten the request into (server_name, key_id) pairs; the requested
        # minimum validity times are not passed to the store.
        wanted_pairs = (
            (server_name, key_id)
            for server_name, keys_for_server in keys_to_fetch.items()
            for key_id in keys_for_server
        )

        stored = await self.store.get_server_verify_keys(wanted_pairs)

        # regroup the flat {(server_name, key_id): key} result by server
        by_server = {}  # type: Dict[str, Dict[str, FetchKeyResult]]
        for (server_name, key_id), key in stored.items():
            keys_for_server = by_server.setdefault(server_name, {})
            keys_for_server[key_id] = key
        return by_server
class BaseV2KeyFetcher(KeyFetcher):
    """Common base for fetchers which parse v2 'Server Keys' responses.

    get_keys is not implemented here; subclasses provide it.
    """

    def __init__(self, hs: "HomeServer"):
        self.store = hs.get_datastore()
        self.config = hs.get_config()

    async def process_v2_response(
        self, from_server: str, response_json: JsonDict, time_added_ms: int
    ) -> Dict[str, FetchKeyResult]:
        """Parse a 'Server Keys' structure from the result of a /key request

        This is used to parse either the entirety of the response from
        GET /_matrix/key/v2/server, or a single entry from the list returned by
        POST /_matrix/key/v2/query.

        Checks that the response carries a valid signature from the origin
        server: the first signature made with one of the keys listed in the
        response's own "verify_keys" is checked, and must be valid.

        Stores the json in server_keys_json so that it can be used for future
        responses to /_matrix/key/v2/query.

        Args:
            from_server: the name of the server producing this result: either
                the origin server for a /_matrix/key/v2/server request, or the
                notary for a /_matrix/key/v2/query.
            response_json: the json-decoded Server Keys response object
            time_added_ms: the timestamp to record in server_keys_json

        Returns:
            Map from key_id to result object. Contains the current keys and
            any old keys, restricted to supported signing algorithms.

        Raises:
            KeyLookupError: if the response is not signed by the origin server
                with one of the keys listed in "verify_keys".
            SignatureVerifyException: if the checked signature is invalid
                (raised by verify_signed_json and not caught here).
            KeyError: if a mandatory field ("valid_until_ts", "verify_keys",
                "server_name" or "signatures") is missing.
        """
        ts_valid_until_ms = response_json["valid_until_ts"]

        # start by extracting the keys from the response, since they may be
        # required to validate the signature on the response.
        verify_keys = {}
        for key_id, key_data in response_json["verify_keys"].items():
            if is_signing_algorithm_supported(key_id):
                key_base64 = key_data["key"]
                key_bytes = decode_base64(key_base64)
                verify_key = decode_verify_key_bytes(key_id, key_bytes)
                verify_keys[key_id] = FetchKeyResult(
                    verify_key=verify_key, valid_until_ts=ts_valid_until_ms
                )

        server_name = response_json["server_name"]
        verified = False
        for key_id in response_json["signatures"].get(server_name, {}):
            key = verify_keys.get(key_id)
            if not key:
                # the key may not be present in verify_keys if:
                #  * we got the key from the notary server, and:
                #  * the key belongs to the notary server, and:
                #  * the notary server is using a different key to sign notary
                #    responses.
                continue

            verify_signed_json(response_json, server_name, key.verify_key)
            verified = True
            break

        if not verified:
            raise KeyLookupError(
                "Key response for %s is not signed by the origin server"
                % (server_name,)
            )

        # "old_verify_keys" is not a required field of the Server Keys
        # response, so a server with no retired keys may legitimately leave it
        # out; treat a missing entry as "no old keys" rather than failing the
        # whole response with a KeyError.
        for key_id, key_data in response_json.get("old_verify_keys", {}).items():
            if is_signing_algorithm_supported(key_id):
                key_base64 = key_data["key"]
                key_bytes = decode_base64(key_base64)
                verify_key = decode_verify_key_bytes(key_id, key_bytes)
                # old keys are only valid up to the time at which they expired
                verify_keys[key_id] = FetchKeyResult(
                    verify_key=verify_key, valid_until_ts=key_data["expired_ts"]
                )

        key_json_bytes = encode_canonical_json(response_json)

        # record the raw (canonicalised) response once per key id, running the
        # store writes concurrently and surfacing the first failure.
        await make_deferred_yieldable(
            defer.gatherResults(
                [
                    run_in_background(
                        self.store.store_server_keys_json,
                        server_name=server_name,
                        key_id=key_id,
                        from_server=from_server,
                        ts_now_ms=time_added_ms,
                        ts_expires_ms=ts_valid_until_ms,
                        key_json_bytes=key_json_bytes,
                    )
                    for key_id in verify_keys
                ],
                consumeErrors=True,
            ).addErrback(unwrapFirstError)
        )

        return verify_keys
class PerspectivesKeyFetcher(BaseV2KeyFetcher):
    """KeyFetcher impl which fetches keys from the "perspectives" servers"""

    def __init__(self, hs: "HomeServer"):
        super().__init__(hs)
        self.clock = hs.get_clock()
        self.client = hs.get_federation_http_client()
        self.key_servers = self.config.key_servers

    async def get_keys(
        self, keys_to_fetch: Dict[str, Dict[str, int]]
    ) -> Dict[str, Dict[str, FetchKeyResult]]:
        """see KeyFetcher.get_keys"""

        async def get_key(key_server: TrustedKeyServer) -> Dict:
            # Query one notary. A failure is logged and yields an empty
            # result, so that one broken notary does not fail the others.
            try:
                return await self.get_server_verify_key_v2_indirect(
                    keys_to_fetch, key_server
                )
            except KeyLookupError as e:
                logger.warning(
                    "Key lookup failed from %r: %s", key_server.server_name, e
                )
                return {}
            except Exception as e:
                logger.exception(
                    "Unable to get key from %r: %s %s",
                    key_server.server_name,
                    type(e).__name__,
                    str(e),
                )
                return {}

        # ask all of the notaries at once
        per_notary_results = await make_deferred_yieldable(
            defer.gatherResults(
                [run_in_background(get_key, server) for server in self.key_servers],
                consumeErrors=True,
            ).addErrback(unwrapFirstError)
        )

        # merge the per-notary answers; for a key returned by more than one
        # notary, the entry from the later notary in the list wins.
        union_of_keys = {}  # type: Dict[str, Dict[str, FetchKeyResult]]
        for notary_result in per_notary_results:
            for server_name, keys in notary_result.items():
                merged_for_server = union_of_keys.setdefault(server_name, {})
                merged_for_server.update(keys)

        return union_of_keys

    async def get_server_verify_key_v2_indirect(
        self, keys_to_fetch: Dict[str, Dict[str, int]], key_server: TrustedKeyServer
    ) -> Dict[str, Dict[str, FetchKeyResult]]:
        """Fetch keys for one or more servers via a single notary server

        Args:
            keys_to_fetch:
                the keys to be fetched. server_name -> key_id -> min_valid_ts

            key_server: notary server to query for the keys

        Returns:
            Map from server_name -> key_id -> FetchKeyResult

        Raises:
            KeyLookupError if there was an error processing the entire response from
                the server
        """
        perspective_name = key_server.server_name
        logger.info(
            "Requesting keys %s from notary server %s",
            keys_to_fetch.items(),
            perspective_name,
        )

        request_body = {
            "server_keys": {
                server_name: {
                    key_id: {"minimum_valid_until_ts": min_valid_ts}
                    for key_id, min_valid_ts in server_keys.items()
                }
                for server_name, server_keys in keys_to_fetch.items()
            }
        }

        try:
            query_response = await self.client.post_json(
                destination=perspective_name,
                path="/_matrix/key/v2/query",
                data=request_body,
            )
        except (NotRetryingDestination, RequestSendFailed) as e:
            # these both have str() representations which we can't really improve upon
            raise KeyLookupError(str(e))
        except HttpResponseException as e:
            raise KeyLookupError("Remote server returned an error: %s" % (e,))

        keys = {}  # type: Dict[str, Dict[str, FetchKeyResult]]
        added_keys = []  # type: List[Tuple[str, str, FetchKeyResult]]

        time_now_ms = self.clock.time_msec()

        assert isinstance(query_response, dict)
        for response in query_response["server_keys"]:
            # do this first, so that we can give useful errors thereafter
            server_name = response.get("server_name")
            if not isinstance(server_name, str):
                raise KeyLookupError(
                    "Malformed response from key notary server %s: invalid server_name"
                    % (perspective_name,)
                )

            try:
                self._validate_perspectives_response(key_server, response)

                processed_response = await self.process_v2_response(
                    perspective_name, response, time_added_ms=time_now_ms
                )
            except KeyLookupError as e:
                # a bad entry only invalidates itself: log it and carry on
                # with the rest of the response
                logger.warning(
                    "Error processing response from key notary server %s for origin "
                    "server %s: %s",
                    perspective_name,
                    server_name,
                    e,
                )
            else:
                for key_id, key in processed_response.items():
                    added_keys.append((server_name, key_id, key))
                keys_for_server = keys.setdefault(server_name, {})
                keys_for_server.update(processed_response)

        await self.store.store_server_verify_keys(
            perspective_name, time_now_ms, added_keys
        )

        return keys

    def _validate_perspectives_response(
        self, key_server: TrustedKeyServer, response: JsonDict
    ) -> None:
        """Optionally check the signature on the result of a /key/query request

        Nothing is checked if the notary has no verify keys configured.
        Otherwise every signature the notary made with a key we know about is
        checked, and there must be at least one such signature.

        Args:
            key_server: the notary server that produced this result

            response: the json-decoded Server Keys response object

        Raises:
            KeyLookupError: if the response carries no signature from the
                notary, or none made with a key we know.
        """
        perspective_name = key_server.server_name
        perspective_keys = key_server.verify_keys

        if perspective_keys is None:
            # signature checking is disabled on this server
            return

        if not (
            "signatures" in response and perspective_name in response["signatures"]
        ):
            raise KeyLookupError("Response not signed by the notary server")

        notary_signatures = response["signatures"][perspective_name]

        checked_any = False
        for key_id in notary_signatures:
            if key_id not in perspective_keys:
                continue
            # raises if the signature does not verify
            verify_signed_json(response, perspective_name, perspective_keys[key_id])
            checked_any = True

        if checked_any:
            return

        raise KeyLookupError(
            "Response not signed with a known key: signed with: %r, known keys: %r"
            % (
                list(notary_signatures.keys()),
                list(perspective_keys.keys()),
            )
        )
class ServerKeyFetcher(BaseV2KeyFetcher):
    """KeyFetcher impl which fetches keys from the origin servers"""

    def __init__(self, hs: "HomeServer"):
        super().__init__(hs)
        self.clock = hs.get_clock()
        self.client = hs.get_federation_http_client()

    async def get_keys(
        self, keys_to_fetch: Dict[str, Dict[str, int]]
    ) -> Dict[str, Dict[str, FetchKeyResult]]:
        """Fetch keys by asking each origin server directly.

        A failure for one server is logged and that server is simply left out
        of the result; it does not affect the lookups for the other servers.

        Args:
            keys_to_fetch:
                the keys to be fetched. server_name -> key_id -> min_valid_ts
                (the min_valid_ts values are not used by this fetcher)

        Returns:
            Map from server_name -> key_id -> FetchKeyResult
        """
        results = {}  # type: Dict[str, Dict[str, FetchKeyResult]]

        async def get_key(key_to_fetch_item: Tuple[str, Dict[str, int]]) -> None:
            server_name, key_ids = key_to_fetch_item
            try:
                keys = await self.get_server_verify_key_v2_direct(server_name, key_ids)
                results[server_name] = keys
            except KeyLookupError as e:
                logger.warning(
                    "Error looking up keys %s from %s: %s", key_ids, server_name, e
                )
            except Exception:
                logger.exception("Error getting keys %s from %s", key_ids, server_name)

        await yieldable_gather_results(get_key, keys_to_fetch.items())
        return results

    async def get_server_verify_key_v2_direct(
        self, server_name: str, key_ids: Iterable[str]
    ) -> Dict[str, FetchKeyResult]:
        """Fetch the given keys from the server which owns them

        One request is made per requested key id, except that key ids already
        returned by an earlier response are not requested again.

        Args:
            server_name: the origin server to ask
            key_ids: the ids of the keys to fetch

        Returns:
            Map from key ID to lookup result

        Raises:
            KeyLookupError if there was a problem making the lookup
        """
        # `import urllib` at module level does not guarantee that the `parse`
        # submodule has been loaded, so import it explicitly before using
        # urllib.parse.quote below.
        import urllib.parse

        keys = {}  # type: Dict[str, FetchKeyResult]

        for requested_key_id in key_ids:
            # we may have found this key as a side-effect of asking for another.
            if requested_key_id in keys:
                continue

            time_now_ms = self.clock.time_msec()
            try:
                response = await self.client.get_json(
                    destination=server_name,
                    path="/_matrix/key/v2/server/"
                    + urllib.parse.quote(requested_key_id),
                    ignore_backoff=True,
                    # we only give the remote server 10s to respond. It should be an
                    # easy request to handle, so if it doesn't reply within 10s, it's
                    # probably not going to.
                    #
                    # Furthermore, when we are acting as a notary server, we cannot
                    # wait all day for all of the origin servers, as the requesting
                    # server will otherwise time out before we can respond.
                    #
                    # (Note that get_json may make 4 attempts, so this can still take
                    # almost 45 seconds to fetch the headers, plus up to another 60s to
                    # read the response).
                    timeout=10000,
                )
            except (NotRetryingDestination, RequestSendFailed) as e:
                # these both have str() representations which we can't really improve
                # upon
                raise KeyLookupError(str(e))
            except HttpResponseException as e:
                raise KeyLookupError("Remote server returned an error: %s" % (e,))

            assert isinstance(response, dict)
            # make sure the server is answering for itself, not somebody else
            if response["server_name"] != server_name:
                raise KeyLookupError(
                    "Expected a response for server %r not %r"
                    % (server_name, response["server_name"])
                )

            response_keys = await self.process_v2_response(
                from_server=server_name,
                response_json=response,
                time_added_ms=time_now_ms,
            )
            await self.store.store_server_verify_keys(
                server_name,
                time_now_ms,
                ((server_name, key_id, key) for key_id, key in response_keys.items()),
            )
            keys.update(response_keys)

        return keys
async def _handle_key_deferred(verify_request: VerifyJsonRequest) -> None:
    """Waits for the key to become available, and then performs a verification

    Args:
        verify_request: the request to verify; its key_ready deferred is
            awaited to obtain the verify key.

    Raises:
        SynapseError if there was a problem performing the verification
    """
    server_name = verify_request.server_name

    # key_ready is a logcontext-agnostic deferred, so wait on it without a
    # logcontext.
    with PreserveLoggingContext():
        key_ready_result = await verify_request.key_ready
    _, _key_id, verify_key = key_ready_result

    json_object = verify_request.json_object

    try:
        verify_signed_json(json_object, server_name, verify_key)
    except SignatureVerifyException as e:
        reason = str(e)
        logger.debug(
            "Error verifying signature for %s:%s:%s with key %s: %s",
            server_name,
            verify_key.alg,
            verify_key.version,
            encode_verify_key_base64(verify_key),
            reason,
        )
        raise SynapseError(
            401,
            "Invalid signature for server %s with key %s:%s: %s"
            % (server_name, verify_key.alg, verify_key.version, reason),
            Codes.UNAUTHORIZED,
        )
|