|
@@ -13,60 +13,110 @@
|
|
|
# See the License for the specific language governing permissions and
|
|
|
# limitations under the License.
|
|
|
|
|
|
+import logging
|
|
|
import os
|
|
|
+from datetime import datetime
|
|
|
from hashlib import sha256
|
|
|
|
|
|
from unpaddedbase64 import encode_base64
|
|
|
|
|
|
from OpenSSL import crypto
|
|
|
|
|
|
-from ._base import Config
|
|
|
+from synapse.config._base import Config
|
|
|
+
|
|
|
+logger = logging.getLogger()
|
|
|
|
|
|
|
|
|
class TlsConfig(Config):
|
|
|
def read_config(self, config):
|
|
|
- self.tls_certificate = self.read_tls_certificate(
|
|
|
- config.get("tls_certificate_path")
|
|
|
- )
|
|
|
- self.tls_certificate_file = config.get("tls_certificate_path")
|
|
|
|
|
|
+ acme_config = config.get("acme", {})
|
|
|
+ self.acme_enabled = acme_config.get("enabled", False)
|
|
|
+ self.acme_url = acme_config.get(
|
|
|
+ "url", "https://acme-v01.api.letsencrypt.org/directory"
|
|
|
+ )
|
|
|
+ self.acme_port = acme_config.get("port", 8449)
|
|
|
+ self.acme_bind_addresses = acme_config.get("bind_addresses", ["127.0.0.1"])
|
|
|
+ self.acme_reprovision_threshold = acme_config.get("reprovision_threshold", 30)
|
|
|
+
|
|
|
+ self.tls_certificate_file = os.path.abspath(config.get("tls_certificate_path"))
|
|
|
+ self.tls_private_key_file = os.path.abspath(config.get("tls_private_key_path"))
|
|
|
+ self._original_tls_fingerprints = config["tls_fingerprints"]
|
|
|
+ self.tls_fingerprints = list(self._original_tls_fingerprints)
|
|
|
self.no_tls = config.get("no_tls", False)
|
|
|
|
|
|
- if self.no_tls:
|
|
|
- self.tls_private_key = None
|
|
|
- else:
|
|
|
- self.tls_private_key = self.read_tls_private_key(
|
|
|
- config.get("tls_private_key_path")
|
|
|
- )
|
|
|
+ # This config option applies to non-federation HTTP clients
|
|
|
+ # (e.g. for talking to recaptcha, identity servers, and such)
|
|
|
+ # It should never be used in production, and is intended for
|
|
|
+ # use only when running tests.
|
|
|
+ self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
|
|
|
+ "use_insecure_ssl_client_just_for_testing_do_not_use"
|
|
|
+ )
|
|
|
+
|
|
|
+ self.tls_certificate = None
|
|
|
+ self.tls_private_key = None
|
|
|
+
|
|
|
+ def is_disk_cert_valid(self):
|
|
|
+ """
|
|
|
+ Is the certificate we have on disk valid, and if so, for how long?
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ int: Days remaining of certificate validity.
|
|
|
+ None: No certificate exists.
|
|
|
+ """
|
|
|
+ if not os.path.exists(self.tls_certificate_file):
|
|
|
+ return None
|
|
|
+
|
|
|
+ try:
|
|
|
+ with open(self.tls_certificate_file, 'rb') as f:
|
|
|
+ cert_pem = f.read()
|
|
|
+ except Exception:
|
|
|
+ logger.exception("Failed to read existing certificate off disk!")
|
|
|
+ raise
|
|
|
+
|
|
|
+ try:
|
|
|
+ tls_certificate = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
|
|
|
+ except Exception:
|
|
|
+ logger.exception("Failed to parse existing certificate off disk!")
|
|
|
+ raise
|
|
|
+
|
|
|
+ # YYYYMMDDhhmmssZ -- in UTC
|
|
|
+ expires_on = datetime.strptime(
|
|
|
+ tls_certificate.get_notAfter().decode('ascii'), "%Y%m%d%H%M%SZ"
|
|
|
+ )
|
|
|
+ now = datetime.utcnow()
|
|
|
+ days_remaining = (expires_on - now).days
|
|
|
+ return days_remaining
|
|
|
|
|
|
- self.tls_fingerprints = config["tls_fingerprints"]
|
|
|
+ def read_certificate_from_disk(self):
|
|
|
+ """
|
|
|
+ Read the certificates from disk.
|
|
|
+ """
|
|
|
+ self.tls_certificate = self.read_tls_certificate(self.tls_certificate_file)
|
|
|
+
|
|
|
+ if not self.no_tls:
|
|
|
+ self.tls_private_key = self.read_tls_private_key(self.tls_private_key_file)
|
|
|
+
|
|
|
+ self.tls_fingerprints = list(self._original_tls_fingerprints)
|
|
|
|
|
|
# Check that our own certificate is included in the list of fingerprints
|
|
|
# and include it if it is not.
|
|
|
x509_certificate_bytes = crypto.dump_certificate(
|
|
|
- crypto.FILETYPE_ASN1,
|
|
|
- self.tls_certificate
|
|
|
+ crypto.FILETYPE_ASN1, self.tls_certificate
|
|
|
)
|
|
|
sha256_fingerprint = encode_base64(sha256(x509_certificate_bytes).digest())
|
|
|
sha256_fingerprints = set(f["sha256"] for f in self.tls_fingerprints)
|
|
|
if sha256_fingerprint not in sha256_fingerprints:
|
|
|
self.tls_fingerprints.append({u"sha256": sha256_fingerprint})
|
|
|
|
|
|
- # This config option applies to non-federation HTTP clients
|
|
|
- # (e.g. for talking to recaptcha, identity servers, and such)
|
|
|
- # It should never be used in production, and is intended for
|
|
|
- # use only when running tests.
|
|
|
- self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
|
|
|
- "use_insecure_ssl_client_just_for_testing_do_not_use"
|
|
|
- )
|
|
|
-
|
|
|
def default_config(self, config_dir_path, server_name, **kwargs):
|
|
|
base_key_name = os.path.join(config_dir_path, server_name)
|
|
|
|
|
|
tls_certificate_path = base_key_name + ".tls.crt"
|
|
|
tls_private_key_path = base_key_name + ".tls.key"
|
|
|
|
|
|
- return """\
|
|
|
+ return (
|
|
|
+ """\
|
|
|
# PEM encoded X509 certificate for TLS.
|
|
|
# You can replace the self-signed certificate that synapse
|
|
|
# autogenerates on launch with your own SSL certificate + key pair
|
|
@@ -107,7 +157,24 @@ class TlsConfig(Config):
|
|
|
#
|
|
|
tls_fingerprints: []
|
|
|
# tls_fingerprints: [{"sha256": "<base64_encoded_sha256_fingerprint>"}]
|
|
|
- """ % locals()
|
|
|
+
|
|
|
+ ## Support for ACME certificate auto-provisioning.
|
|
|
+ # acme:
|
|
|
+ # enabled: false
|
|
|
+ ## ACME path.
|
|
|
+ ## If you only want to test, use the staging url:
|
|
|
+ ## https://acme-staging.api.letsencrypt.org/directory
|
|
|
+ # url: 'https://acme-v01.api.letsencrypt.org/directory'
|
|
|
+ ## Port number (to listen for the HTTP-01 challenge).
|
|
|
+ ## Using port 80 requires utilising something like authbind, or proxying to it.
|
|
|
+ # port: 8449
|
|
|
+ ## Hosts to bind to.
|
|
|
+ # bind_addresses: ['127.0.0.1']
|
|
|
+ ## How many days remaining on a certificate before it is renewed.
|
|
|
+ # reprovision_threshold: 30
|
|
|
+ """
|
|
|
+ % locals()
|
|
|
+ )
|
|
|
|
|
|
def read_tls_certificate(self, cert_path):
|
|
|
cert_pem = self.read_file(cert_path, "tls_certificate")
|