#!/usr/bin/env python3 from __future__ import print_function, unicode_literals, absolute_import import argparse import coverage import json import logging import multiprocessing import os import shutil import subprocess import sys import threading import time RUNNER_PY = "nosetests" RUNNER_PY2 = "nosetests-2" RUNNER_PY3 = "nosetests-3" COVER_PY = "coverage" COVER_PY2 = "coverage2" COVER_PY3 = "coverage3" LASTLEN = None NUMREMAINING = None PRINTLOCK = None RUNNING = [] FAILED = [] NUMPROCS = multiprocessing.cpu_count() - 1 if os.environ.get('BUILD_ID'): NUMPROCS = multiprocessing.cpu_count() HERE = os.path.join(os.path.dirname(os.path.abspath(__file__))) LOG = logging.getLogger(__name__) def setup_parser(): """ Set up the command line arguments supported and return the arguments """ parser = argparse.ArgumentParser(description="Run the Pagure tests") parser.add_argument( "--debug", dest="debug", action="store_true", default=False, help="Increase the level of data logged.", ) subparsers = parser.add_subparsers(title="actions") # RUN parser_run = subparsers.add_parser("run", help="Run the tests") parser_run.add_argument( "--py2", dest="py2", action="store_true", default=False, help="Runs the tests only in python2 instead of both python2 and python3", ) parser_run.add_argument( "--py3", dest="py3", action="store_true", default=False, help="Runs the tests only in python3 instead of both python2 and python3", ) parser_run.add_argument( "--results", default="results", help="Specify a folder in which the results should be placed " "(defaults to `results`)", ) parser_run.add_argument( "-f", "--force", default=False, action="store_true", help="Override the results and newfailed file without asking you", ) parser_run.add_argument( "--with-coverage", default=False, action="store_true", help="Also build coverage report", ) parser_run.add_argument( "failed_tests", nargs="?", help="File containing a JSON list of the failed tests to run or " "pointing to a test file to run.", ) parser_run.set_defaults(func=do_run) # RERUN parser_run = subparsers.add_parser("rerun", help="Run failed tests") parser_run.add_argument( "--debug", dest="debug", action="store_true", default=False, help="Expand the level of data returned.", ) parser_run.add_argument( "--py2", dest="py2", action="store_true", default=False, help="Runs the tests only in python2 instead of both python2 and python3", ) parser_run.add_argument( "--py3", dest="py3", action="store_true", default=False, help="Runs the tests only in python3 instead of both python2 and python3", ) parser_run.add_argument( "--results", default="results", help="Specify a folder in which the results should be placed " "(defaults to `results`)", ) parser_run.add_argument( "--with-coverage", default=False, action="store_true", help="Also build coverage report", ) parser_run.set_defaults(func=do_rerun) # LIST parser_run = subparsers.add_parser("list", help="List failed tests") parser_run.add_argument( "--results", default="results", help="Specify a folder in which the results should be placed " "(defaults to `results`)", ) parser_run.add_argument( "--show", default=False, action="store_true", help="Show the error files using `less`", ) parser_run.add_argument( "-n", default=None, nargs="?", type=int, help="Number of failed test to show", ) parser_run.set_defaults(func=do_list) # SHOW-COVERAGE parser_run = subparsers.add_parser( "show-coverage", help="Shows the coverage report from the data in the results folder") parser_run.add_argument( "--debug", dest="debug", action="store_true", default=False, help="Expand the level of data returned.", ) parser_run.add_argument( "--py2", dest="py2", action="store_true", default=False, help="Runs the tests only in python2 instead of both python2 and python3", ) parser_run.add_argument( "--py3", dest="py3", action="store_true", default=False, help="Runs the tests only in python3 instead of both python2 and python3", ) parser_run.add_argument( "--results", default="results", help="Specify a folder in which the results should be placed " "(defaults to `results`)", ) parser_run.set_defaults(func=do_show_coverage) return parser def clean_line(): global LASTLEN with PRINTLOCK: if LASTLEN is not None: print(" " * LASTLEN, end="\r") LASTLEN = None def print_running(): global LASTLEN with PRINTLOCK: msg = "Running %d suites: %d remaining, %d failed" % ( len(RUNNING), NUMREMAINING, len(FAILED), ) LASTLEN = len(msg) print(msg, end="\r") def add_running(suite): global NUMREMAINING with PRINTLOCK: NUMREMAINING -= 1 RUNNING.append(suite) clean_line() print_running() def remove_running(suite, failed): with PRINTLOCK: RUNNING.remove(suite) clean_line() status = 'passed' if failed: status = 'FAILED' print("Test suite %s: %s" % (status, suite)) print_running() class WorkerThread(threading.Thread): def __init__(self, sem, pyver, suite, results, with_cover): name = "py%s-%s" % (pyver, suite) super(WorkerThread, self).__init__(name="worker-%s" % name) self.name = name self.sem = sem self.pyver = pyver self.suite = suite self.failed = None self.results = results self.with_cover = with_cover def run(self): with self.sem: add_running(self.name) with open(os.path.join(self.results, self.name), "w") as resfile: if self.pyver == 2: runner = RUNNER_PY2 elif self.pyver == 3: runner = RUNNER_PY3 else: runner = RUNNER_PY cmd = [runner, "-v", "tests.%s" % self.suite] if self.with_cover: cmd.append("--with-cover") env = os.environ.copy() env.update({ "PAGURE_CONFIG": "../tests/test_config", "COVERAGE_FILE": os.path.join( self.results, "%s.coverage" % self.name ), "LANG": "en_US.UTF-8", }) proc = subprocess.Popen( cmd, cwd=".", stdout=resfile, stderr=subprocess.STDOUT, env=env ) res = proc.wait() if res == 0: self.failed = False else: self.failed = True if not self.failed is not True: with PRINTLOCK: FAILED.append(self.name) remove_running(self.name, self.failed) def do_run(args): """ Performs some checks and runs the tests. """ # Some pre-flight checks if not os.path.exists("./.git") or not os.path.exists("./nosetests3"): print("Please run from a single level into the Pagure codebase") return 1 if os.path.exists(args.results): if not args.force: print( "Results folder exists, please remove it so we do not clobber" " or use --force" ) return 1 else: try: shutil.rmtree(args.results) os.mkdir(args.results) except: print( "Could not delete the %s directory, it will be " "wiped clean" % args.results) for content in os.listdir(args.results): os.remove(content) else: os.mkdir(args.results) print("Pre-flight checks passed") suites = [] if args.failed_tests: here = os.path.join(os.path.dirname(os.path.abspath(__file__))) failed_tests_fullpath = os.path.join(here, args.failed_tests) if not os.path.exists(failed_tests_fullpath): print("Could not find the specified file:%s" % failed_tests_fullpath) return 1 print("Loading failed tests") try: with open(failed_tests_fullpath, "r") as ffile: suites = json.loads(ffile.read()) except: bname = os.path.basename(args.failed_tests) if bname.endswith(".py") and bname.startswith("test_"): suites.append(bname.replace(".py", "")) if len(suites) == 0: print("Loading all tests") for fname in os.listdir("./tests"): if not fname.endswith(".py"): continue if not fname.startswith("test_"): continue suites.append(fname.replace(".py", "")) return _run_test_suites(args, suites) def do_rerun(args): """ Re-run tests that failed the last/specified run. """ # Some pre-flight checks if not os.path.exists("./.git") or not os.path.exists("./pagure"): print("Please run from a single level into the Pagure codebase") return 1 if not os.path.exists(args.results): print("Could not find an existing results folder at: %s" % args.results) return 1 if not os.path.exists(os.path.join(args.results, "newfailed")): print( "Could not find an failed tests in the results folder at: %s" % args.results ) return 1 print("Pre-flight checks passed") suites = [] tmp = [] print("Loading failed tests") try: with open(os.path.join(args.results, "newfailed"), "r") as ffile: tmp = json.loads(ffile.read()) except json.decoder.JSONDecodeError: print("File containing the failed tests is not JSON") return 1 for suite in tmp: if suite.startswith(("py2-", "py3-")): suites.append(suite[4:]) return _run_test_suites(args, set(suites)) def _get_pyvers(args): pyvers = [2, 3] if args.py2: pyvers = [2,] elif args.py3: pyvers = [3,] un_versioned = False try: subprocess.check_call(["which", RUNNER_PY]) un_versioned = True except subprocess.CalledProcessError: print("No %s found no unversioned runner" % RUNNER_PY) if 2 in pyvers: nopy2 = False try: subprocess.check_call(["which", RUNNER_PY2]) except subprocess.CalledProcessError: print("No %s found, removing python 2" % RUNNER_PY2) del pyvers[pyvers.index(2)] if 3 in pyvers: nopy3 = False try: subprocess.check_call(["which", RUNNER_PY3]) except subprocess.CalledProcessError: print("No %s found, removing python 3" % RUNNER_PY3) del pyvers[pyvers.index(3)] if not pyvers and un_versioned: pyvers = [""] return pyvers def _run_test_suites(args, suites): print("Using %d processes" % NUMPROCS) print("Start timing") start = time.time() global PRINTLOCK PRINTLOCK = threading.RLock() global NUMREMAINING NUMREMAINING = 0 sem = threading.BoundedSemaphore(NUMPROCS) # Create a worker per test workers = {} pyvers = _get_pyvers(args) if not pyvers: return 1 if len(pyvers) == 1: if pyvers[0] == 2: subprocess.check_call([ "sed", "-i", "-e", "s|python|python2|", "pagure/hooks/files/hookrunner" ]) subprocess.check_call([ "sed", "-i", "-e", "s|['alembic',|['alembic-2',|", "tests/test_alembic.py" ]) elif pyvers[0] == 3: subprocess.check_call([ "sed", "-i", "-e", "s|python|python3|", "pagure/hooks/files/hookrunner" ], cwd=HERE) subprocess.check_call([ "sed", "-i", "-e", "s|\['alembic',|\['alembic-3',|", "tests/test_alembic.py" ], cwd=HERE) for suite in suites: for pyver in pyvers: NUMREMAINING += 1 workers["py%s-%s" % (pyver, suite)] = WorkerThread( sem, pyver, suite, args.results, args.with_coverage ) # Start the workers print("Starting the workers") print() print() for worker in workers.values(): worker.start() # Wait for them to terminate for worker in workers: workers[worker].join() print_running() print() print("All work done") subprocess.check_call([ "git", "checkout", "pagure/hooks/files/hookrunner", "tests/test_alembic.py" ]) # Gather results print() print() if FAILED: print("Failed tests:") for worker in workers: if not workers[worker].failed: continue print("FAILED test: %s" % (worker)) # Write failed if FAILED: with open(os.path.join(args.results, "newfailed"), "w") as ffile: ffile.write(json.dumps(FAILED)) # Exit outcode = 0 if len(FAILED) == 0: print("ALL PASSED! CONGRATULATIONS!") else: outcode = 1 # Stats end = time.time() print() print() print( "Ran %d tests in %f seconds, of which %d failed" % (len(workers), (end - start), len(FAILED)) ) if outcode == 0 and args.with_coverage: do_show_coverage(args) return outcode def do_list(args): """ List tests that failed the last/specified run. """ # Some pre-flight checks if not os.path.exists("./.git") or not os.path.exists("./pagure"): print("Please run from a single level into the Pagure codebase") return 1 if not os.path.exists(args.results): print("Could not find an existing results folder at: %s" % args.results) return 1 if not os.path.exists(os.path.join(args.results, "newfailed")): print( "Could not find an failed tests in the results folder at: %s" % args.results ) return 1 print("Pre-flight checks passed") suites = [] tmp = [] print("Loading failed tests") try: with open(os.path.join(args.results, "newfailed"), "r") as ffile: suites = json.loads(ffile.read()) except json.decoder.JSONDecodeError: print("File containing the failed tests is not JSON") return 1 print("Failed tests") failed_tests = len(suites) if args.n: suites = suites[:args.n] print("- " + "\n- ".join(suites)) print("Total: %s test failed" % failed_tests) if args.show: for suite in suites: cmd = ["less", os.path.join(args.results, suite)] subprocess.check_call(cmd) def do_show_coverage(args): print() print("Combining coverage results...") pyvers = _get_pyvers(args) for pyver in pyvers: coverfiles = [] for fname in os.listdir(args.results): if fname.endswith(".coverage") and fname.startswith("py%s-" % pyver): coverfiles.append(os.path.join(args.results, fname)) cover = None if pyver == 2: cover = COVER_PY2 elif pyver == 3: cover = COVER_PY3 else: cover = COVER_PY env = {"COVERAGE_FILE": os.path.join(args.results, "combined.coverage")} cmd = [cover, "combine"] + coverfiles subprocess.check_call(cmd, env=env) print() print("Python %s coverage: " % pyver) cmd = [cover, "report", "--include=./pagure/*", "-m"] subprocess.check_call(cmd, env=env) def main(): """ Main function """ # Set up parser for global args parser = setup_parser() # Parse the commandline try: arg = parser.parse_args() except argparse.ArgumentTypeError as err: print("\nError: {0}".format(err)) return 2 logging.basicConfig() if arg.debug: LOG.setLevel(logging.DEBUG) if "func" not in arg: parser.print_help() return 1 arg.results = os.path.abspath(arg.results) return_code = 0 try: return_code = arg.func(arg) except KeyboardInterrupt: print("\nInterrupted by user.") return_code = 1 except Exception as err: print("Error: {0}".format(err)) logging.exception("Generic error caught:") return_code = 5 return return_code if __name__ == "__main__": sys.exit(main())