123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- #***************************************************************************
- # _ _ ____ _
- # Project ___| | | | _ \| |
- # / __| | | | |_) | |
- # | (__| |_| | _ <| |___
- # \___|\___/|_| \_\_____|
- #
- # Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
- #
- # This software is licensed as described in the file COPYING, which
- # you should have received as part of this distribution. The terms
- # are also available at https://curl.se/docs/copyright.html.
- #
- # You may opt to use, copy, modify, merge, publish, distribute and/or sell
- # copies of the Software, and permit persons to whom the Software is
- # furnished to do so, under the terms of the COPYING file.
- #
- # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
- # KIND, either express or implied.
- #
- # SPDX-License-Identifier: curl
- #
- ###########################################################################
- #
- import json
- import logging
- import os
- import sys
- import time
- from threading import Thread
- import psutil
- import re
- import shutil
- import subprocess
- from statistics import mean, fmean
- from datetime import timedelta, datetime
- from typing import List, Optional, Dict, Union, Any
- from urllib.parse import urlparse
- from .env import Env
- log = logging.getLogger(__name__)
- class RunProfile:
- STAT_KEYS = ['cpu', 'rss', 'vsz']
- @classmethod
- def AverageStats(cls, profiles: List['RunProfile']):
- avg = {}
- stats = [p.stats for p in profiles]
- for key in cls.STAT_KEYS:
- vals = [s[key] for s in stats]
- avg[key] = mean(vals) if len(vals) else 0.0
- return avg
- def __init__(self, pid: int, started_at: datetime, run_dir):
- self._pid = pid
- self._started_at = started_at
- self._duration = timedelta(seconds=0)
- self._run_dir = run_dir
- self._samples = []
- self._psu = None
- self._stats = None
- @property
- def duration(self) -> timedelta:
- return self._duration
- @property
- def stats(self) -> Optional[Dict[str,Any]]:
- return self._stats
- def sample(self):
- elapsed = datetime.now() - self._started_at
- try:
- if self._psu is None:
- self._psu = psutil.Process(pid=self._pid)
- mem = self._psu.memory_info()
- self._samples.append({
- 'time': elapsed,
- 'cpu': self._psu.cpu_percent(),
- 'vsz': mem.vms,
- 'rss': mem.rss,
- })
- except psutil.NoSuchProcess:
- pass
- def finish(self):
- self._duration = datetime.now() - self._started_at
- if len(self._samples) > 0:
- weights = [s['time'].total_seconds() for s in self._samples]
- self._stats = {}
- for key in self.STAT_KEYS:
- self._stats[key] = fmean([s[key] for s in self._samples], weights)
- else:
- self._stats = None
- self._psu = None
- def __repr__(self):
- return f'RunProfile[pid={self._pid}, '\
- f'duration={self.duration.total_seconds():.3f}s, '\
- f'stats={self.stats}]'
- class RunTcpDump:
- def __init__(self, env, run_dir):
- self._env = env
- self._run_dir = run_dir
- self._proc = None
- self._stdoutfile = os.path.join(self._run_dir, 'tcpdump.out')
- self._stderrfile = os.path.join(self._run_dir, 'tcpdump.err')
- @property
- def stats(self) -> Optional[List[str]]:
- if self._proc:
- raise Exception('tcpdump still running')
- return [line
- for line in open(self._stdoutfile)
- if re.match(r'.* IP 127\.0\.0\.1\.\d+ [<>] 127\.0\.0\.1\.\d+:.*', line)]
- def stats_excluding(self, src_port) -> Optional[List[str]]:
- if self._proc:
- raise Exception('tcpdump still running')
- return [line
- for line in self.stats
- if not re.match(r'.* IP 127\.0\.0\.1\.' + str(src_port) + ' >.*', line)]
- @property
- def stderr(self) -> List[str]:
- if self._proc:
- raise Exception('tcpdump still running')
- return open(self._stderrfile).readlines()
- def sample(self):
- # not sure how to make that detection reliable for all platforms
- local_if = 'lo0' if sys.platform.startswith('darwin') else 'lo'
- try:
- tcpdump = self._env.tcpdump()
- if tcpdump is None:
- raise Exception('tcpdump not available')
- # look with tcpdump for TCP RST packets which indicate
- # we did not shut down connections cleanly
- args = []
- # at least on Linux, we need root permissions to run tcpdump
- if sys.platform.startswith('linux'):
- args.append('sudo')
- args.extend([
- tcpdump, '-i', local_if, '-n', 'tcp[tcpflags] & (tcp-rst)!=0'
- ])
- with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
- self._proc = subprocess.Popen(args, stdout=cout, stderr=cerr,
- text=True, cwd=self._run_dir,
- shell=False)
- assert self._proc
- assert self._proc.returncode is None
- while self._proc:
- try:
- self._proc.wait(timeout=1)
- except subprocess.TimeoutExpired:
- pass
- except Exception:
- log.exception('Tcpdump')
- def start(self):
- def do_sample():
- self.sample()
- t = Thread(target=do_sample)
- t.start()
- def finish(self):
- if self._proc:
- time.sleep(1)
- self._proc.terminate()
- self._proc = None
- class ExecResult:
- def __init__(self, args: List[str], exit_code: int,
- stdout: List[str], stderr: List[str],
- duration: Optional[timedelta] = None,
- with_stats: bool = False,
- exception: Optional[str] = None,
- profile: Optional[RunProfile] = None,
- tcpdump: Optional[RunTcpDump] = None):
- self._args = args
- self._exit_code = exit_code
- self._exception = exception
- self._stdout = stdout
- self._stderr = stderr
- self._profile = profile
- self._tcpdump = tcpdump
- self._duration = duration if duration is not None else timedelta()
- self._response = None
- self._responses = []
- self._results = {}
- self._assets = []
- self._stats = []
- self._json_out = None
- self._with_stats = with_stats
- if with_stats:
- self._parse_stats()
- else:
- # noinspection PyBroadException
- try:
- out = ''.join(self._stdout)
- self._json_out = json.loads(out)
- except: # noqa: E722
- pass
- def __repr__(self):
- return f"ExecResult[code={self.exit_code}, exception={self._exception}, "\
- f"args={self._args}, stdout={self._stdout}, stderr={self._stderr}]"
- def _parse_stats(self):
- self._stats = []
- for line in self._stdout:
- try:
- self._stats.append(json.loads(line))
- # TODO: specify specific exceptions here
- except: # noqa: E722
- log.exception(f'not a JSON stat: {line}')
- break
- @property
- def exit_code(self) -> int:
- return self._exit_code
- @property
- def args(self) -> List[str]:
- return self._args
- @property
- def outraw(self) -> bytes:
- return ''.join(self._stdout).encode()
- @property
- def stdout(self) -> str:
- return ''.join(self._stdout)
- @property
- def json(self) -> Optional[Dict]:
- """Output as JSON dictionary or None if not parseable."""
- return self._json_out
- @property
- def stderr(self) -> str:
- return ''.join(self._stderr)
- @property
- def trace_lines(self) -> List[str]:
- return self._stderr
- @property
- def duration(self) -> timedelta:
- return self._duration
- @property
- def profile(self) -> Optional[RunProfile]:
- return self._profile
- @property
- def tcpdump(self) -> Optional[RunTcpDump]:
- return self._tcpdump
- @property
- def response(self) -> Optional[Dict]:
- return self._response
- @property
- def responses(self) -> List[Dict]:
- return self._responses
- @property
- def results(self) -> Dict:
- return self._results
- @property
- def assets(self) -> List:
- return self._assets
- @property
- def with_stats(self) -> bool:
- return self._with_stats
- @property
- def stats(self) -> List:
- return self._stats
- @property
- def total_connects(self) -> Optional[int]:
- if len(self.stats):
- n = 0
- for stat in self.stats:
- n += stat['num_connects']
- return n
- return None
- def add_response(self, resp: Dict):
- self._response = resp
- self._responses.append(resp)
- def add_results(self, results: Dict):
- self._results.update(results)
- if 'response' in results:
- self.add_response(results['response'])
- def add_assets(self, assets: List):
- self._assets.extend(assets)
- def check_exit_code(self, code: Union[int, bool]):
- if code is True:
- assert self.exit_code == 0, f'expected exit code {code}, '\
- f'got {self.exit_code}\n{self.dump_logs()}'
- elif code is False:
- assert self.exit_code != 0, f'expected exit code {code}, '\
- f'got {self.exit_code}\n{self.dump_logs()}'
- else:
- assert self.exit_code == code, f'expected exit code {code}, '\
- f'got {self.exit_code}\n{self.dump_logs()}'
- def check_response(self, http_status: Optional[int] = 200,
- count: Optional[int] = 1,
- protocol: Optional[str] = None,
- exitcode: Optional[int] = 0,
- connect_count: Optional[int] = None):
- if exitcode:
- self.check_exit_code(exitcode)
- if self.with_stats and isinstance(exitcode, int):
- for idx, x in enumerate(self.stats):
- if 'exitcode' in x:
- assert int(x['exitcode']) == exitcode, \
- f'response #{idx} exitcode: expected {exitcode}, '\
- f'got {x["exitcode"]}\n{self.dump_logs()}'
- if self.with_stats:
- assert len(self.stats) == count, \
- f'response count: expected {count}, ' \
- f'got {len(self.stats)}\n{self.dump_logs()}'
- else:
- assert len(self.responses) == count, \
- f'response count: expected {count}, ' \
- f'got {len(self.responses)}\n{self.dump_logs()}'
- if http_status is not None:
- if self.with_stats:
- for idx, x in enumerate(self.stats):
- assert 'http_code' in x, \
- f'response #{idx} reports no http_code\n{self.dump_stat(x)}'
- assert x['http_code'] == http_status, \
- f'response #{idx} http_code: expected {http_status}, '\
- f'got {x["http_code"]}\n{self.dump_stat(x)}'
- else:
- for idx, x in enumerate(self.responses):
- assert x['status'] == http_status, \
- f'response #{idx} status: expected {http_status},'\
- f'got {x["status"]}\n{self.dump_stat(x)}'
- if protocol is not None:
- if self.with_stats:
- http_version = None
- if protocol == 'HTTP/1.1':
- http_version = '1.1'
- elif protocol == 'HTTP/2':
- http_version = '2'
- elif protocol == 'HTTP/3':
- http_version = '3'
- if http_version is not None:
- for idx, x in enumerate(self.stats):
- assert x['http_version'] == http_version, \
- f'response #{idx} protocol: expected http/{http_version},' \
- f'got version {x["http_version"]}\n{self.dump_stat(x)}'
- else:
- for idx, x in enumerate(self.responses):
- assert x['protocol'] == protocol, \
- f'response #{idx} protocol: expected {protocol},'\
- f'got {x["protocol"]}\n{self.dump_logs()}'
- if connect_count is not None:
- assert self.total_connects == connect_count, \
- f'expected {connect_count}, but {self.total_connects} '\
- f'were made\n{self.dump_logs()}'
- def check_stats(self, count: int, http_status: Optional[int] = None,
- exitcode: Optional[int] = None,
- remote_port: Optional[int] = None,
- remote_ip: Optional[str] = None):
- if exitcode is None:
- self.check_exit_code(0)
- assert len(self.stats) == count, \
- f'stats count: expected {count}, got {len(self.stats)}\n{self.dump_logs()}'
- if http_status is not None:
- for idx, x in enumerate(self.stats):
- assert 'http_code' in x, \
- f'status #{idx} reports no http_code\n{self.dump_stat(x)}'
- assert x['http_code'] == http_status, \
- f'status #{idx} http_code: expected {http_status}, '\
- f'got {x["http_code"]}\n{self.dump_stat(x)}'
- if exitcode is not None:
- for idx, x in enumerate(self.stats):
- if 'exitcode' in x:
- assert x['exitcode'] == exitcode, \
- f'status #{idx} exitcode: expected {exitcode}, '\
- f'got {x["exitcode"]}\n{self.dump_stat(x)}'
- if remote_port is not None:
- for idx, x in enumerate(self.stats):
- assert 'remote_port' in x, f'remote_port missing\n{self.dump_stat(x)}'
- assert x['remote_port'] == remote_port, \
- f'status #{idx} remote_port: expected {remote_port}, '\
- f'got {x["remote_port"]}\n{self.dump_stat(x)}'
- if remote_ip is not None:
- for idx, x in enumerate(self.stats):
- assert 'remote_ip' in x, f'remote_ip missing\n{self.dump_stat(x)}'
- assert x['remote_ip'] == remote_ip, \
- f'status #{idx} remote_ip: expected {remote_ip}, '\
- f'got {x["remote_ip"]}\n{self.dump_stat(x)}'
- def dump_logs(self):
- lines = ['>>--stdout ----------------------------------------------\n']
- lines.extend(self._stdout)
- lines.append('>>--stderr ----------------------------------------------\n')
- lines.extend(self._stderr)
- lines.append('<<-------------------------------------------------------\n')
- return ''.join(lines)
- def dump_stat(self, x):
- lines = [
- 'json stat from curl:',
- json.JSONEncoder(indent=2).encode(x),
- ]
- if 'xfer_id' in x:
- xfer_id = x['xfer_id']
- lines.append(f'>>--xfer {xfer_id} trace:\n')
- lines.extend(self.xfer_trace_for(xfer_id))
- else:
- lines.append('>>--full trace-------------------------------------------\n')
- lines.extend(self._stderr)
- lines.append('<<-------------------------------------------------------\n')
- return ''.join(lines)
- def xfer_trace_for(self, xfer_id) -> List[str]:
- pat = re.compile(f'^[^[]* \\[{xfer_id}-.*$')
- return [line for line in self._stderr if pat.match(line)]
- class CurlClient:
- ALPN_ARG = {
- 'http/0.9': '--http0.9',
- 'http/1.0': '--http1.0',
- 'http/1.1': '--http1.1',
- 'h2': '--http2',
- 'h2c': '--http2',
- 'h3': '--http3-only',
- }
- def __init__(self, env: Env,
- run_dir: Optional[str] = None,
- timeout: Optional[float] = None,
- silent: bool = False,
- run_env: Optional[Dict[str, str]] = None,
- server_addr: Optional[str] = None):
- self.env = env
- self._timeout = timeout if timeout else env.test_timeout
- self._curl = os.environ['CURL'] if 'CURL' in os.environ else env.curl
- self._run_dir = run_dir if run_dir else os.path.join(env.gen_dir, 'curl')
- self._stdoutfile = f'{self._run_dir}/curl.stdout'
- self._stderrfile = f'{self._run_dir}/curl.stderr'
- self._headerfile = f'{self._run_dir}/curl.headers'
- self._log_path = f'{self._run_dir}/curl.log'
- self._silent = silent
- self._run_env = run_env
- self._server_addr = server_addr if server_addr else '127.0.0.1'
- self._rmrf(self._run_dir)
- self._mkpath(self._run_dir)
- @property
- def run_dir(self) -> str:
- return self._run_dir
- def download_file(self, i: int) -> str:
- return os.path.join(self.run_dir, f'download_{i}.data')
- def _rmf(self, path):
- if os.path.exists(path):
- return os.remove(path)
- def _rmrf(self, path):
- if os.path.exists(path):
- return shutil.rmtree(path)
- def _mkpath(self, path):
- if not os.path.exists(path):
- return os.makedirs(path)
- def get_proxy_args(self, proto: str = 'http/1.1',
- proxys: bool = True, tunnel: bool = False,
- use_ip: bool = False):
- proxy_name = self._server_addr if use_ip else self.env.proxy_domain
- if proxys:
- pport = self.env.pts_port(proto) if tunnel else self.env.proxys_port
- xargs = [
- '--proxy', f'https://{proxy_name}:{pport}/',
- '--resolve', f'{proxy_name}:{pport}:{self._server_addr}',
- '--proxy-cacert', self.env.ca.cert_file,
- ]
- if proto == 'h2':
- xargs.append('--proxy-http2')
- else:
- xargs = [
- '--proxy', f'http://{proxy_name}:{self.env.proxy_port}/',
- '--resolve', f'{proxy_name}:{self.env.proxy_port}:{self._server_addr}',
- ]
- if tunnel:
- xargs.append('--proxytunnel')
- return xargs
- def http_get(self, url: str, extra_args: Optional[List[str]] = None,
- alpn_proto: Optional[str] = None,
- def_tracing: bool = True,
- with_stats: bool = False,
- with_profile: bool = False,
- with_tcpdump: bool = False):
- return self._raw(url, options=extra_args,
- with_stats=with_stats,
- alpn_proto=alpn_proto,
- def_tracing=def_tracing,
- with_profile=with_profile,
- with_tcpdump=with_tcpdump)
- def http_download(self, urls: List[str],
- alpn_proto: Optional[str] = None,
- with_stats: bool = True,
- with_headers: bool = False,
- with_profile: bool = False,
- with_tcpdump: bool = False,
- no_save: bool = False,
- extra_args: Optional[List[str]] = None):
- if extra_args is None:
- extra_args = []
- if no_save:
- extra_args.extend([
- '-o', '/dev/null',
- ])
- else:
- extra_args.extend([
- '-o', 'download_#1.data',
- ])
- # remove any existing ones
- for i in range(100):
- self._rmf(self.download_file(i))
- if with_stats:
- extra_args.extend([
- '-w', '%{json}\\n'
- ])
- return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
- with_stats=with_stats,
- with_headers=with_headers,
- with_profile=with_profile,
- with_tcpdump=with_tcpdump)
- def http_upload(self, urls: List[str], data: str,
- alpn_proto: Optional[str] = None,
- with_stats: bool = True,
- with_headers: bool = False,
- with_profile: bool = False,
- with_tcpdump: bool = False,
- extra_args: Optional[List[str]] = None):
- if extra_args is None:
- extra_args = []
- extra_args.extend([
- '--data-binary', data, '-o', 'download_#1.data',
- ])
- if with_stats:
- extra_args.extend([
- '-w', '%{json}\\n'
- ])
- return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
- with_stats=with_stats,
- with_headers=with_headers,
- with_profile=with_profile,
- with_tcpdump=with_tcpdump)
- def http_delete(self, urls: List[str],
- alpn_proto: Optional[str] = None,
- with_stats: bool = True,
- with_profile: bool = False,
- extra_args: Optional[List[str]] = None):
- if extra_args is None:
- extra_args = []
- extra_args.extend([
- '-X', 'DELETE', '-o', '/dev/null',
- ])
- if with_stats:
- extra_args.extend([
- '-w', '%{json}\\n'
- ])
- return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
- with_stats=with_stats,
- with_headers=False,
- with_profile=with_profile)
- def http_put(self, urls: List[str], data=None, fdata=None,
- alpn_proto: Optional[str] = None,
- with_stats: bool = True,
- with_headers: bool = False,
- with_profile: bool = False,
- extra_args: Optional[List[str]] = None):
- if extra_args is None:
- extra_args = []
- if fdata is not None:
- extra_args.extend(['-T', fdata])
- elif data is not None:
- extra_args.extend(['-T', '-'])
- extra_args.extend([
- '-o', 'download_#1.data',
- ])
- if with_stats:
- extra_args.extend([
- '-w', '%{json}\\n'
- ])
- return self._raw(urls, intext=data,
- alpn_proto=alpn_proto, options=extra_args,
- with_stats=with_stats,
- with_headers=with_headers,
- with_profile=with_profile)
- def http_form(self, urls: List[str], form: Dict[str, str],
- alpn_proto: Optional[str] = None,
- with_stats: bool = True,
- with_headers: bool = False,
- extra_args: Optional[List[str]] = None):
- if extra_args is None:
- extra_args = []
- for key, val in form.items():
- extra_args.extend(['-F', f'{key}={val}'])
- extra_args.extend([
- '-o', 'download_#1.data',
- ])
- if with_stats:
- extra_args.extend([
- '-w', '%{json}\\n'
- ])
- return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
- with_stats=with_stats,
- with_headers=with_headers)
- def ftp_get(self, urls: List[str],
- with_stats: bool = True,
- with_profile: bool = False,
- with_tcpdump: bool = False,
- no_save: bool = False,
- extra_args: Optional[List[str]] = None):
- if extra_args is None:
- extra_args = []
- if no_save:
- extra_args.extend([
- '-o', '/dev/null',
- ])
- else:
- extra_args.extend([
- '-o', 'download_#1.data',
- ])
- # remove any existing ones
- for i in range(100):
- self._rmf(self.download_file(i))
- if with_stats:
- extra_args.extend([
- '-w', '%{json}\\n'
- ])
- return self._raw(urls, options=extra_args,
- with_stats=with_stats,
- with_headers=False,
- with_profile=with_profile,
- with_tcpdump=with_tcpdump)
- def ftp_ssl_get(self, urls: List[str],
- with_stats: bool = True,
- with_profile: bool = False,
- with_tcpdump: bool = False,
- no_save: bool = False,
- extra_args: Optional[List[str]] = None):
- if extra_args is None:
- extra_args = []
- extra_args.extend([
- '--ssl-reqd',
- ])
- return self.ftp_get(urls=urls, with_stats=with_stats,
- with_profile=with_profile, no_save=no_save,
- with_tcpdump=with_tcpdump,
- extra_args=extra_args)
- def ftp_upload(self, urls: List[str],
- fupload: Optional[Any] = None,
- updata: Optional[str] = None,
- with_stats: bool = True,
- with_profile: bool = False,
- with_tcpdump: bool = False,
- extra_args: Optional[List[str]] = None):
- if extra_args is None:
- extra_args = []
- if fupload is not None:
- extra_args.extend([
- '--upload-file', fupload
- ])
- elif updata is not None:
- extra_args.extend([
- '--upload-file', '-'
- ])
- else:
- raise Exception('need either file or data to upload')
- if with_stats:
- extra_args.extend([
- '-w', '%{json}\\n'
- ])
- return self._raw(urls, options=extra_args,
- intext=updata,
- with_stats=with_stats,
- with_headers=False,
- with_profile=with_profile,
- with_tcpdump=with_tcpdump)
- def ftp_ssl_upload(self, urls: List[str],
- fupload: Optional[Any] = None,
- updata: Optional[str] = None,
- with_stats: bool = True,
- with_profile: bool = False,
- with_tcpdump: bool = False,
- extra_args: Optional[List[str]] = None):
- if extra_args is None:
- extra_args = []
- extra_args.extend([
- '--ssl-reqd',
- ])
- return self.ftp_upload(urls=urls, fupload=fupload, updata=updata,
- with_stats=with_stats, with_profile=with_profile,
- with_tcpdump=with_tcpdump,
- extra_args=extra_args)
- def response_file(self, idx: int):
- return os.path.join(self._run_dir, f'download_{idx}.data')
- def run_direct(self, args, with_stats: bool = False, with_profile: bool = False):
- my_args = [self._curl]
- if with_stats:
- my_args.extend([
- '-w', '%{json}\\n'
- ])
- my_args.extend([
- '-o', 'download.data',
- ])
- my_args.extend(args)
- return self._run(args=my_args, with_stats=with_stats, with_profile=with_profile)
- def _run(self, args, intext='', with_stats: bool = False,
- with_profile: bool = True, with_tcpdump: bool = False):
- self._rmf(self._stdoutfile)
- self._rmf(self._stderrfile)
- self._rmf(self._headerfile)
- exception = None
- profile = None
- tcpdump = None
- started_at = datetime.now()
- if with_tcpdump:
- tcpdump = RunTcpDump(self.env, self._run_dir)
- tcpdump.start()
- try:
- with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
- if with_profile:
- end_at = started_at + timedelta(seconds=self._timeout) \
- if self._timeout else None
- log.info(f'starting: {args}')
- p = subprocess.Popen(args, stderr=cerr, stdout=cout,
- cwd=self._run_dir, shell=False,
- env=self._run_env)
- profile = RunProfile(p.pid, started_at, self._run_dir)
- if intext is not None and False:
- p.communicate(input=intext.encode(), timeout=1)
- ptimeout = 0.0
- while True:
- try:
- p.wait(timeout=ptimeout)
- break
- except subprocess.TimeoutExpired:
- if end_at and datetime.now() >= end_at:
- p.kill()
- raise subprocess.TimeoutExpired(cmd=args, timeout=self._timeout)
- profile.sample()
- ptimeout = 0.01
- exitcode = p.returncode
- profile.finish()
- log.info(f'done: exit={exitcode}, profile={profile}')
- else:
- p = subprocess.run(args, stderr=cerr, stdout=cout,
- cwd=self._run_dir, shell=False,
- input=intext.encode() if intext else None,
- timeout=self._timeout,
- env=self._run_env)
- exitcode = p.returncode
- except subprocess.TimeoutExpired:
- now = datetime.now()
- duration = now - started_at
- log.warning(f'Timeout at {now} after {duration.total_seconds()}s '
- f'(configured {self._timeout}s): {args}')
- exitcode = -1
- exception = 'TimeoutExpired'
- if tcpdump:
- tcpdump.finish()
- coutput = open(self._stdoutfile).readlines()
- cerrput = open(self._stderrfile).readlines()
- return ExecResult(args=args, exit_code=exitcode, exception=exception,
- stdout=coutput, stderr=cerrput,
- duration=datetime.now() - started_at,
- with_stats=with_stats,
- profile=profile, tcpdump=tcpdump)
- def _raw(self, urls, intext='', timeout=None, options=None, insecure=False,
- alpn_proto: Optional[str] = None,
- force_resolve=True,
- with_stats=False,
- with_headers=True,
- def_tracing=True,
- with_profile=False,
- with_tcpdump=False):
- args = self._complete_args(
- urls=urls, timeout=timeout, options=options, insecure=insecure,
- alpn_proto=alpn_proto, force_resolve=force_resolve,
- with_headers=with_headers, def_tracing=def_tracing)
- r = self._run(args, intext=intext, with_stats=with_stats,
- with_profile=with_profile, with_tcpdump=with_tcpdump)
- if r.exit_code == 0 and with_headers:
- self._parse_headerfile(self._headerfile, r=r)
- return r
- def _complete_args(self, urls, timeout=None, options=None,
- insecure=False, force_resolve=True,
- alpn_proto: Optional[str] = None,
- with_headers: bool = True,
- def_tracing: bool = True):
- if not isinstance(urls, list):
- urls = [urls]
- args = [self._curl, "-s", "--path-as-is"]
- if 'CURL_TEST_EVENT' in os.environ:
- args.append('--test-event')
- if with_headers:
- args.extend(["-D", self._headerfile])
- if def_tracing is not False and not self._silent:
- args.extend(['-v', '--trace-ids', '--trace-time'])
- if self.env.verbose > 1:
- args.extend(['--trace-config', 'http/2,http/3,h2-proxy,h1-proxy'])
- active_options = options
- if options is not None and '--next' in options:
- active_options = options[options.index('--next') + 1:]
- for url in urls:
- u = urlparse(urls[0])
- if options:
- args.extend(options)
- if alpn_proto is not None:
- if alpn_proto not in self.ALPN_ARG:
- raise Exception(f'unknown ALPN protocol: "{alpn_proto}"')
- args.append(self.ALPN_ARG[alpn_proto])
- if u.scheme == 'http':
- pass
- elif insecure:
- args.append('--insecure')
- elif active_options and "--cacert" in active_options:
- pass
- elif u.hostname:
- args.extend(["--cacert", self.env.ca.cert_file])
- if force_resolve and u.hostname and u.hostname != 'localhost' \
- and not re.match(r'^(\d+|\[|:).*', u.hostname):
- port = u.port if u.port else 443
- args.extend([
- '--resolve', f'{u.hostname}:{port}:{self._server_addr}',
- ])
- if timeout is not None and int(timeout) > 0:
- args.extend(["--connect-timeout", str(int(timeout))])
- args.append(url)
- return args
- def _parse_headerfile(self, headerfile: str, r: Optional[ExecResult] = None) -> ExecResult:
- lines = open(headerfile).readlines()
- if r is None:
- r = ExecResult(args=[], exit_code=0, stdout=[], stderr=[])
- response = None
- def fin_response(resp):
- if resp:
- r.add_response(resp)
- expected = ['status']
- for line in lines:
- line = line.strip()
- if re.match(r'^$', line):
- if 'trailer' in expected:
- # end of trailers
- fin_response(response)
- response = None
- expected = ['status']
- elif 'header' in expected:
- # end of header, another status or trailers might follow
- expected = ['status', 'trailer']
- else:
- assert False, f"unexpected line: '{line}'"
- continue
- if 'status' in expected:
- # log.debug("reading 1st response line: %s", line)
- m = re.match(r'^(\S+) (\d+)( .*)?$', line)
- if m:
- fin_response(response)
- response = {
- "protocol": m.group(1),
- "status": int(m.group(2)),
- "description": m.group(3),
- "header": {},
- "trailer": {},
- "body": r.outraw
- }
- expected = ['header']
- continue
- if 'trailer' in expected:
- m = re.match(r'^([^:]+):\s*(.*)$', line)
- if m:
- response['trailer'][m.group(1).lower()] = m.group(2)
- continue
- if 'header' in expected:
- m = re.match(r'^([^:]+):\s*(.*)$', line)
- if m:
- response['header'][m.group(1).lower()] = m.group(2)
- continue
- assert False, f"unexpected line: '{line}, expected: {expected}'"
- fin_response(response)
- return r
|