federation_tls_options.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. # Copyright 2019 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from OpenSSL import SSL
  15. from twisted.internet import ssl
  16. from twisted.internet.abstract import isIPAddress, isIPv6Address
  17. from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
  18. from zope.interface import implementer
  19. def _tolerateErrors(wrapped):
  20. """
  21. Wrap up an info_callback for pyOpenSSL so that if something goes wrong
  22. the error is immediately logged and the connection is dropped if possible.
  23. This is a copy of twisted.internet._sslverify._tolerateErrors. For
  24. documentation, see the twisted documentation.
  25. """
  26. def infoCallback(connection, where, ret):
  27. try:
  28. return wrapped(connection, where, ret)
  29. except: # noqa: E722, taken from the twisted implementation
  30. f = Failure()
  31. logger.exception("Error during info_callback")
  32. connection.get_app_data().failVerification(f)
  33. return infoCallback
  34. def _idnaBytes(text):
  35. """
  36. Convert some text typed by a human into some ASCII bytes. This is a
  37. copy of twisted.internet._idna._idnaBytes. For documentation, see the
  38. twisted documentation.
  39. """
  40. try:
  41. import idna
  42. except ImportError:
  43. return text.encode("idna")
  44. else:
  45. return idna.encode(text)
  46. @implementer(IOpenSSLClientConnectionCreator)
  47. class ClientTLSOptions:
  48. """
  49. Client creator for TLS without certificate identity verification. This is a
  50. copy of twisted.internet._sslverify.ClientTLSOptions with the identity
  51. verification left out. For documentation, see the twisted documentation.
  52. """
  53. def __init__(self, hostname, ctx):
  54. self._ctx = ctx
  55. if isIPAddress(hostname) or isIPv6Address(hostname):
  56. self._hostnameBytes = hostname.encode("ascii")
  57. self._sendSNI = False
  58. else:
  59. self._hostnameBytes = _idnaBytes(hostname)
  60. self._sendSNI = True
  61. ctx.set_info_callback(_tolerateErrors(self._identityVerifyingInfoCallback))
  62. def clientConnectionForTLS(self, tlsProtocol):
  63. context = self._ctx
  64. connection = SSL.Connection(context, None)
  65. connection.set_app_data(tlsProtocol)
  66. return connection
  67. def _identityVerifyingInfoCallback(self, connection, where, ret):
  68. # Literal IPv4 and IPv6 addresses are not permitted
  69. # as host names according to the RFCs
  70. if where & SSL.SSL_CB_HANDSHAKE_START and self._sendSNI:
  71. connection.set_tlsext_host_name(self._hostnameBytes)
  72. class ClientTLSOptionsFactory:
  73. """Factory for Twisted ClientTLSOptions that are used to make connections
  74. to remote servers for federation."""
  75. def __init__(self, verify_requests):
  76. if verify_requests:
  77. self._options = ssl.CertificateOptions(trustRoot=ssl.platformTrust())
  78. else:
  79. self._options = ssl.CertificateOptions()
  80. def get_options(self, host):
  81. # Use _makeContext so that we get a fresh OpenSSL CTX each time.
  82. return ClientTLSOptions(host, self._options._makeContext())