123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- #***************************************************************************
- # _ _ ____ _
- # Project ___| | | | _ \| |
- # / __| | | | |_) | |
- # | (__| |_| | _ <| |___
- # \___|\___/|_| \_\_____|
- #
- # Copyright (C) 2008 - 2022, Daniel Stenberg, <daniel@haxx.se>, et al.
- #
- # This software is licensed as described in the file COPYING, which
- # you should have received as part of this distribution. The terms
- # are also available at https://curl.se/docs/copyright.html.
- #
- # You may opt to use, copy, modify, merge, publish, distribute and/or sell
- # copies of the Software, and permit persons to whom the Software is
- # furnished to do so, under the terms of the COPYING file.
- #
- # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
- # KIND, either express or implied.
- #
- # SPDX-License-Identifier: curl
- #
- ###########################################################################
- #
- import logging
- import os
- import re
- import subprocess
- from configparser import ConfigParser, ExtendedInterpolation
- from typing import Optional
- from .certs import CertificateSpec, TestCA, Credentials
- log = logging.getLogger(__name__)
- def init_config_from(conf_path):
- if os.path.isfile(conf_path):
- config = ConfigParser(interpolation=ExtendedInterpolation())
- config.read(conf_path)
- return config
- return None
- TESTS_HTTPD_PATH = os.path.dirname(os.path.dirname(__file__))
- DEF_CONFIG = init_config_from(os.path.join(TESTS_HTTPD_PATH, 'config.ini'))
- TOP_PATH = os.path.dirname(os.path.dirname(TESTS_HTTPD_PATH))
- CURL = os.path.join(TOP_PATH, 'src/curl')
- class EnvConfig:
- def __init__(self):
- self.tests_dir = TESTS_HTTPD_PATH
- self.gen_dir = os.path.join(self.tests_dir, 'gen')
- self.config = DEF_CONFIG
- # check cur and its features
- self.curl = CURL
- self.curl_features = []
- self.curl_protos = []
- p = subprocess.run(args=[self.curl, '-V'],
- capture_output=True, text=True)
- if p.returncode != 0:
- assert False, f'{self.curl} -V failed with exit code: {p.returncode}'
- for l in p.stdout.splitlines(keepends=False):
- if l.startswith('Features: '):
- self.curl_features = [feat.lower() for feat in l[10:].split(' ')]
- if l.startswith('Protocols: '):
- self.curl_protos = [prot.lower() for prot in l[11:].split(' ')]
- self.nghttpx_with_h3 = re.match(r'.* nghttp3/.*', p.stdout.strip())
- log.error(f'nghttpx -v: {p.stdout}')
- self.http_port = self.config['test']['http_port']
- self.https_port = self.config['test']['https_port']
- self.h3_port = self.config['test']['h3_port']
- self.httpd = self.config['httpd']['httpd']
- self.apachectl = self.config['httpd']['apachectl']
- self.apxs = self.config['httpd']['apxs']
- if len(self.apxs) == 0:
- self.apxs = None
- self.examples_pem = {
- 'key': 'xxx',
- 'cert': 'xxx',
- }
- self.htdocs_dir = os.path.join(self.gen_dir, 'htdocs')
- self.tld = 'tests-httpd.curl.se'
- self.domain1 = f"one.{self.tld}"
- self.domain2 = f"two.{self.tld}"
- self.cert_specs = [
- CertificateSpec(domains=[self.domain1], key_type='rsa2048'),
- CertificateSpec(domains=[self.domain2], key_type='rsa2048'),
- CertificateSpec(name="clientsX", sub_specs=[
- CertificateSpec(name="user1", client=True),
- ]),
- ]
- self.nghttpx = self.config['nghttpx']['nghttpx']
- self.nghttpx_with_h3 = False
- if len(self.nghttpx) == 0:
- self.nghttpx = 'nghttpx'
- if self.nghttpx is not None:
- p = subprocess.run(args=[self.nghttpx, '-v'],
- capture_output=True, text=True)
- if p.returncode != 0:
- # not a working nghttpx
- self.nghttpx = None
- else:
- self.nghttpx_with_h3 = re.match(r'.* nghttp3/.*', p.stdout.strip()) is not None
- log.error(f'nghttpx -v: {p.stdout}')
- def is_complete(self) -> bool:
- return os.path.isfile(self.httpd) and \
- os.path.isfile(self.apachectl) and \
- self.apxs is not None and \
- os.path.isfile(self.apxs)
- def get_incomplete_reason(self) -> Optional[str]:
- if not os.path.isfile(self.httpd):
- return f'httpd ({self.httpd}) not found'
- if not os.path.isfile(self.apachectl):
- return f'apachectl ({self.apachectl}) not found'
- if self.apxs is None:
- return f"apxs (provided by apache2-dev) not found"
- if not os.path.isfile(self.apxs):
- return f"apxs ({self.apxs}) not found"
- return None
- class Env:
- CONFIG = EnvConfig()
- @staticmethod
- def setup_incomplete() -> bool:
- return not Env.CONFIG.is_complete()
- @staticmethod
- def incomplete_reason() -> Optional[str]:
- return Env.CONFIG.get_incomplete_reason()
- @staticmethod
- def have_h3_server() -> bool:
- return Env.CONFIG.nghttpx_with_h3
- @staticmethod
- def have_h3_curl() -> bool:
- return 'http3' in Env.CONFIG.curl_features
- @staticmethod
- def have_h3() -> bool:
- return Env.have_h3_curl() and Env.have_h3_server()
- def __init__(self, pytestconfig=None):
- self._verbose = pytestconfig.option.verbose \
- if pytestconfig is not None else 0
- self._ca = None
- def issue_certs(self):
- if self._ca is None:
- ca_dir = os.path.join(self.CONFIG.gen_dir, 'ca')
- self._ca = TestCA.create_root(name=self.CONFIG.tld,
- store_dir=ca_dir,
- key_type="rsa2048")
- self._ca.issue_certs(self.CONFIG.cert_specs)
- def setup(self):
- os.makedirs(self.gen_dir, exist_ok=True)
- os.makedirs(self.htdocs_dir, exist_ok=True)
- self.issue_certs()
- def get_credentials(self, domain) -> Optional[Credentials]:
- creds = self.ca.get_credentials_for_name(domain)
- if len(creds) > 0:
- return creds[0]
- return None
- @property
- def verbose(self) -> int:
- return self._verbose
- @property
- def gen_dir(self) -> str:
- return self.CONFIG.gen_dir
- @property
- def ca(self):
- return self._ca
- @property
- def htdocs_dir(self) -> str:
- return self.CONFIG.htdocs_dir
- @property
- def domain1(self) -> str:
- return self.CONFIG.domain1
- @property
- def domain2(self) -> str:
- return self.CONFIG.domain2
- @property
- def http_port(self) -> str:
- return self.CONFIG.http_port
- @property
- def https_port(self) -> str:
- return self.CONFIG.https_port
- @property
- def h3_port(self) -> str:
- return self.CONFIG.h3_port
- @property
- def curl(self) -> str:
- return self.CONFIG.curl
- @property
- def httpd(self) -> str:
- return self.CONFIG.httpd
- @property
- def apachectl(self) -> str:
- return self.CONFIG.apachectl
- @property
- def apxs(self) -> str:
- return self.CONFIG.apxs
- @property
- def nghttpx(self) -> Optional[str]:
- return self.CONFIG.nghttpx
- def authority_for(self, domain: str, alpn_proto: Optional[str] = None):
- if alpn_proto is None or \
- alpn_proto in ['h2', 'http/1.1', 'http/1.0', 'http/0.9']:
- return f'{domain}:{self.https_port}'
- if alpn_proto in ['h3']:
- return f'{domain}:{self.h3_port}'
- return f'{domain}:{self.http_port}'
|