1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597 |
- """
- TestCmd.py: a testing framework for commands and scripts.
- The TestCmd module provides a framework for portable automated testing
- of executable commands and scripts (in any language, not just Python),
- especially commands and scripts that require file system interaction.
- In addition to running tests and evaluating conditions, the TestCmd
- module manages and cleans up one or more temporary workspace
- directories, and provides methods for creating files and directories in
- those workspace directories from in-line data, here-documents), allowing
- tests to be completely self-contained.
- A TestCmd environment object is created via the usual invocation:
- import TestCmd
- test = TestCmd.TestCmd()
- There are a bunch of keyword arguments available at instantiation:
- test = TestCmd.TestCmd(description = 'string',
- program = 'program_or_script_to_test',
- interpreter = 'script_interpreter',
- workdir = 'prefix',
- subdir = 'subdir',
- verbose = Boolean,
- match = default_match_function,
- diff = default_diff_function,
- combine = Boolean)
- There are a bunch of methods that let you do different things:
- test.verbose_set(1)
- test.description_set('string')
- test.program_set('program_or_script_to_test')
- test.interpreter_set('script_interpreter')
- test.interpreter_set(['script_interpreter', 'arg'])
- test.workdir_set('prefix')
- test.workdir_set('')
- test.workpath('file')
- test.workpath('subdir', 'file')
- test.subdir('subdir', ...)
- test.rmdir('subdir', ...)
- test.write('file', "contents\n")
- test.write(['subdir', 'file'], "contents\n")
- test.read('file')
- test.read(['subdir', 'file'])
- test.read('file', mode)
- test.read(['subdir', 'file'], mode)
- test.writable('dir', 1)
- test.writable('dir', None)
- test.preserve(condition, ...)
- test.cleanup(condition)
- test.command_args(program = 'program_or_script_to_run',
- interpreter = 'script_interpreter',
- arguments = 'arguments to pass to program')
- test.run(program = 'program_or_script_to_run',
- interpreter = 'script_interpreter',
- arguments = 'arguments to pass to program',
- chdir = 'directory_to_chdir_to',
- stdin = 'input to feed to the program\n')
- universal_newlines = True)
- p = test.start(program = 'program_or_script_to_run',
- interpreter = 'script_interpreter',
- arguments = 'arguments to pass to program',
- universal_newlines = None)
- test.finish(self, p)
- test.pass_test()
- test.pass_test(condition)
- test.pass_test(condition, function)
- test.fail_test()
- test.fail_test(condition)
- test.fail_test(condition, function)
- test.fail_test(condition, function, skip)
- test.no_result()
- test.no_result(condition)
- test.no_result(condition, function)
- test.no_result(condition, function, skip)
- test.stdout()
- test.stdout(run)
- test.stderr()
- test.stderr(run)
- test.symlink(target, link)
- test.banner(string)
- test.banner(string, width)
- test.diff(actual, expected)
- test.match(actual, expected)
- test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
- test.match_exact(["actual 1\n", "actual 2\n"],
- ["expected 1\n", "expected 2\n"])
- test.match_re("actual 1\nactual 2\n", regex_string)
- test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
- test.match_re_dotall("actual 1\nactual 2\n", regex_string)
- test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
- test.tempdir()
- test.tempdir('temporary-directory')
- test.sleep()
- test.sleep(seconds)
- test.where_is('foo')
- test.where_is('foo', 'PATH1:PATH2')
- test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
- test.unlink('file')
- test.unlink('subdir', 'file')
- The TestCmd module provides pass_test(), fail_test(), and no_result()
- unbound functions that report test results for use with the Aegis change
- management system. These methods terminate the test immediately,
- reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
- status 0 (success), 1 or 2 respectively. This allows for a distinction
- between an actual failed test and a test that could not be properly
- evaluated because of an external condition (such as a full file system
- or incorrect permissions).
- import TestCmd
- TestCmd.pass_test()
- TestCmd.pass_test(condition)
- TestCmd.pass_test(condition, function)
- TestCmd.fail_test()
- TestCmd.fail_test(condition)
- TestCmd.fail_test(condition, function)
- TestCmd.fail_test(condition, function, skip)
- TestCmd.no_result()
- TestCmd.no_result(condition)
- TestCmd.no_result(condition, function)
- TestCmd.no_result(condition, function, skip)
- The TestCmd module also provides unbound functions that handle matching
- in the same way as the match_*() methods described above.
- import TestCmd
- test = TestCmd.TestCmd(match = TestCmd.match_exact)
- test = TestCmd.TestCmd(match = TestCmd.match_re)
- test = TestCmd.TestCmd(match = TestCmd.match_re_dotall)
- The TestCmd module provides unbound functions that can be used for the
- "diff" argument to TestCmd.TestCmd instantiation:
- import TestCmd
- test = TestCmd.TestCmd(match = TestCmd.match_re,
- diff = TestCmd.diff_re)
- test = TestCmd.TestCmd(diff = TestCmd.simple_diff)
- The "diff" argument can also be used with standard difflib functions:
- import difflib
- test = TestCmd.TestCmd(diff = difflib.context_diff)
- test = TestCmd.TestCmd(diff = difflib.unified_diff)
- Lastly, the where_is() method also exists in an unbound function
- version.
- import TestCmd
- TestCmd.where_is('foo')
- TestCmd.where_is('foo', 'PATH1:PATH2')
- TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
- """
- # Copyright 2000-2010 Steven Knight
- # This module is free software, and you may redistribute it and/or modify
- # it under the same terms as Python itself, so long as this copyright message
- # and disclaimer are retained in their original form.
- #
- # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
- # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
- # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
- # DAMAGE.
- #
- # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
- # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
- # PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
- # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
- # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
- __author__ = "Steven Knight <knight at baldmt dot com>"
- __revision__ = "TestCmd.py 0.37.D001 2010/01/11 16:55:50 knight"
- __version__ = "0.37"
- import errno
- import os
- import os.path
- import re
- import shutil
- import stat
- import string
- import sys
- import tempfile
- import time
- import traceback
- import types
- import UserList
- __all__ = [
- 'diff_re',
- 'fail_test',
- 'no_result',
- 'pass_test',
- 'match_exact',
- 'match_re',
- 'match_re_dotall',
- 'python_executable',
- 'TestCmd'
- ]
- try:
- import difflib
- except ImportError:
- __all__.append('simple_diff')
- def is_List(e):
- return type(e) is types.ListType \
- or isinstance(e, UserList.UserList)
- try:
- from UserString import UserString
- except ImportError:
- class UserString:
- pass
- if hasattr(types, 'UnicodeType'):
- def is_String(e):
- return type(e) is types.StringType \
- or type(e) is types.UnicodeType \
- or isinstance(e, UserString)
- else:
- def is_String(e):
- return type(e) is types.StringType or isinstance(e, UserString)
- tempfile.template = 'testcmd.'
- if os.name in ('posix', 'nt'):
- tempfile.template = 'testcmd.' + str(os.getpid()) + '.'
- else:
- tempfile.template = 'testcmd.'
- re_space = re.compile('\s')
- _Cleanup = []
- _chain_to_exitfunc = None
- def _clean():
- global _Cleanup
- cleanlist = filter(None, _Cleanup)
- del _Cleanup[:]
- cleanlist.reverse()
- for test in cleanlist:
- test.cleanup()
- if _chain_to_exitfunc:
- _chain_to_exitfunc()
- try:
- import atexit
- except ImportError:
- # TODO(1.5): atexit requires python 2.0, so chain sys.exitfunc
- try:
- _chain_to_exitfunc = sys.exitfunc
- except AttributeError:
- pass
- sys.exitfunc = _clean
- else:
- atexit.register(_clean)
- try:
- zip
- except NameError:
- def zip(*lists):
- result = []
- for i in xrange(min(map(len, lists))):
- result.append(tuple(map(lambda l, i=i: l[i], lists)))
- return result
- class Collector:
- def __init__(self, top):
- self.entries = [top]
- def __call__(self, arg, dirname, names):
- pathjoin = lambda n, d=dirname: os.path.join(d, n)
- self.entries.extend(map(pathjoin, names))
- def _caller(tblist, skip):
- string = ""
- arr = []
- for file, line, name, text in tblist:
- if file[-10:] == "TestCmd.py":
- break
- arr = [(file, line, name, text)] + arr
- atfrom = "at"
- for file, line, name, text in arr[skip:]:
- if name in ("?", "<module>"):
- name = ""
- else:
- name = " (" + name + ")"
- string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
- atfrom = "\tfrom"
- return string
- def fail_test(self = None, condition = 1, function = None, skip = 0):
- """Cause the test to fail.
- By default, the fail_test() method reports that the test FAILED
- and exits with a status of 1. If a condition argument is supplied,
- the test fails only if the condition is true.
- """
- if not condition:
- return
- if not function is None:
- function()
- of = ""
- desc = ""
- sep = " "
- if not self is None:
- if self.program:
- of = " of " + self.program
- sep = "\n\t"
- if self.description:
- desc = " [" + self.description + "]"
- sep = "\n\t"
- at = _caller(traceback.extract_stack(), skip)
- sys.stderr.write("FAILED test" + of + desc + sep + at)
- sys.exit(1)
- def no_result(self = None, condition = 1, function = None, skip = 0):
- """Causes a test to exit with no valid result.
- By default, the no_result() method reports NO RESULT for the test
- and exits with a status of 2. If a condition argument is supplied,
- the test fails only if the condition is true.
- """
- if not condition:
- return
- if not function is None:
- function()
- of = ""
- desc = ""
- sep = " "
- if not self is None:
- if self.program:
- of = " of " + self.program
- sep = "\n\t"
- if self.description:
- desc = " [" + self.description + "]"
- sep = "\n\t"
- if os.environ.get('TESTCMD_DEBUG_SKIPS'):
- at = _caller(traceback.extract_stack(), skip)
- sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
- else:
- sys.stderr.write("NO RESULT\n")
- sys.exit(2)
- def pass_test(self = None, condition = 1, function = None):
- """Causes a test to pass.
- By default, the pass_test() method reports PASSED for the test
- and exits with a status of 0. If a condition argument is supplied,
- the test passes only if the condition is true.
- """
- if not condition:
- return
- if not function is None:
- function()
- sys.stderr.write("PASSED\n")
- sys.exit(0)
- def match_exact(lines = None, matches = None):
- """
- """
- if not is_List(lines):
- lines = string.split(lines, "\n")
- if not is_List(matches):
- matches = string.split(matches, "\n")
- if len(lines) != len(matches):
- return
- for i in range(len(lines)):
- if lines[i] != matches[i]:
- return
- return 1
- def match_re(lines = None, res = None):
- """
- """
- if not is_List(lines):
- lines = string.split(lines, "\n")
- if not is_List(res):
- res = string.split(res, "\n")
- if len(lines) != len(res):
- return
- for i in range(len(lines)):
- s = "^" + res[i] + "$"
- try:
- expr = re.compile(s)
- except re.error, e:
- msg = "Regular expression error in %s: %s"
- raise re.error, msg % (repr(s), e[0])
- if not expr.search(lines[i]):
- return
- return 1
- def match_re_dotall(lines = None, res = None):
- """
- """
- if not type(lines) is type(""):
- lines = string.join(lines, "\n")
- if not type(res) is type(""):
- res = string.join(res, "\n")
- s = "^" + res + "$"
- try:
- expr = re.compile(s, re.DOTALL)
- except re.error, e:
- msg = "Regular expression error in %s: %s"
- raise re.error, msg % (repr(s), e[0])
- if expr.match(lines):
- return 1
- try:
- import difflib
- except ImportError:
- pass
- else:
- def simple_diff(a, b, fromfile='', tofile='',
- fromfiledate='', tofiledate='', n=3, lineterm='\n'):
- """
- A function with the same calling signature as difflib.context_diff
- (diff -c) and difflib.unified_diff (diff -u) but which prints
- output like the simple, unadorned 'diff" command.
- """
- sm = difflib.SequenceMatcher(None, a, b)
- def comma(x1, x2):
- return x1+1 == x2 and str(x2) or '%s,%s' % (x1+1, x2)
- result = []
- for op, a1, a2, b1, b2 in sm.get_opcodes():
- if op == 'delete':
- result.append("%sd%d" % (comma(a1, a2), b1))
- result.extend(map(lambda l: '< ' + l, a[a1:a2]))
- elif op == 'insert':
- result.append("%da%s" % (a1, comma(b1, b2)))
- result.extend(map(lambda l: '> ' + l, b[b1:b2]))
- elif op == 'replace':
- result.append("%sc%s" % (comma(a1, a2), comma(b1, b2)))
- result.extend(map(lambda l: '< ' + l, a[a1:a2]))
- result.append('---')
- result.extend(map(lambda l: '> ' + l, b[b1:b2]))
- return result
- def diff_re(a, b, fromfile='', tofile='',
- fromfiledate='', tofiledate='', n=3, lineterm='\n'):
- """
- A simple "diff" of two sets of lines when the expected lines
- are regular expressions. This is a really dumb thing that
- just compares each line in turn, so it doesn't look for
- chunks of matching lines and the like--but at least it lets
- you know exactly which line first didn't compare correctl...
- """
- result = []
- diff = len(a) - len(b)
- if diff < 0:
- a = a + ['']*(-diff)
- elif diff > 0:
- b = b + ['']*diff
- i = 0
- for aline, bline in zip(a, b):
- s = "^" + aline + "$"
- try:
- expr = re.compile(s)
- except re.error, e:
- msg = "Regular expression error in %s: %s"
- raise re.error, msg % (repr(s), e[0])
- if not expr.search(bline):
- result.append("%sc%s" % (i+1, i+1))
- result.append('< ' + repr(a[i]))
- result.append('---')
- result.append('> ' + repr(b[i]))
- i = i+1
- return result
- if os.name == 'java':
- python_executable = os.path.join(sys.prefix, 'jython')
- else:
- python_executable = sys.executable
- if sys.platform == 'win32':
- default_sleep_seconds = 2
- def where_is(file, path=None, pathext=None):
- if path is None:
- path = os.environ['PATH']
- if is_String(path):
- path = string.split(path, os.pathsep)
- if pathext is None:
- pathext = os.environ['PATHEXT']
- if is_String(pathext):
- pathext = string.split(pathext, os.pathsep)
- for ext in pathext:
- if string.lower(ext) == string.lower(file[-len(ext):]):
- pathext = ['']
- break
- for dir in path:
- f = os.path.join(dir, file)
- for ext in pathext:
- fext = f + ext
- if os.path.isfile(fext):
- return fext
- return None
- else:
- def where_is(file, path=None, pathext=None):
- if path is None:
- path = os.environ['PATH']
- if is_String(path):
- path = string.split(path, os.pathsep)
- for dir in path:
- f = os.path.join(dir, file)
- if os.path.isfile(f):
- try:
- st = os.stat(f)
- except OSError:
- continue
- if stat.S_IMODE(st[stat.ST_MODE]) & 0111:
- return f
- return None
- default_sleep_seconds = 1
- try:
- import subprocess
- except ImportError:
- # The subprocess module doesn't exist in this version of Python,
- # so we're going to cobble up something that looks just enough
- # like its API for our purposes below.
- import new
- subprocess = new.module('subprocess')
- subprocess.PIPE = 'PIPE'
- subprocess.STDOUT = 'STDOUT'
- subprocess.mswindows = (sys.platform == 'win32')
- try:
- import popen2
- popen2.Popen3
- except AttributeError:
- class Popen3:
- universal_newlines = 1
- def __init__(self, command, **kw):
- if sys.platform == 'win32' and command[0] == '"':
- command = '"' + command + '"'
- (stdin, stdout, stderr) = os.popen3(' ' + command)
- self.stdin = stdin
- self.stdout = stdout
- self.stderr = stderr
- def close_output(self):
- self.stdout.close()
- self.resultcode = self.stderr.close()
- def wait(self):
- resultcode = self.resultcode
- if os.WIFEXITED(resultcode):
- return os.WEXITSTATUS(resultcode)
- elif os.WIFSIGNALED(resultcode):
- return os.WTERMSIG(resultcode)
- else:
- return None
- else:
- try:
- popen2.Popen4
- except AttributeError:
- # A cribbed Popen4 class, with some retrofitted code from
- # the Python 1.5 Popen3 class methods to do certain things
- # by hand.
- class Popen4(popen2.Popen3):
- childerr = None
- def __init__(self, cmd, bufsize=-1):
- p2cread, p2cwrite = os.pipe()
- c2pread, c2pwrite = os.pipe()
- self.pid = os.fork()
- if self.pid == 0:
- # Child
- os.dup2(p2cread, 0)
- os.dup2(c2pwrite, 1)
- os.dup2(c2pwrite, 2)
- for i in range(3, popen2.MAXFD):
- try:
- os.close(i)
- except: pass
- try:
- os.execvp(cmd[0], cmd)
- finally:
- os._exit(1)
- # Shouldn't come here, I guess
- os._exit(1)
- os.close(p2cread)
- self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
- os.close(c2pwrite)
- self.fromchild = os.fdopen(c2pread, 'r', bufsize)
- popen2._active.append(self)
- popen2.Popen4 = Popen4
- class Popen3(popen2.Popen3, popen2.Popen4):
- universal_newlines = 1
- def __init__(self, command, **kw):
- if kw.get('stderr') == 'STDOUT':
- apply(popen2.Popen4.__init__, (self, command, 1))
- else:
- apply(popen2.Popen3.__init__, (self, command, 1))
- self.stdin = self.tochild
- self.stdout = self.fromchild
- self.stderr = self.childerr
- def wait(self, *args, **kw):
- resultcode = apply(popen2.Popen3.wait, (self,)+args, kw)
- if os.WIFEXITED(resultcode):
- return os.WEXITSTATUS(resultcode)
- elif os.WIFSIGNALED(resultcode):
- return os.WTERMSIG(resultcode)
- else:
- return None
- subprocess.Popen = Popen3
- # From Josiah Carlson,
- # ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
- # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
- PIPE = subprocess.PIPE
- if subprocess.mswindows:
- from win32file import ReadFile, WriteFile
- from win32pipe import PeekNamedPipe
- import msvcrt
- else:
- import select
- import fcntl
- try: fcntl.F_GETFL
- except AttributeError: fcntl.F_GETFL = 3
- try: fcntl.F_SETFL
- except AttributeError: fcntl.F_SETFL = 4
- class Popen(subprocess.Popen):
- def recv(self, maxsize=None):
- return self._recv('stdout', maxsize)
- def recv_err(self, maxsize=None):
- return self._recv('stderr', maxsize)
- def send_recv(self, input='', maxsize=None):
- return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
- def get_conn_maxsize(self, which, maxsize):
- if maxsize is None:
- maxsize = 1024
- elif maxsize < 1:
- maxsize = 1
- return getattr(self, which), maxsize
- def _close(self, which):
- getattr(self, which).close()
- setattr(self, which, None)
- if subprocess.mswindows:
- def send(self, input):
- if not self.stdin:
- return None
- try:
- x = msvcrt.get_osfhandle(self.stdin.fileno())
- (errCode, written) = WriteFile(x, input)
- except ValueError:
- return self._close('stdin')
- except (subprocess.pywintypes.error, Exception), why:
- if why[0] in (109, errno.ESHUTDOWN):
- return self._close('stdin')
- raise
- return written
- def _recv(self, which, maxsize):
- conn, maxsize = self.get_conn_maxsize(which, maxsize)
- if conn is None:
- return None
- try:
- x = msvcrt.get_osfhandle(conn.fileno())
- (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
- if maxsize < nAvail:
- nAvail = maxsize
- if nAvail > 0:
- (errCode, read) = ReadFile(x, nAvail, None)
- except ValueError:
- return self._close(which)
- except (subprocess.pywintypes.error, Exception), why:
- if why[0] in (109, errno.ESHUTDOWN):
- return self._close(which)
- raise
- #if self.universal_newlines:
- # read = self._translate_newlines(read)
- return read
- else:
- def send(self, input):
- if not self.stdin:
- return None
- if not select.select([], [self.stdin], [], 0)[1]:
- return 0
- try:
- written = os.write(self.stdin.fileno(), input)
- except OSError, why:
- if why[0] == errno.EPIPE: #broken pipe
- return self._close('stdin')
- raise
- return written
- def _recv(self, which, maxsize):
- conn, maxsize = self.get_conn_maxsize(which, maxsize)
- if conn is None:
- return None
- try:
- flags = fcntl.fcntl(conn, fcntl.F_GETFL)
- except TypeError:
- flags = None
- else:
- if not conn.closed:
- fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
- try:
- if not select.select([conn], [], [], 0)[0]:
- return ''
- r = conn.read(maxsize)
- if not r:
- return self._close(which)
- #if self.universal_newlines:
- # r = self._translate_newlines(r)
- return r
- finally:
- if not conn.closed and not flags is None:
- fcntl.fcntl(conn, fcntl.F_SETFL, flags)
- disconnect_message = "Other end disconnected!"
- def recv_some(p, t=.1, e=1, tr=5, stderr=0):
- if tr < 1:
- tr = 1
- x = time.time()+t
- y = []
- r = ''
- pr = p.recv
- if stderr:
- pr = p.recv_err
- while time.time() < x or r:
- r = pr()
- if r is None:
- if e:
- raise Exception(disconnect_message)
- else:
- break
- elif r:
- y.append(r)
- else:
- time.sleep(max((x-time.time())/tr, 0))
- return ''.join(y)
- # TODO(3.0: rewrite to use memoryview()
- def send_all(p, data):
- while len(data):
- sent = p.send(data)
- if sent is None:
- raise Exception(disconnect_message)
- data = buffer(data, sent)
- try:
- object
- except NameError:
- class object:
- pass
- class TestCmd(object):
- """Class TestCmd
- """
- def __init__(self, description = None,
- program = None,
- interpreter = None,
- workdir = None,
- subdir = None,
- verbose = None,
- match = None,
- diff = None,
- combine = 0,
- universal_newlines = 1):
- self._cwd = os.getcwd()
- self.description_set(description)
- self.program_set(program)
- self.interpreter_set(interpreter)
- if verbose is None:
- try:
- verbose = max( 0, int(os.environ.get('TESTCMD_VERBOSE', 0)) )
- except ValueError:
- verbose = 0
- self.verbose_set(verbose)
- self.combine = combine
- self.universal_newlines = universal_newlines
- if match is not None:
- self.match_function = match
- else:
- self.match_function = match_re
- if diff is not None:
- self.diff_function = diff
- else:
- try:
- difflib
- except NameError:
- pass
- else:
- self.diff_function = simple_diff
- #self.diff_function = difflib.context_diff
- #self.diff_function = difflib.unified_diff
- self._dirlist = []
- self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
- if os.environ.has_key('PRESERVE') and not os.environ['PRESERVE'] is '':
- self._preserve['pass_test'] = os.environ['PRESERVE']
- self._preserve['fail_test'] = os.environ['PRESERVE']
- self._preserve['no_result'] = os.environ['PRESERVE']
- else:
- try:
- self._preserve['pass_test'] = os.environ['PRESERVE_PASS']
- except KeyError:
- pass
- try:
- self._preserve['fail_test'] = os.environ['PRESERVE_FAIL']
- except KeyError:
- pass
- try:
- self._preserve['no_result'] = os.environ['PRESERVE_NO_RESULT']
- except KeyError:
- pass
- self._stdout = []
- self._stderr = []
- self.status = None
- self.condition = 'no_result'
- self.workdir_set(workdir)
- self.subdir(subdir)
- def __del__(self):
- self.cleanup()
- def __repr__(self):
- return "%x" % id(self)
- banner_char = '='
- banner_width = 80
- def banner(self, s, width=None):
- if width is None:
- width = self.banner_width
- return s + self.banner_char * (width - len(s))
- if os.name == 'posix':
- def escape(self, arg):
- "escape shell special characters"
- slash = '\\'
- special = '"$'
- arg = string.replace(arg, slash, slash+slash)
- for c in special:
- arg = string.replace(arg, c, slash+c)
- if re_space.search(arg):
- arg = '"' + arg + '"'
- return arg
- else:
- # Windows does not allow special characters in file names
- # anyway, so no need for an escape function, we will just quote
- # the arg.
- def escape(self, arg):
- if re_space.search(arg):
- arg = '"' + arg + '"'
- return arg
- def canonicalize(self, path):
- if is_List(path):
- path = apply(os.path.join, tuple(path))
- if not os.path.isabs(path):
- path = os.path.join(self.workdir, path)
- return path
- def chmod(self, path, mode):
- """Changes permissions on the specified file or directory
- path name."""
- path = self.canonicalize(path)
- os.chmod(path, mode)
- def cleanup(self, condition = None):
- """Removes any temporary working directories for the specified
- TestCmd environment. If the environment variable PRESERVE was
- set when the TestCmd environment was created, temporary working
- directories are not removed. If any of the environment variables
- PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
- when the TestCmd environment was created, then temporary working
- directories are not removed if the test passed, failed, or had
- no result, respectively. Temporary working directories are also
- preserved for conditions specified via the preserve method.
- Typically, this method is not called directly, but is used when
- the script exits to clean up temporary working directories as
- appropriate for the exit status.
- """
- if not self._dirlist:
- return
- os.chdir(self._cwd)
- self.workdir = None
- if condition is None:
- condition = self.condition
- if self._preserve[condition]:
- for dir in self._dirlist:
- print "Preserved directory", dir
- else:
- list = self._dirlist[:]
- list.reverse()
- for dir in list:
- self.writable(dir, 1)
- shutil.rmtree(dir, ignore_errors = 1)
- self._dirlist = []
- try:
- global _Cleanup
- _Cleanup.remove(self)
- except (AttributeError, ValueError):
- pass
- def command_args(self, program = None,
- interpreter = None,
- arguments = None):
- if program:
- if type(program) == type('') and not os.path.isabs(program):
- program = os.path.join(self._cwd, program)
- else:
- program = self.program
- if not interpreter:
- interpreter = self.interpreter
- if not type(program) in [type([]), type(())]:
- program = [program]
- cmd = list(program)
- if interpreter:
- if not type(interpreter) in [type([]), type(())]:
- interpreter = [interpreter]
- cmd = list(interpreter) + cmd
- if arguments:
- if type(arguments) == type(''):
- arguments = string.split(arguments)
- cmd.extend(arguments)
- return cmd
- def description_set(self, description):
- """Set the description of the functionality being tested.
- """
- self.description = description
- try:
- difflib
- except NameError:
- def diff(self, a, b, name, *args, **kw):
- print self.banner('Expected %s' % name)
- print a
- print self.banner('Actual %s' % name)
- print b
- else:
- def diff(self, a, b, name, *args, **kw):
- print self.banner(name)
- args = (a.splitlines(), b.splitlines()) + args
- lines = apply(self.diff_function, args, kw)
- for l in lines:
- print l
- def fail_test(self, condition = 1, function = None, skip = 0):
- """Cause the test to fail.
- """
- if not condition:
- return
- self.condition = 'fail_test'
- fail_test(self = self,
- condition = condition,
- function = function,
- skip = skip)
- def interpreter_set(self, interpreter):
- """Set the program to be used to interpret the program
- under test as a script.
- """
- self.interpreter = interpreter
- def match(self, lines, matches):
- """Compare actual and expected file contents.
- """
- return self.match_function(lines, matches)
- def match_exact(self, lines, matches):
- """Compare actual and expected file contents.
- """
- return match_exact(lines, matches)
- def match_re(self, lines, res):
- """Compare actual and expected file contents.
- """
- return match_re(lines, res)
- def match_re_dotall(self, lines, res):
- """Compare actual and expected file contents.
- """
- return match_re_dotall(lines, res)
- def no_result(self, condition = 1, function = None, skip = 0):
- """Report that the test could not be run.
- """
- if not condition:
- return
- self.condition = 'no_result'
- no_result(self = self,
- condition = condition,
- function = function,
- skip = skip)
- def pass_test(self, condition = 1, function = None):
- """Cause the test to pass.
- """
- if not condition:
- return
- self.condition = 'pass_test'
- pass_test(self = self, condition = condition, function = function)
- def preserve(self, *conditions):
- """Arrange for the temporary working directories for the
- specified TestCmd environment to be preserved for one or more
- conditions. If no conditions are specified, arranges for
- the temporary working directories to be preserved for all
- conditions.
- """
- if conditions is ():
- conditions = ('pass_test', 'fail_test', 'no_result')
- for cond in conditions:
- self._preserve[cond] = 1
- def program_set(self, program):
- """Set the executable program or script to be tested.
- """
- if program and not os.path.isabs(program):
- program = os.path.join(self._cwd, program)
- self.program = program
- def read(self, file, mode = 'rb'):
- """Reads and returns the contents of the specified file name.
- The file name may be a list, in which case the elements are
- concatenated with the os.path.join() method. The file is
- assumed to be under the temporary working directory unless it
- is an absolute path name. The I/O mode for the file may
- be specified; it must begin with an 'r'. The default is
- 'rb' (binary read).
- """
- file = self.canonicalize(file)
- if mode[0] != 'r':
- raise ValueError, "mode must begin with 'r'"
- with open(file, mode) as f:
- result = f.read()
- return result
- def rmdir(self, dir):
- """Removes the specified dir name.
- The dir name may be a list, in which case the elements are
- concatenated with the os.path.join() method. The dir is
- assumed to be under the temporary working directory unless it
- is an absolute path name.
- The dir must be empty.
- """
- dir = self.canonicalize(dir)
- os.rmdir(dir)
- def start(self, program = None,
- interpreter = None,
- arguments = None,
- universal_newlines = None,
- **kw):
- """
- Starts a program or script for the test environment.
- The specified program will have the original directory
- prepended unless it is enclosed in a [list].
- """
- cmd = self.command_args(program, interpreter, arguments)
- cmd_string = string.join(map(self.escape, cmd), ' ')
- if self.verbose:
- sys.stderr.write(cmd_string + "\n")
- if universal_newlines is None:
- universal_newlines = self.universal_newlines
- # On Windows, if we make stdin a pipe when we plan to send
- # no input, and the test program exits before
- # Popen calls msvcrt.open_osfhandle, that call will fail.
- # So don't use a pipe for stdin if we don't need one.
- stdin = kw.get('stdin', None)
- if stdin is not None:
- stdin = subprocess.PIPE
- combine = kw.get('combine', self.combine)
- if combine:
- stderr_value = subprocess.STDOUT
- else:
- stderr_value = subprocess.PIPE
- return Popen(cmd,
- stdin=stdin,
- stdout=subprocess.PIPE,
- stderr=stderr_value,
- universal_newlines=universal_newlines)
- def finish(self, popen, **kw):
- """
- Finishes and waits for the process being run under control of
- the specified popen argument, recording the exit status,
- standard output and error output.
- """
- popen.stdin.close()
- self.status = popen.wait()
- if not self.status:
- self.status = 0
- self._stdout.append(popen.stdout.read())
- if popen.stderr:
- stderr = popen.stderr.read()
- else:
- stderr = ''
- self._stderr.append(stderr)
- def run(self, program = None,
- interpreter = None,
- arguments = None,
- chdir = None,
- stdin = None,
- universal_newlines = None):
- """Runs a test of the program or script for the test
- environment. Standard output and error output are saved for
- future retrieval via the stdout() and stderr() methods.
- The specified program will have the original directory
- prepended unless it is enclosed in a [list].
- """
- if chdir:
- oldcwd = os.getcwd()
- if not os.path.isabs(chdir):
- chdir = os.path.join(self.workpath(chdir))
- if self.verbose:
- sys.stderr.write("chdir(" + chdir + ")\n")
- os.chdir(chdir)
- p = self.start(program,
- interpreter,
- arguments,
- universal_newlines,
- stdin=stdin)
- if stdin:
- if is_List(stdin):
- for line in stdin:
- p.stdin.write(line)
- else:
- p.stdin.write(stdin)
- p.stdin.close()
- out = p.stdout.read()
- if p.stderr is None:
- err = ''
- else:
- err = p.stderr.read()
- try:
- close_output = p.close_output
- except AttributeError:
- p.stdout.close()
- if not p.stderr is None:
- p.stderr.close()
- else:
- close_output()
- self._stdout.append(out)
- self._stderr.append(err)
- self.status = p.wait()
- if not self.status:
- self.status = 0
- if chdir:
- os.chdir(oldcwd)
- if self.verbose >= 2:
- write = sys.stdout.write
- write('============ STATUS: %d\n' % self.status)
- out = self.stdout()
- if out or self.verbose >= 3:
- write('============ BEGIN STDOUT (len=%d):\n' % len(out))
- write(out)
- write('============ END STDOUT\n')
- err = self.stderr()
- if err or self.verbose >= 3:
- write('============ BEGIN STDERR (len=%d)\n' % len(err))
- write(err)
- write('============ END STDERR\n')
- def sleep(self, seconds = default_sleep_seconds):
- """Sleeps at least the specified number of seconds. If no
- number is specified, sleeps at least the minimum number of
- seconds necessary to advance file time stamps on the current
- system. Sleeping more seconds is all right.
- """
- time.sleep(seconds)
- def stderr(self, run = None):
- """Returns the error output from the specified run number.
- If there is no specified run number, then returns the error
- output of the last run. If the run number is less than zero,
- then returns the error output from that many runs back from the
- current run.
- """
- if not run:
- run = len(self._stderr)
- elif run < 0:
- run = len(self._stderr) + run
- run = run - 1
- return self._stderr[run]
- def stdout(self, run = None):
- """Returns the standard output from the specified run number.
- If there is no specified run number, then returns the standard
- output of the last run. If the run number is less than zero,
- then returns the standard output from that many runs back from
- the current run.
- """
- if not run:
- run = len(self._stdout)
- elif run < 0:
- run = len(self._stdout) + run
- run = run - 1
- return self._stdout[run]
- def subdir(self, *subdirs):
- """Create new subdirectories under the temporary working
- directory, one for each argument. An argument may be a list,
- in which case the list elements are concatenated using the
- os.path.join() method. Subdirectories multiple levels deep
- must be created using a separate argument for each level:
- test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
- Returns the number of subdirectories actually created.
- """
- count = 0
- for sub in subdirs:
- if sub is None:
- continue
- if is_List(sub):
- sub = apply(os.path.join, tuple(sub))
- new = os.path.join(self.workdir, sub)
- try:
- os.mkdir(new)
- except OSError:
- pass
- else:
- count = count + 1
- return count
- def symlink(self, target, link):
- """Creates a symlink to the specified target.
- The link name may be a list, in which case the elements are
- concatenated with the os.path.join() method. The link is
- assumed to be under the temporary working directory unless it
- is an absolute path name. The target is *not* assumed to be
- under the temporary working directory.
- """
- link = self.canonicalize(link)
- os.symlink(target, link)
- def tempdir(self, path=None):
- """Creates a temporary directory.
- A unique directory name is generated if no path name is specified.
- The directory is created, and will be removed when the TestCmd
- object is destroyed.
- """
- if path is None:
- try:
- path = tempfile.mktemp(prefix=tempfile.template)
- except TypeError:
- path = tempfile.mktemp()
- os.mkdir(path)
- # Symlinks in the path will report things
- # differently from os.getcwd(), so chdir there
- # and back to fetch the canonical path.
- cwd = os.getcwd()
- try:
- os.chdir(path)
- path = os.getcwd()
- finally:
- os.chdir(cwd)
- # Uppercase the drive letter since the case of drive
- # letters is pretty much random on win32:
- drive,rest = os.path.splitdrive(path)
- if drive:
- path = string.upper(drive) + rest
- #
- self._dirlist.append(path)
- global _Cleanup
- try:
- _Cleanup.index(self)
- except ValueError:
- _Cleanup.append(self)
- return path
- def touch(self, path, mtime=None):
- """Updates the modification time on the specified file or
- directory path name. The default is to update to the
- current time if no explicit modification time is specified.
- """
- path = self.canonicalize(path)
- atime = os.path.getatime(path)
- if mtime is None:
- mtime = time.time()
- os.utime(path, (atime, mtime))
- def unlink(self, file):
- """Unlinks the specified file name.
- The file name may be a list, in which case the elements are
- concatenated with the os.path.join() method. The file is
- assumed to be under the temporary working directory unless it
- is an absolute path name.
- """
- file = self.canonicalize(file)
- os.unlink(file)
- def verbose_set(self, verbose):
- """Set the verbose level.
- """
- self.verbose = verbose
- def where_is(self, file, path=None, pathext=None):
- """Find an executable file.
- """
- if is_List(file):
- file = apply(os.path.join, tuple(file))
- if not os.path.isabs(file):
- file = where_is(file, path, pathext)
- return file
- def workdir_set(self, path):
- """Creates a temporary working directory with the specified
- path name. If the path is a null string (''), a unique
- directory name is created.
- """
- if (path != None):
- if path == '':
- path = None
- path = self.tempdir(path)
- self.workdir = path
- def workpath(self, *args):
- """Returns the absolute path name to a subdirectory or file
- within the current temporary working directory. Concatenates
- the temporary working directory name with the specified
- arguments using the os.path.join() method.
- """
- return apply(os.path.join, (self.workdir,) + tuple(args))
- def readable(self, top, read=1):
- """Make the specified directory tree readable (read == 1)
- or not (read == None).
- This method has no effect on Windows systems, which use a
- completely different mechanism to control file readability.
- """
- if sys.platform == 'win32':
- return
- if read:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IREAD))
- else:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IREAD))
- if os.path.isfile(top):
- # If it's a file, that's easy, just chmod it.
- do_chmod(top)
- elif read:
- # It's a directory and we're trying to turn on read
- # permission, so it's also pretty easy, just chmod the
- # directory and then chmod every entry on our walk down the
- # tree. Because os.path.walk() is top-down, we'll enable
- # read permission on any directories that have it disabled
- # before os.path.walk() tries to list their contents.
- do_chmod(top)
- def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
- for n in names:
- do_chmod(os.path.join(dirname, n))
- os.path.walk(top, chmod_entries, None)
- else:
- # It's a directory and we're trying to turn off read
- # permission, which means we have to chmod the directoreis
- # in the tree bottom-up, lest disabling read permission from
- # the top down get in the way of being able to get at lower
- # parts of the tree. But os.path.walk() visits things top
- # down, so we just use an object to collect a list of all
- # of the entries in the tree, reverse the list, and then
- # chmod the reversed (bottom-up) list.
- col = Collector(top)
- os.path.walk(top, col, None)
- col.entries.reverse()
- for d in col.entries: do_chmod(d)
- def writable(self, top, write=1):
- """Make the specified directory tree writable (write == 1)
- or not (write == None).
- """
- if sys.platform == 'win32':
- if write:
- def do_chmod(fname):
- try: os.chmod(fname, stat.S_IWRITE)
- except OSError: pass
- else:
- def do_chmod(fname):
- try: os.chmod(fname, stat.S_IREAD)
- except OSError: pass
- else:
- if write:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|0200))
- else:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~0200))
- if os.path.isfile(top):
- do_chmod(top)
- else:
- col = Collector(top)
- os.path.walk(top, col, None)
- for d in col.entries: do_chmod(d)
- def executable(self, top, execute=1):
- """Make the specified directory tree executable (execute == 1)
- or not (execute == None).
- This method has no effect on Windows systems, which use a
- completely different mechanism to control file executability.
- """
- if sys.platform == 'win32':
- return
- if execute:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]|stat.S_IEXEC))
- else:
- def do_chmod(fname):
- try: st = os.stat(fname)
- except OSError: pass
- else: os.chmod(fname, stat.S_IMODE(st[stat.ST_MODE]&~stat.S_IEXEC))
- if os.path.isfile(top):
- # If it's a file, that's easy, just chmod it.
- do_chmod(top)
- elif execute:
- # It's a directory and we're trying to turn on execute
- # permission, so it's also pretty easy, just chmod the
- # directory and then chmod every entry on our walk down the
- # tree. Because os.path.walk() is top-down, we'll enable
- # execute permission on any directories that have it disabled
- # before os.path.walk() tries to list their contents.
- do_chmod(top)
- def chmod_entries(arg, dirname, names, do_chmod=do_chmod):
- for n in names:
- do_chmod(os.path.join(dirname, n))
- os.path.walk(top, chmod_entries, None)
- else:
- # It's a directory and we're trying to turn off execute
- # permission, which means we have to chmod the directories
- # in the tree bottom-up, lest disabling execute permission from
- # the top down get in the way of being able to get at lower
- # parts of the tree. But os.path.walk() visits things top
- # down, so we just use an object to collect a list of all
- # of the entries in the tree, reverse the list, and then
- # chmod the reversed (bottom-up) list.
- col = Collector(top)
- os.path.walk(top, col, None)
- col.entries.reverse()
- for d in col.entries: do_chmod(d)
- def write(self, file, content, mode = 'wb'):
- """Writes the specified content text (second argument) to the
- specified file name (first argument). The file name may be
- a list, in which case the elements are concatenated with the
- os.path.join() method. The file is created under the temporary
- working directory. Any subdirectories in the path must already
- exist. The I/O mode for the file may be specified; it must
- begin with a 'w'. The default is 'wb' (binary write).
- """
- file = self.canonicalize(file)
- if mode[0] != 'w':
- raise ValueError, "mode must begin with 'w'"
- with open(file, mode) as f:
- f.write(content)
- # Local Variables:
- # tab-width:4
- # indent-tabs-mode:nil
- # End:
- # vim: set expandtab tabstop=4 shiftwidth=4:
|