123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- #***************************************************************************
- # _ _ ____ _
- # Project ___| | | | _ \| |
- # / __| | | | |_) | |
- # | (__| |_| | _ <| |___
- # \___|\___/|_| \_\_____|
- #
- # Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
- #
- # This software is licensed as described in the file COPYING, which
- # you should have received as part of this distribution. The terms
- # are also available at https://curl.se/docs/copyright.html.
- #
- # You may opt to use, copy, modify, merge, publish, distribute and/or sell
- # copies of the Software, and permit persons to whom the Software is
- # furnished to do so, under the terms of the COPYING file.
- #
- # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
- # KIND, either express or implied.
- #
- # SPDX-License-Identifier: curl
- #
- ###########################################################################
- #
- import logging
- import os
- import re
- import shutil
- import socket
- import subprocess
- import tempfile
- from configparser import ConfigParser, ExtendedInterpolation
- from datetime import timedelta
- from typing import Optional
- from .certs import CertificateSpec, Credentials, TestCA
- from .ports import alloc_ports
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def init_config_from(conf_path):
    """Load an INI configuration file.

    Returns a ``ConfigParser`` using ``ExtendedInterpolation`` populated
    from ``conf_path``, or ``None`` when ``conf_path`` is not an existing
    regular file.
    """
    if not os.path.isfile(conf_path):
        return None
    parser = ConfigParser(interpolation=ExtendedInterpolation())
    parser.read(conf_path)
    return parser
# Parent of the directory that contains this module.
TESTS_HTTPD_PATH = os.path.dirname(os.path.dirname(__file__))
# Parent of the current working directory (left un-normalised as "<cwd>/..").
TOP_PATH = os.path.join(os.getcwd(), os.pardir)
# Parsed "<TOP_PATH>/tests/http/config.ini"; None when that file is absent.
DEF_CONFIG = init_config_from(
    os.path.join(TOP_PATH, 'tests', 'http', 'config.ini'))
# Default curl binary; EnvConfig lets the CURL environment variable override it.
CURL = os.path.join(TOP_PATH, 'src', 'curl')
class EnvConfig:
    """Probe and hold the configuration of the HTTP test environment.

    Constructing an instance runs the configured ``curl`` binary (and, when
    configured, ``nghttpx``, ``caddy`` and ``vsftpd``) to discover versions
    and features, allocates the server ports and defines the test domains
    and certificate specifications.
    """

    def __init__(self):
        """Collect paths, probe the external tools and allocate ports.

        Raises:
            RuntimeError: if ``curl -V`` exits with a non-zero code.
        """
        self.tests_dir = TESTS_HTTPD_PATH
        self.gen_dir = os.path.join(self.tests_dir, 'gen')
        self.project_dir = os.path.dirname(os.path.dirname(self.tests_dir))
        self.build_dir = TOP_PATH
        self.config = DEF_CONFIG
        # check curl and its features
        self.curl = CURL
        if 'CURL' in os.environ:
            self.curl = os.environ['CURL']
        self.curl_props = {
            'version_string': '',
            'version': '',
            'os': '',
            'fullname': '',
            'features_string': '',
            'features': set(),
            'protocols_string': '',
            'protocols': set(),
            'libs': set(),
            'lib_versions': set(),
        }
        self.curl_is_debug = False
        self.curl_protos = []
        p = subprocess.run(args=[self.curl, '-V'],
                           capture_output=True, text=True)
        if p.returncode != 0:
            raise RuntimeError(f'{self.curl} -V failed with exit code: {p.returncode}')
        # a 'WARNING:' on stderr is taken as the marker of a debug build
        if p.stderr.startswith('WARNING:'):
            self.curl_is_debug = True
        self._parse_curl_version(p.stdout)

        self.ports = alloc_ports(port_specs={
            'ftp': socket.SOCK_STREAM,
            'ftps': socket.SOCK_STREAM,
            'http': socket.SOCK_STREAM,
            'https': socket.SOCK_STREAM,
            'nghttpx_https': socket.SOCK_STREAM,
            'proxy': socket.SOCK_STREAM,
            'proxys': socket.SOCK_STREAM,
            'h2proxys': socket.SOCK_STREAM,
            'caddy': socket.SOCK_STREAM,
            'caddys': socket.SOCK_STREAM,
            'ws': socket.SOCK_STREAM,
        })

        self.httpd = self.config['httpd']['httpd']
        self.apachectl = self.config['httpd']['apachectl']
        self.apxs = self.config['httpd']['apxs']
        if len(self.apxs) == 0:
            self.apxs = None
        # queried lazily through the `httpd_version` property
        self._httpd_version = None

        self.examples_pem = {
            'key': 'xxx',
            'cert': 'xxx',
        }
        self.htdocs_dir = os.path.join(self.gen_dir, 'htdocs')
        self.tld = 'http.curl.se'
        self.domain1 = f"one.{self.tld}"
        self.domain1brotli = f"brotli.one.{self.tld}"
        self.domain2 = f"two.{self.tld}"
        self.ftp_domain = f"ftp.{self.tld}"
        self.proxy_domain = f"proxy.{self.tld}"
        self.expired_domain = f"expired.{self.tld}"
        self.cert_specs = [
            CertificateSpec(domains=[self.domain1, self.domain1brotli, 'localhost', '127.0.0.1'], key_type='rsa2048'),
            CertificateSpec(domains=[self.domain2], key_type='rsa2048'),
            CertificateSpec(domains=[self.ftp_domain], key_type='rsa2048'),
            CertificateSpec(domains=[self.proxy_domain, '127.0.0.1'], key_type='rsa2048'),
            # validity window lies entirely in the past
            CertificateSpec(domains=[self.expired_domain], key_type='rsa2048',
                            valid_from=timedelta(days=-100), valid_to=timedelta(days=-10)),
            CertificateSpec(name="clientsX", sub_specs=[
                CertificateSpec(name="user1", client=True),
            ]),
        ]

        self._probe_nghttpx()
        self._probe_caddy()
        self._probe_vsftpd()
        self._tcpdump = shutil.which('tcpdump')

    def _parse_curl_version(self, output):
        # Fill `self.curl_props` from the stdout of `curl -V`.
        for line in output.splitlines(keepends=False):
            if line.startswith('curl '):
                self.curl_props['version_string'] = line
                m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', line)
                if m:
                    self.curl_props['fullname'] = m.group(0)
                    self.curl_props['version'] = m.group('version')
                    self.curl_props['os'] = m.group('os')
                    self.curl_props['lib_versions'] = {
                        lib.lower() for lib in m.group('libs').split(' ')
                    }
                    # 'name/1.2.3' -> 'name'
                    self.curl_props['libs'] = {
                        re.sub(r'/[a-z0-9.-]*', '', lib) for lib in self.curl_props['lib_versions']
                    }
            if line.startswith('Features: '):
                features = line[len('Features: '):]
                self.curl_props['features_string'] = features
                self.curl_props['features'] = {
                    feat.lower() for feat in features.split(' ')
                }
            if line.startswith('Protocols: '):
                protocols = line[len('Protocols: '):]
                self.curl_props['protocols_string'] = protocols
                self.curl_props['protocols'] = {
                    prot.lower() for prot in protocols.split(' ')
                }

    def _probe_nghttpx(self):
        # Read the nghttpx path from the config and run `nghttpx -v`;
        # nghttpx is disabled (None) when it is unset or does not run.
        self.nghttpx = self.config['nghttpx']['nghttpx']
        if len(self.nghttpx.strip()) == 0:
            self.nghttpx = None
        self._nghttpx_version = None
        self.nghttpx_with_h3 = False
        if self.nghttpx is None:
            return
        try:
            p = subprocess.run(args=[self.nghttpx, '-v'],
                               capture_output=True, text=True)
        except OSError:
            # configured binary is missing or cannot be executed
            self.nghttpx = None
            return
        if p.returncode != 0:
            # not a working nghttpx
            self.nghttpx = None
        else:
            self._nghttpx_version = re.sub(r'^nghttpx\s*', '', p.stdout.strip())
            self.nghttpx_with_h3 = re.match(r'.* nghttp3/.*', p.stdout.strip()) is not None
            log.debug(f'nghttpx -v: {p.stdout}')

    def _probe_caddy(self):
        # Read the caddy path from the config and run `caddy version`;
        # caddy is disabled (None) when it is unset, fails, or prints an
        # unrecognised version.
        self.caddy = self.config['caddy']['caddy']
        self._caddy_version = None
        if len(self.caddy.strip()) == 0:
            self.caddy = None
        if self.caddy is None:
            return
        try:
            p = subprocess.run(args=[self.caddy, 'version'],
                               capture_output=True, text=True)
            if p.returncode != 0:
                # not a working caddy
                self.caddy = None
            m = re.match(r'v?(\d+\.\d+\.\d+).*', p.stdout)
            if m:
                self._caddy_version = m.group(1)
            else:
                raise RuntimeError(f'Unable to determine cadd version from: {p.stdout}')
        except Exception:
            # Best effort: any failure disables caddy. `Exception` instead of
            # a bare `except` so Ctrl-C / SystemExit are not swallowed.
            self.caddy = None

    def _probe_vsftpd(self):
        # Read the vsftpd path from the config and run `vsftpd -v`;
        # vsftpd is disabled (None) when it is unset or the probe fails.
        self.vsftpd = self.config['vsftpd']['vsftpd']
        self._vsftpd_version = None
        if len(self.vsftpd.strip()) == 0:
            # same "empty means not configured" rule as nghttpx and caddy
            self.vsftpd = None
        if self.vsftpd is None:
            return
        try:
            with tempfile.TemporaryFile('w+') as tmp:
                p = subprocess.run(args=[self.vsftpd, '-v'],
                                   capture_output=True, text=True, stdin=tmp)
                if p.returncode != 0:
                    # not a working vsftpd
                    self.vsftpd = None
                if p.stderr:
                    ver_text = p.stderr
                else:
                    # Oddly, some versions of vsftpd write their version to
                    # the stdin descriptor (!) instead of stderr. If there is
                    # nothing on stderr, read back the file that was passed
                    # as stdin and use any data found there instead.
                    tmp.seek(0)
                    ver_text = tmp.read()
            m = re.match(r'vsftpd: version (\d+\.\d+\.\d+)', ver_text)
            if m:
                self._vsftpd_version = m.group(1)
            elif len(p.stderr) == 0:
                # vsftp does not use stdout or stderr for printing its version... -.-
                self._vsftpd_version = 'unknown'
            else:
                raise Exception(f'Unable to determine VsFTPD version from: {p.stderr}')
        except Exception:
            self.vsftpd = None

    @property
    def httpd_version(self):
        """httpd version reported by ``apxs -q HTTPD_VERSION``, or ``None``.

        The query runs on access while no version is cached and ``apxs`` is
        configured; failures are logged and leave the value at ``None``.
        """
        if self._httpd_version is None and self.apxs is not None:
            try:
                p = subprocess.run(args=[self.apxs, '-q', 'HTTPD_VERSION'],
                                   capture_output=True, text=True)
                if p.returncode != 0:
                    log.error(f'{self.apxs} failed to query HTTPD_VERSION: {p}')
                else:
                    self._httpd_version = p.stdout.strip()
            except Exception:
                log.exception(f'{self.apxs} failed to run')
        return self._httpd_version

    def versiontuple(self, v):
        """Turn a dotted version string into a tuple of ints.

        A ``-suffix`` following ``N.N`` or ``N.N.N`` is dropped first, e.g.
        ``'8.5.0-DEV'`` gives ``(8, 5, 0)``. Raises ``ValueError`` when a
        remaining component is not an integer.
        """
        v = re.sub(r'(\d+\.\d+(\.\d+)?)(-\S+)?', r'\1', v)
        return tuple(map(int, v.split('.')))

    def httpd_is_at_least(self, minv):
        """Return True when the httpd version is known and >= ``minv``."""
        if self.httpd_version is None:
            return False
        hv = self.versiontuple(self.httpd_version)
        return hv >= self.versiontuple(minv)

    def caddy_is_at_least(self, minv):
        """Return True when the caddy version is known and >= ``minv``."""
        if self.caddy_version is None:
            return False
        hv = self.versiontuple(self.caddy_version)
        return hv >= self.versiontuple(minv)

    def is_complete(self) -> bool:
        """Return True when httpd, apachectl and apxs all exist as files."""
        return os.path.isfile(self.httpd) and \
            os.path.isfile(self.apachectl) and \
            self.apxs is not None and \
            os.path.isfile(self.apxs)

    def get_incomplete_reason(self) -> Optional[str]:
        """Describe the first missing httpd prerequisite, or return None."""
        if self.httpd is None or len(self.httpd.strip()) == 0:
            return 'httpd not configured, see `--with-test-httpd=<path>`'
        if not os.path.isfile(self.httpd):
            return f'httpd ({self.httpd}) not found'
        if not os.path.isfile(self.apachectl):
            return f'apachectl ({self.apachectl}) not found'
        if self.apxs is None:
            return "command apxs not found (commonly provided in apache2-dev)"
        if not os.path.isfile(self.apxs):
            return f"apxs ({self.apxs}) not found"
        return None

    @property
    def nghttpx_version(self):
        """Output of ``nghttpx -v`` without the leading name, or None."""
        return self._nghttpx_version

    @property
    def caddy_version(self):
        """Version parsed from ``caddy version``, or None."""
        return self._caddy_version

    @property
    def vsftpd_version(self):
        """Parsed vsftpd version, ``'unknown'``, or None."""
        return self._vsftpd_version

    @property
    def tcpdmp(self) -> Optional[str]:
        """Path of ``tcpdump`` as found by ``shutil.which``, or None."""
        return self._tcpdump
class Env:
    """Access point to the test environment for test cases.

    The static methods answer capability questions from the shared
    ``CONFIG`` object; instances add per-run state (verbosity, timeout and
    the test CA) and expose the configured paths, domains and ports.
    """

    # Created once, when this class body is executed.
    CONFIG = EnvConfig()

    @staticmethod
    def setup_incomplete() -> bool:
        """Return True when the httpd prerequisites are not all present."""
        return not Env.CONFIG.is_complete()

    @staticmethod
    def incomplete_reason() -> Optional[str]:
        """Return why the setup is incomplete, or None."""
        return Env.CONFIG.get_incomplete_reason()

    @staticmethod
    def have_nghttpx() -> bool:
        """Return True when a usable nghttpx was found."""
        return Env.CONFIG.nghttpx is not None

    @staticmethod
    def have_h3_server() -> bool:
        """Return True when nghttpx reported nghttp3 in its version."""
        return Env.CONFIG.nghttpx_with_h3

    @staticmethod
    def have_ssl_curl() -> bool:
        """Return True when curl lists the 'ssl' or 'multissl' feature."""
        return Env.curl_has_feature('ssl') or Env.curl_has_feature('multissl')

    @staticmethod
    def have_h2_curl() -> bool:
        """Return True when curl lists the 'http2' feature."""
        return 'http2' in Env.CONFIG.curl_props['features']

    @staticmethod
    def have_h3_curl() -> bool:
        """Return True when curl lists the 'http3' feature."""
        return 'http3' in Env.CONFIG.curl_props['features']

    @staticmethod
    def curl_uses_lib(libname: str) -> bool:
        """Return True when ``libname`` (case-insensitive) is a curl lib."""
        return libname.lower() in Env.CONFIG.curl_props['libs']

    @staticmethod
    def curl_uses_ossl_quic() -> bool:
        """Return True for an HTTP/3 curl using nghttp3 but not ngtcp2."""
        if Env.have_h3_curl():
            return not Env.curl_uses_lib('ngtcp2') and Env.curl_uses_lib('nghttp3')
        return False

    @staticmethod
    def curl_version_string() -> str:
        """Return the full 'curl ...' line of ``curl -V``."""
        return Env.CONFIG.curl_props['version_string']

    @staticmethod
    def curl_features_string() -> str:
        """Return the text after 'Features: ' in ``curl -V``."""
        return Env.CONFIG.curl_props['features_string']

    @staticmethod
    def curl_has_feature(feature: str) -> bool:
        """Return True when curl lists ``feature`` (case-insensitive)."""
        return feature.lower() in Env.CONFIG.curl_props['features']

    @staticmethod
    def curl_protocols_string() -> str:
        """Return the text after 'Protocols: ' in ``curl -V``."""
        return Env.CONFIG.curl_props['protocols_string']

    @staticmethod
    def curl_has_protocol(protocol: str) -> bool:
        """Return True when curl lists ``protocol`` (case-insensitive)."""
        return protocol.lower() in Env.CONFIG.curl_props['protocols']

    @staticmethod
    def curl_lib_version(libname: str) -> str:
        """Return the version curl reports for ``libname`` or 'unknown'."""
        prefix = f'{libname.lower()}/'
        for lversion in Env.CONFIG.curl_props['lib_versions']:
            if lversion.startswith(prefix):
                return lversion[len(prefix):]
        return 'unknown'

    @staticmethod
    def curl_lib_version_at_least(libname: str, min_version) -> bool:
        """Return True when ``libname`` is known and >= ``min_version``."""
        lversion = Env.curl_lib_version(libname)
        if lversion != 'unknown':
            return Env.CONFIG.versiontuple(min_version) <= \
                Env.CONFIG.versiontuple(lversion)
        return False

    @staticmethod
    def curl_os() -> str:
        """Return the second token of the 'curl ...' version line."""
        return Env.CONFIG.curl_props['os']

    @staticmethod
    def curl_fullname() -> str:
        """Return the matched 'curl <version> <os> <libs>' line."""
        return Env.CONFIG.curl_props['fullname']

    @staticmethod
    def curl_version() -> str:
        """Return the curl version token."""
        return Env.CONFIG.curl_props['version']

    @staticmethod
    def curl_is_debug() -> bool:
        """Return True when ``curl -V`` printed a WARNING on stderr."""
        return Env.CONFIG.curl_is_debug

    @staticmethod
    def have_h3() -> bool:
        """Return True when both curl and the server side support h3."""
        return Env.have_h3_curl() and Env.have_h3_server()

    @staticmethod
    def httpd_version() -> Optional[str]:
        """Return the httpd version, or None when it is not known."""
        return Env.CONFIG.httpd_version

    @staticmethod
    def nghttpx_version() -> Optional[str]:
        """Return the nghttpx version text, or None."""
        return Env.CONFIG.nghttpx_version

    @staticmethod
    def caddy_version() -> Optional[str]:
        """Return the caddy version, or None."""
        return Env.CONFIG.caddy_version

    @staticmethod
    def caddy_is_at_least(minv) -> bool:
        """Return True when caddy is known and at least version ``minv``."""
        return Env.CONFIG.caddy_is_at_least(minv)

    @staticmethod
    def httpd_is_at_least(minv) -> bool:
        """Return True when httpd is known and at least version ``minv``."""
        return Env.CONFIG.httpd_is_at_least(minv)

    @staticmethod
    def has_caddy() -> bool:
        """Return True when a usable caddy was found."""
        return Env.CONFIG.caddy is not None

    @staticmethod
    def has_vsftpd() -> bool:
        """Return True when a usable vsftpd was found."""
        return Env.CONFIG.vsftpd is not None

    @staticmethod
    def vsftpd_version() -> Optional[str]:
        """Return the vsftpd version, 'unknown', or None."""
        return Env.CONFIG.vsftpd_version

    @staticmethod
    def tcpdump() -> Optional[str]:
        """Return the path of tcpdump, or None when not found."""
        return Env.CONFIG.tcpdmp

    def __init__(self, pytestconfig=None):
        """Set verbosity from ``pytestconfig`` (0 without it) and the timeout."""
        self._verbose = pytestconfig.option.verbose \
            if pytestconfig is not None else 0
        self._ca = None
        # longer timeout when running with verbosity above 1
        self._test_timeout = 300.0 if self._verbose > 1 else 60.0  # seconds

    def issue_certs(self):
        """Create the test CA on first use and issue the configured certs."""
        if self._ca is None:
            ca_dir = os.path.join(self.CONFIG.gen_dir, 'ca')
            self._ca = TestCA.create_root(name=self.CONFIG.tld,
                                          store_dir=ca_dir,
                                          key_type="rsa2048")
        self._ca.issue_certs(self.CONFIG.cert_specs)

    def setup(self):
        """Create the gen and htdocs directories and issue certificates."""
        os.makedirs(self.gen_dir, exist_ok=True)
        os.makedirs(self.htdocs_dir, exist_ok=True)
        self.issue_certs()

    def get_credentials(self, domain) -> Optional[Credentials]:
        """Return the first credentials the CA has for ``domain``, or None."""
        creds = self.ca.get_credentials_for_name(domain)
        if len(creds) > 0:
            return creds[0]
        return None

    @property
    def verbose(self) -> int:
        return self._verbose

    @property
    def test_timeout(self) -> Optional[float]:
        return self._test_timeout

    @test_timeout.setter
    def test_timeout(self, val: Optional[float]):
        self._test_timeout = val

    @property
    def gen_dir(self) -> str:
        return self.CONFIG.gen_dir

    @property
    def project_dir(self) -> str:
        return self.CONFIG.project_dir

    @property
    def build_dir(self) -> str:
        return self.CONFIG.build_dir

    @property
    def ca(self):
        # None until issue_certs() has run
        return self._ca

    @property
    def htdocs_dir(self) -> str:
        return self.CONFIG.htdocs_dir

    @property
    def tld(self) -> str:
        return self.CONFIG.tld

    @property
    def domain1(self) -> str:
        return self.CONFIG.domain1

    @property
    def domain1brotli(self) -> str:
        return self.CONFIG.domain1brotli

    @property
    def domain2(self) -> str:
        return self.CONFIG.domain2

    @property
    def ftp_domain(self) -> str:
        return self.CONFIG.ftp_domain

    @property
    def proxy_domain(self) -> str:
        return self.CONFIG.proxy_domain

    @property
    def expired_domain(self) -> str:
        return self.CONFIG.expired_domain

    @property
    def http_port(self) -> int:
        return self.CONFIG.ports['http']

    @property
    def https_port(self) -> int:
        return self.CONFIG.ports['https']

    @property
    def nghttpx_https_port(self) -> int:
        return self.CONFIG.ports['nghttpx_https']

    @property
    def h3_port(self) -> int:
        # h3 uses the same port number as https
        return self.https_port

    @property
    def proxy_port(self) -> int:
        return self.CONFIG.ports['proxy']

    @property
    def proxys_port(self) -> int:
        return self.CONFIG.ports['proxys']

    @property
    def ftp_port(self) -> int:
        return self.CONFIG.ports['ftp']

    @property
    def ftps_port(self) -> int:
        return self.CONFIG.ports['ftps']

    @property
    def h2proxys_port(self) -> int:
        return self.CONFIG.ports['h2proxys']

    def pts_port(self, proto: str = 'http/1.1') -> int:
        """Return the proxy tunnel port: 'h2proxys' for h2, else 'proxys'."""
        # proxy tunnel port
        return self.CONFIG.ports['h2proxys' if proto == 'h2' else 'proxys']

    @property
    def caddy(self) -> Optional[str]:
        return self.CONFIG.caddy

    @property
    def caddy_https_port(self) -> int:
        return self.CONFIG.ports['caddys']

    @property
    def caddy_http_port(self) -> int:
        return self.CONFIG.ports['caddy']

    @property
    def vsftpd(self) -> Optional[str]:
        return self.CONFIG.vsftpd

    @property
    def ws_port(self) -> int:
        return self.CONFIG.ports['ws']

    @property
    def curl(self) -> str:
        return self.CONFIG.curl

    @property
    def httpd(self) -> str:
        return self.CONFIG.httpd

    @property
    def apachectl(self) -> str:
        return self.CONFIG.apachectl

    @property
    def apxs(self) -> Optional[str]:
        return self.CONFIG.apxs

    @property
    def nghttpx(self) -> Optional[str]:
        return self.CONFIG.nghttpx

    @property
    def slow_network(self) -> bool:
        """True when CURL_DBG_SOCK_WBLOCK or CURL_DBG_SOCK_WPARTIAL is set."""
        return "CURL_DBG_SOCK_WBLOCK" in os.environ or \
            "CURL_DBG_SOCK_WPARTIAL" in os.environ

    @property
    def ci_run(self) -> bool:
        """True when the CURL_CI environment variable is set."""
        return "CURL_CI" in os.environ

    def port_for(self, alpn_proto: Optional[str] = None):
        """Return the port for ``alpn_proto``.

        None, 'h2' and the 'http/x' protocols map to the https port, 'h3'
        to the h3 port and anything else to the plain http port.
        """
        if alpn_proto is None or \
                alpn_proto in ['h2', 'http/1.1', 'http/1.0', 'http/0.9']:
            return self.https_port
        if alpn_proto in ['h3']:
            return self.h3_port
        return self.http_port

    def authority_for(self, domain: str, alpn_proto: Optional[str] = None):
        """Return ``'<domain>:<port>'`` using the port for ``alpn_proto``."""
        return f'{domain}:{self.port_for(alpn_proto=alpn_proto)}'

    def make_data_file(self, indir: str, fname: str, fsize: int,
                       line_length: int = 1024) -> str:
        """Write a text file of ``fsize`` characters and return its path.

        Every full line is ``line_length`` characters including the
        newline: a 9-digit zero-padded counter, a dash and digit filler.
        A shorter final line supplies whatever remains of ``fsize``.

        Raises:
            RuntimeError: if ``line_length`` is below 11.
        """
        if line_length < 11:
            raise RuntimeError('line_length less than 11 not supported')
        fpath = os.path.join(indir, fname)
        s10 = "0123456789"
        # filler after the 10-char "NNNNNNNNN-" prefix, leaving room for "\n"
        s = ((line_length // 10) + 1) * s10
        s = s[0:line_length-11]
        # integer arithmetic throughout; no float rounding for large sizes
        full_lines, remain = divmod(int(fsize), line_length)
        with open(fpath, 'w') as fd:
            for i in range(full_lines):
                fd.write(f"{i:09d}-{s}\n")
            if remain != 0:
                # the counter of the partial line skips one value, as before
                i = full_lines + 1
                fd.write(f"{i:09d}-{s}"[0:remain-1] + "\n")
        return fpath
|