__init__.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. # Copyright 2019 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import os.path
  15. import subprocess
  16. from typing import List
  17. from zope.interface import implementer
  18. from OpenSSL import SSL
  19. from OpenSSL.SSL import Connection
  20. from twisted.internet.address import IPv4Address
  21. from twisted.internet.interfaces import IOpenSSLServerConnectionCreator
  22. from twisted.internet.ssl import Certificate, trustRootFromCertificates
  23. from twisted.protocols.tls import TLSMemoryBIOProtocol
  24. from twisted.web.client import BrowserLikePolicyForHTTPS # noqa: F401
  25. from twisted.web.iweb import IPolicyForHTTPS # noqa: F401
  26. def get_test_https_policy() -> BrowserLikePolicyForHTTPS:
  27. """Get a test IPolicyForHTTPS which trusts the test CA cert
  28. Returns:
  29. IPolicyForHTTPS
  30. """
  31. ca_file = get_test_ca_cert_file()
  32. with open(ca_file) as stream:
  33. content = stream.read()
  34. cert = Certificate.loadPEM(content)
  35. trust_root = trustRootFromCertificates([cert])
  36. return BrowserLikePolicyForHTTPS(trustRoot=trust_root)
  37. def get_test_ca_cert_file() -> str:
  38. """Get the path to the test CA cert
  39. The keypair is generated with:
  40. openssl genrsa -out ca.key 2048
  41. openssl req -new -x509 -key ca.key -days 3650 -out ca.crt \
  42. -subj '/CN=synapse test CA'
  43. """
  44. return os.path.join(os.path.dirname(__file__), "ca.crt")
  45. def get_test_key_file() -> str:
  46. """get the path to the test key
  47. The key file is made with:
  48. openssl genrsa -out server.key 2048
  49. """
  50. return os.path.join(os.path.dirname(__file__), "server.key")
  51. cert_file_count = 0
  52. CONFIG_TEMPLATE = b"""\
  53. [default]
  54. basicConstraints = CA:FALSE
  55. keyUsage=nonRepudiation, digitalSignature, keyEncipherment
  56. subjectAltName = %(sanentries)s
  57. """
  58. def create_test_cert_file(sanlist: List[bytes]) -> str:
  59. """build an x509 certificate file
  60. Args:
  61. sanlist: a list of subjectAltName values for the cert
  62. Returns:
  63. The path to the file
  64. """
  65. global cert_file_count
  66. csr_filename = "server.csr"
  67. cnf_filename = "server.%i.cnf" % (cert_file_count,)
  68. cert_filename = "server.%i.crt" % (cert_file_count,)
  69. cert_file_count += 1
  70. # first build a CSR
  71. subprocess.check_call(
  72. [
  73. "openssl",
  74. "req",
  75. "-new",
  76. "-key",
  77. get_test_key_file(),
  78. "-subj",
  79. "/",
  80. "-out",
  81. csr_filename,
  82. ]
  83. )
  84. # now a config file describing the right SAN entries
  85. sanentries = b",".join(sanlist)
  86. with open(cnf_filename, "wb") as f:
  87. f.write(CONFIG_TEMPLATE % {b"sanentries": sanentries})
  88. # finally the cert
  89. ca_key_filename = os.path.join(os.path.dirname(__file__), "ca.key")
  90. ca_cert_filename = get_test_ca_cert_file()
  91. subprocess.check_call(
  92. [
  93. "openssl",
  94. "x509",
  95. "-req",
  96. "-in",
  97. csr_filename,
  98. "-CA",
  99. ca_cert_filename,
  100. "-CAkey",
  101. ca_key_filename,
  102. "-set_serial",
  103. "1",
  104. "-extfile",
  105. cnf_filename,
  106. "-out",
  107. cert_filename,
  108. ]
  109. )
  110. return cert_filename
  111. @implementer(IOpenSSLServerConnectionCreator)
  112. class TestServerTLSConnectionFactory:
  113. """An SSL connection creator which returns connections which present a certificate
  114. signed by our test CA."""
  115. def __init__(self, sanlist: List[bytes]):
  116. """
  117. Args:
  118. sanlist: a list of subjectAltName values for the cert
  119. """
  120. self._cert_file = create_test_cert_file(sanlist)
  121. def serverConnectionForTLS(self, tlsProtocol: TLSMemoryBIOProtocol) -> Connection:
  122. ctx = SSL.Context(SSL.SSLv23_METHOD)
  123. ctx.use_certificate_file(self._cert_file)
  124. ctx.use_privatekey_file(get_test_key_file())
  125. return Connection(ctx, None)
  126. # A dummy address, useful for tests that use FakeTransport and don't care about where
  127. # packets are going to/coming from.
  128. dummy_address = IPv4Address("TCP", "127.0.0.1", 80)