123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- # Copyright 2014-2016 OpenMarket Ltd
- # Copyright 2019 New Vector Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- from service_identity import VerificationError
- from service_identity.pyopenssl import verify_hostname, verify_ip_address
- from zope.interface import implementer
- from OpenSSL import SSL, crypto
- from twisted.internet._sslverify import _defaultCurveName
- from twisted.internet.abstract import isIPAddress, isIPv6Address
- from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
- from twisted.internet.ssl import (
- CertificateOptions,
- ContextFactory,
- TLSVersion,
- platformTrust,
- )
- from twisted.protocols.tls import TLSMemoryBIOProtocol
- from twisted.python.failure import Failure
- from twisted.web.iweb import IPolicyForHTTPS
- from synapse.config.homeserver import HomeServerConfig
- logger = logging.getLogger(__name__)
- _TLS_VERSION_MAP = {
- "1": TLSVersion.TLSv1_0,
- "1.1": TLSVersion.TLSv1_1,
- "1.2": TLSVersion.TLSv1_2,
- "1.3": TLSVersion.TLSv1_3,
- }
- class ServerContextFactory(ContextFactory):
- """Factory for PyOpenSSL SSL contexts that are used to handle incoming
- connections.
- TODO: replace this with an implementation of IOpenSSLServerConnectionCreator,
- per https://github.com/matrix-org/synapse/issues/1691
- """
- def __init__(self, config: HomeServerConfig):
- # TODO: once pyOpenSSL exposes TLS_METHOD and SSL_CTX_set_min_proto_version,
- # switch to those (see https://github.com/pyca/cryptography/issues/5379).
- #
- # note that, despite the confusing name, SSLv23_METHOD does *not* enforce SSLv2
- # or v3, but is a synonym for TLS_METHOD, which allows the client and server
- # to negotiate an appropriate version of TLS constrained by the version options
- # set with context.set_options.
- #
- self._context = SSL.Context(SSL.SSLv23_METHOD)
- self.configure_context(self._context, config)
- @staticmethod
- def configure_context(context: SSL.Context, config: HomeServerConfig) -> None:
- try:
- _ecCurve = crypto.get_elliptic_curve(_defaultCurveName)
- context.set_tmp_ecdh(_ecCurve)
- except Exception:
- logger.exception("Failed to enable elliptic curve for TLS")
- context.set_options(
- SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1
- )
- context.use_certificate_chain_file(config.tls.tls_certificate_file)
- assert config.tls.tls_private_key is not None
- context.use_privatekey(config.tls.tls_private_key)
- # https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/
- context.set_cipher_list(
- b"ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES256:ECDH+AES128:!aNULL:!SHA1:!AESCCM"
- )
- def getContext(self) -> SSL.Context:
- return self._context
- @implementer(IPolicyForHTTPS)
- class FederationPolicyForHTTPS:
- """Factory for Twisted SSLClientConnectionCreators that are used to make connections
- to remote servers for federation.
- Uses one of two OpenSSL context objects for all connections, depending on whether
- we should do SSL certificate verification.
- get_options decides whether we should do SSL certificate verification and
- constructs an SSLClientConnectionCreator factory accordingly.
- """
- def __init__(self, config: HomeServerConfig):
- self._config = config
- # Check if we're using a custom list of a CA certificates
- trust_root = config.tls.federation_ca_trust_root
- if trust_root is None:
- # Use CA root certs provided by OpenSSL
- trust_root = platformTrust()
- # "insecurelyLowerMinimumTo" is the argument that will go lower than
- # Twisted's default, which is why it is marked as "insecure" (since
- # Twisted's defaults are reasonably secure). But, since Twisted is
- # moving to TLS 1.2 by default, we want to respect the config option if
- # it is set to 1.0 (which the alternate option, raiseMinimumTo, will not
- # let us do).
- minTLS = _TLS_VERSION_MAP[config.tls.federation_client_minimum_tls_version]
- _verify_ssl = CertificateOptions(
- trustRoot=trust_root, insecurelyLowerMinimumTo=minTLS
- )
- self._verify_ssl_context = _verify_ssl.getContext()
- self._verify_ssl_context.set_info_callback(_context_info_cb)
- _no_verify_ssl = CertificateOptions(insecurelyLowerMinimumTo=minTLS)
- self._no_verify_ssl_context = _no_verify_ssl.getContext()
- self._no_verify_ssl_context.set_info_callback(_context_info_cb)
- self._should_verify = self._config.tls.federation_verify_certificates
- self._federation_certificate_verification_whitelist = (
- self._config.tls.federation_certificate_verification_whitelist
- )
- def get_options(self, host: bytes) -> IOpenSSLClientConnectionCreator:
- # IPolicyForHTTPS.get_options takes bytes, but we want to compare
- # against the str whitelist. The hostnames in the whitelist are already
- # IDNA-encoded like the hosts will be here.
- ascii_host = host.decode("ascii")
- # Check if certificate verification has been enabled
- should_verify = self._should_verify
- # Check if we've disabled certificate verification for this host
- if self._should_verify:
- for regex in self._federation_certificate_verification_whitelist:
- if regex.match(ascii_host):
- should_verify = False
- break
- ssl_context = (
- self._verify_ssl_context if should_verify else self._no_verify_ssl_context
- )
- return SSLClientConnectionCreator(host, ssl_context, should_verify)
- def creatorForNetloc(
- self, hostname: bytes, port: int
- ) -> IOpenSSLClientConnectionCreator:
- """Implements the IPolicyForHTTPS interface so that this can be passed
- directly to agents.
- """
- return self.get_options(hostname)
- @implementer(IPolicyForHTTPS)
- class RegularPolicyForHTTPS:
- """Factory for Twisted SSLClientConnectionCreators that are used to make connections
- to remote servers, for other than federation.
- Always uses the same OpenSSL context object, which uses the default OpenSSL CA
- trust root.
- """
- def __init__(self) -> None:
- trust_root = platformTrust()
- self._ssl_context = CertificateOptions(trustRoot=trust_root).getContext()
- self._ssl_context.set_info_callback(_context_info_cb)
- def creatorForNetloc(
- self, hostname: bytes, port: int
- ) -> IOpenSSLClientConnectionCreator:
- return SSLClientConnectionCreator(hostname, self._ssl_context, True)
- def _context_info_cb(ssl_connection: SSL.Connection, where: int, ret: int) -> None:
- """The 'information callback' for our openssl context objects.
- Note: Once this is set as the info callback on a Context object, the Context should
- only be used with the SSLClientConnectionCreator.
- """
- # we assume that the app_data on the connection object has been set to
- # a TLSMemoryBIOProtocol object. (This is done by SSLClientConnectionCreator)
- tls_protocol = ssl_connection.get_app_data()
- try:
- # ... we further assume that SSLClientConnectionCreator has set the
- # '_synapse_tls_verifier' attribute to a ConnectionVerifier object.
- tls_protocol._synapse_tls_verifier.verify_context_info_cb(ssl_connection, where)
- except BaseException: # taken from the twisted implementation
- logger.exception("Error during info_callback")
- f = Failure()
- tls_protocol.failVerification(f)
- @implementer(IOpenSSLClientConnectionCreator)
- class SSLClientConnectionCreator:
- """Creates openssl connection objects for client connections.
- Replaces twisted.internet.ssl.ClientTLSOptions
- """
- def __init__(self, hostname: bytes, ctx: SSL.Context, verify_certs: bool):
- self._ctx = ctx
- self._verifier = ConnectionVerifier(hostname, verify_certs)
- def clientConnectionForTLS(
- self, tls_protocol: TLSMemoryBIOProtocol
- ) -> SSL.Connection:
- context = self._ctx
- connection = SSL.Connection(context, None)
- # as per twisted.internet.ssl.ClientTLSOptions, we set the application
- # data to our TLSMemoryBIOProtocol...
- connection.set_app_data(tls_protocol)
- # ... and we also gut-wrench a '_synapse_tls_verifier' attribute into the
- # tls_protocol so that the SSL context's info callback has something to
- # call to do the cert verification.
- tls_protocol._synapse_tls_verifier = self._verifier # type: ignore[attr-defined]
- return connection
- class ConnectionVerifier:
- """Set the SNI, and do cert verification
- This is a thing which is attached to the TLSMemoryBIOProtocol, and is called by
- the ssl context's info callback.
- """
- # This code is based on twisted.internet.ssl.ClientTLSOptions.
- def __init__(self, hostname: bytes, verify_certs: bool):
- self._verify_certs = verify_certs
- _decoded = hostname.decode("ascii")
- if isIPAddress(_decoded) or isIPv6Address(_decoded):
- self._is_ip_address = True
- else:
- self._is_ip_address = False
- self._hostnameBytes = hostname
- self._hostnameASCII = self._hostnameBytes.decode("ascii")
- def verify_context_info_cb(
- self, ssl_connection: SSL.Connection, where: int
- ) -> None:
- if where & SSL.SSL_CB_HANDSHAKE_START and not self._is_ip_address:
- ssl_connection.set_tlsext_host_name(self._hostnameBytes)
- if where & SSL.SSL_CB_HANDSHAKE_DONE and self._verify_certs:
- try:
- if self._is_ip_address:
- verify_ip_address(ssl_connection, self._hostnameASCII)
- else:
- verify_hostname(ssl_connection, self._hostnameASCII)
- except VerificationError:
- f = Failure()
- tls_protocol = ssl_connection.get_app_data()
- tls_protocol.failVerification(f)
|