123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- # -*- coding: utf-8 -*-
- # Copyright 2019 New Vector Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import os.path
- import subprocess
- from zope.interface import implementer
- from OpenSSL import SSL
- from OpenSSL.SSL import Connection
- from twisted.internet.interfaces import IOpenSSLServerConnectionCreator
- def get_test_ca_cert_file():
- """Get the path to the test CA cert
- The keypair is generated with:
- openssl genrsa -out ca.key 2048
- openssl req -new -x509 -key ca.key -days 3650 -out ca.crt \
- -subj '/CN=synapse test CA'
- """
- return os.path.join(os.path.dirname(__file__), "ca.crt")
- def get_test_key_file():
- """get the path to the test key
- The key file is made with:
- openssl genrsa -out server.key 2048
- """
- return os.path.join(os.path.dirname(__file__), "server.key")
- cert_file_count = 0
- CONFIG_TEMPLATE = b"""\
- [default]
- basicConstraints = CA:FALSE
- keyUsage=nonRepudiation, digitalSignature, keyEncipherment
- subjectAltName = %(sanentries)s
- """
- def create_test_cert_file(sanlist):
- """build an x509 certificate file
- Args:
- sanlist: list[bytes]: a list of subjectAltName values for the cert
- Returns:
- str: the path to the file
- """
- global cert_file_count
- csr_filename = "server.csr"
- cnf_filename = "server.%i.cnf" % (cert_file_count,)
- cert_filename = "server.%i.crt" % (cert_file_count,)
- cert_file_count += 1
- # first build a CSR
- subprocess.check_call(
- [
- "openssl",
- "req",
- "-new",
- "-key",
- get_test_key_file(),
- "-subj",
- "/",
- "-out",
- csr_filename,
- ]
- )
- # now a config file describing the right SAN entries
- sanentries = b",".join(sanlist)
- with open(cnf_filename, "wb") as f:
- f.write(CONFIG_TEMPLATE % {b"sanentries": sanentries})
- # finally the cert
- ca_key_filename = os.path.join(os.path.dirname(__file__), "ca.key")
- ca_cert_filename = get_test_ca_cert_file()
- subprocess.check_call(
- [
- "openssl",
- "x509",
- "-req",
- "-in",
- csr_filename,
- "-CA",
- ca_cert_filename,
- "-CAkey",
- ca_key_filename,
- "-set_serial",
- "1",
- "-extfile",
- cnf_filename,
- "-out",
- cert_filename,
- ]
- )
- return cert_filename
- @implementer(IOpenSSLServerConnectionCreator)
- class TestServerTLSConnectionFactory(object):
- """An SSL connection creator which returns connections which present a certificate
- signed by our test CA."""
- def __init__(self, sanlist):
- """
- Args:
- sanlist: list[bytes]: a list of subjectAltName values for the cert
- """
- self._cert_file = create_test_cert_file(sanlist)
- def serverConnectionForTLS(self, tlsProtocol):
- ctx = SSL.Context(SSL.TLSv1_METHOD)
- ctx.use_certificate_file(self._cert_file)
- ctx.use_privatekey_file(get_test_key_file())
- return Connection(ctx, None)
|