|
@@ -16,12 +16,14 @@
|
|
|
import hashlib
|
|
|
import logging
|
|
|
import os
|
|
|
-from typing import Any, Dict
|
|
|
+from typing import Any, Dict, Iterator, List, Optional
|
|
|
|
|
|
import attr
|
|
|
import jsonschema
|
|
|
from signedjson.key import (
|
|
|
NACL_ED25519,
|
|
|
+ SigningKey,
|
|
|
+ VerifyKey,
|
|
|
decode_signing_key_base64,
|
|
|
decode_verify_key_bytes,
|
|
|
generate_signing_key,
|
|
@@ -31,6 +33,7 @@ from signedjson.key import (
|
|
|
)
|
|
|
from unpaddedbase64 import decode_base64
|
|
|
|
|
|
+from synapse.types import JsonDict
|
|
|
from synapse.util.stringutils import random_string, random_string_with_symbols
|
|
|
|
|
|
from ._base import Config, ConfigError
|
|
@@ -81,14 +84,13 @@ To suppress this warning and continue using 'matrix.org', admins should set
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
-@attr.s
|
|
|
+@attr.s(slots=True, auto_attribs=True)
|
|
|
class TrustedKeyServer:
|
|
|
- # string: name of the server.
|
|
|
- server_name = attr.ib()
|
|
|
+ # name of the server.
|
|
|
+ server_name: str
|
|
|
|
|
|
- # dict[str,VerifyKey]|None: map from key id to key object, or None to disable
|
|
|
- # signature verification.
|
|
|
- verify_keys = attr.ib(default=None)
|
|
|
+ # map from key id to key object, or None to disable signature verification.
|
|
|
+ verify_keys: Optional[Dict[str, VerifyKey]] = None
|
|
|
|
|
|
|
|
|
class KeyConfig(Config):
|
|
@@ -279,15 +281,15 @@ class KeyConfig(Config):
|
|
|
% locals()
|
|
|
)
|
|
|
|
|
|
- def read_signing_keys(self, signing_key_path, name):
|
|
|
+ def read_signing_keys(self, signing_key_path: str, name: str) -> List[SigningKey]:
|
|
|
"""Read the signing keys in the given path.
|
|
|
|
|
|
Args:
|
|
|
- signing_key_path (str)
|
|
|
- name (str): Associated config key name
|
|
|
+ signing_key_path
|
|
|
+ name: Associated config key name
|
|
|
|
|
|
Returns:
|
|
|
- list[SigningKey]
|
|
|
+ The signing keys read from the given path.
|
|
|
"""
|
|
|
|
|
|
signing_keys = self.read_file(signing_key_path, name)
|
|
@@ -296,7 +298,9 @@ class KeyConfig(Config):
|
|
|
except Exception as e:
|
|
|
raise ConfigError("Error reading %s: %s" % (name, str(e)))
|
|
|
|
|
|
- def read_old_signing_keys(self, old_signing_keys):
|
|
|
+ def read_old_signing_keys(
|
|
|
+ self, old_signing_keys: Optional[JsonDict]
|
|
|
+ ) -> Dict[str, VerifyKey]:
|
|
|
if old_signing_keys is None:
|
|
|
return {}
|
|
|
keys = {}
|
|
@@ -340,7 +344,7 @@ class KeyConfig(Config):
|
|
|
write_signing_keys(signing_key_file, (key,))
|
|
|
|
|
|
|
|
|
-def _perspectives_to_key_servers(config):
|
|
|
+def _perspectives_to_key_servers(config: JsonDict) -> Iterator[JsonDict]:
|
|
|
"""Convert old-style 'perspectives' configs into new-style 'trusted_key_servers'
|
|
|
|
|
|
Returns an iterable of entries to add to trusted_key_servers.
|
|
@@ -402,7 +406,9 @@ TRUSTED_KEY_SERVERS_SCHEMA = {
|
|
|
}
|
|
|
|
|
|
|
|
|
-def _parse_key_servers(key_servers, federation_verify_certificates):
|
|
|
+def _parse_key_servers(
|
|
|
+ key_servers: List[Any], federation_verify_certificates: bool
|
|
|
+) -> Iterator[TrustedKeyServer]:
|
|
|
try:
|
|
|
jsonschema.validate(key_servers, TRUSTED_KEY_SERVERS_SCHEMA)
|
|
|
except jsonschema.ValidationError as e:
|
|
@@ -444,7 +450,7 @@ def _parse_key_servers(key_servers, federation_verify_certificates):
|
|
|
yield result
|
|
|
|
|
|
|
|
|
-def _assert_keyserver_has_verify_keys(trusted_key_server):
|
|
|
+def _assert_keyserver_has_verify_keys(trusted_key_server: TrustedKeyServer) -> None:
|
|
|
if not trusted_key_server.verify_keys:
|
|
|
raise ConfigError(INSECURE_NOTARY_ERROR)
|
|
|
|