summaryrefslogtreecommitdiff
path: root/tools/build/test/TestCmd.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/build/test/TestCmd.py')
-rw-r--r--tools/build/test/TestCmd.py589
1 files changed, 589 insertions, 0 deletions
diff --git a/tools/build/test/TestCmd.py b/tools/build/test/TestCmd.py
new file mode 100644
index 000000000..5993df7ff
--- /dev/null
+++ b/tools/build/test/TestCmd.py
@@ -0,0 +1,589 @@
+"""
+TestCmd.py: a testing framework for commands and scripts.
+
+The TestCmd module provides a framework for portable automated testing of
+executable commands and scripts (in any language, not just Python), especially
+commands and scripts that require file system interaction.
+
+In addition to running tests and evaluating conditions, the TestCmd module
+manages and cleans up one or more temporary workspace directories, and provides
+methods for creating files and directories in those workspace directories from
+in-line data, here-documents), allowing tests to be completely self-contained.
+
+A TestCmd environment object is created via the usual invocation:
+
+ test = TestCmd()
+
+The TestCmd module provides pass_test(), fail_test(), and no_result() unbound
+methods that report test results for use with the Aegis change management
+system. These methods terminate the test immediately, reporting PASSED, FAILED
+or NO RESULT respectively and exiting with status 0 (success), 1 or 2
+respectively. This allows for a distinction between an actual failed test and a
+test that could not be properly evaluated because of an external condition (such
+as a full file system or incorrect permissions).
+
+"""
+
+# Copyright 2000 Steven Knight
+# This module is free software, and you may redistribute it and/or modify
+# it under the same terms as Python itself, so long as this copyright message
+# and disclaimer are retained in their original form.
+#
+# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
+# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
+# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
+# DAMAGE.
+#
+# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
+# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+
+# Copyright 2002-2003 Vladimir Prus.
+# Copyright 2002-2003 Dave Abrahams.
+# Copyright 2006 Rene Rivera.
+# Distributed under the Boost Software License, Version 1.0.
+# (See accompanying file LICENSE_1_0.txt or copy at
+# http://www.boost.org/LICENSE_1_0.txt)
+
+
+from string import join, split
+
+__author__ = "Steven Knight <knight@baldmt.com>"
+__revision__ = "TestCmd.py 0.D002 2001/08/31 14:56:12 software"
+__version__ = "0.02"
+
+from types import *
+
+import os
+import os.path
+import re
+import shutil
+import stat
+import subprocess
+import sys
+import tempfile
+import traceback
+
+
+tempfile.template = 'testcmd.'
+
+_Cleanup = []
+
+def _clean():
+ global _Cleanup
+ list = _Cleanup[:]
+ _Cleanup = []
+ list.reverse()
+ for test in list:
+ test.cleanup()
+
+sys.exitfunc = _clean
+
+
+def caller(tblist, skip):
+ string = ""
+ arr = []
+ for file, line, name, text in tblist:
+ if file[-10:] == "TestCmd.py":
+ break
+ arr = [(file, line, name, text)] + arr
+ atfrom = "at"
+ for file, line, name, text in arr[skip:]:
+ if name == "?":
+ name = ""
+ else:
+ name = " (" + name + ")"
+ string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
+ atfrom = "\tfrom"
+ return string
+
+
+def fail_test(self=None, condition=True, function=None, skip=0):
+ """Cause the test to fail.
+
+ By default, the fail_test() method reports that the test FAILED and exits
+ with a status of 1. If a condition argument is supplied, the test fails
+ only if the condition is true.
+
+ """
+ if not condition:
+ return
+ if not function is None:
+ function()
+ of = ""
+ desc = ""
+ sep = " "
+ if not self is None:
+ if self.program:
+ of = " of " + join(self.program, " ")
+ sep = "\n\t"
+ if self.description:
+ desc = " [" + self.description + "]"
+ sep = "\n\t"
+
+ at = caller(traceback.extract_stack(), skip)
+
+ sys.stderr.write("FAILED test" + of + desc + sep + at + """
+in directory: """ + os.getcwd() )
+ sys.exit(1)
+
+
+def no_result(self=None, condition=True, function=None, skip=0):
+ """Causes a test to exit with no valid result.
+
+ By default, the no_result() method reports NO RESULT for the test and
+ exits with a status of 2. If a condition argument is supplied, the test
+ fails only if the condition is true.
+
+ """
+ if not condition:
+ return
+ if not function is None:
+ function()
+ of = ""
+ desc = ""
+ sep = " "
+ if not self is None:
+ if self.program:
+ of = " of " + self.program
+ sep = "\n\t"
+ if self.description:
+ desc = " [" + self.description + "]"
+ sep = "\n\t"
+
+ at = caller(traceback.extract_stack(), skip)
+ sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
+ sys.exit(2)
+
+
+def pass_test(self=None, condition=True, function=None):
+ """Causes a test to pass.
+
+ By default, the pass_test() method reports PASSED for the test and exits
+ with a status of 0. If a condition argument is supplied, the test passes
+ only if the condition is true.
+
+ """
+ if not condition:
+ return
+ if not function is None:
+ function()
+ sys.stderr.write("PASSED\n")
+ sys.exit(0)
+
+
+def match_exact(lines=None, matches=None):
+ """
+ Returns whether the given lists or strings containing lines separated
+ using newline characters contain exactly the same data.
+
+ """
+ if not type(lines) is ListType:
+ lines = split(lines, "\n")
+ if not type(matches) is ListType:
+ matches = split(matches, "\n")
+ if len(lines) != len(matches):
+ return
+ for i in range(len(lines)):
+ if lines[i] != matches[i]:
+ return
+ return 1
+
+
+def match_re(lines=None, res=None):
+ """
+ Given lists or strings contain lines separated using newline characters.
+ This function matches those lines one by one, interpreting the lines in the
+ res parameter as regular expressions.
+
+ """
+ if not type(lines) is ListType:
+ lines = split(lines, "\n")
+ if not type(res) is ListType:
+ res = split(res, "\n")
+ if len(lines) != len(res):
+ return
+ for i in range(len(lines)):
+ if not re.compile("^" + res[i] + "$").search(lines[i]):
+ return
+ return 1
+
+
+class TestCmd:
+ def __init__(self, description=None, program=None, workdir=None,
+ subdir=None, verbose=False, match=None, inpath=None):
+
+ self._cwd = os.getcwd()
+ self.description_set(description)
+ self.program_set(program, inpath)
+ self.verbose_set(verbose)
+ if match is None:
+ self.match_func = match_re
+ else:
+ self.match_func = match
+ self._dirlist = []
+ self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
+ env = os.environ.get('PRESERVE')
+ if env:
+ self._preserve['pass_test'] = env
+ self._preserve['fail_test'] = env
+ self._preserve['no_result'] = env
+ else:
+ env = os.environ.get('PRESERVE_PASS')
+ if env is not None:
+ self._preserve['pass_test'] = env
+ env = os.environ.get('PRESERVE_FAIL')
+ if env is not None:
+ self._preserve['fail_test'] = env
+ env = os.environ.get('PRESERVE_PASS')
+ if env is not None:
+ self._preserve['PRESERVE_NO_RESULT'] = env
+ self._stdout = []
+ self._stderr = []
+ self.status = None
+ self.condition = 'no_result'
+ self.workdir_set(workdir)
+ self.subdir(subdir)
+
+ def __del__(self):
+ self.cleanup()
+
+ def __repr__(self):
+ return "%x" % id(self)
+
+ def cleanup(self, condition=None):
+ """
+ Removes any temporary working directories for the specified TestCmd
+ environment. If the environment variable PRESERVE was set when the
+ TestCmd environment was created, temporary working directories are not
+ removed. If any of the environment variables PRESERVE_PASS,
+ PRESERVE_FAIL or PRESERVE_NO_RESULT were set when the TestCmd
+ environment was created, then temporary working directories are not
+ removed if the test passed, failed or had no result, respectively.
+ Temporary working directories are also preserved for conditions
+ specified via the preserve method.
+
+ Typically, this method is not called directly, but is used when the
+ script exits to clean up temporary working directories as appropriate
+ for the exit status.
+
+ """
+ if not self._dirlist:
+ return
+ if condition is None:
+ condition = self.condition
+ if self._preserve[condition]:
+ for dir in self._dirlist:
+ print("Preserved directory %s" % dir)
+ else:
+ list = self._dirlist[:]
+ list.reverse()
+ for dir in list:
+ self.writable(dir, 1)
+ shutil.rmtree(dir, ignore_errors=1)
+
+ self._dirlist = []
+ self.workdir = None
+ os.chdir(self._cwd)
+ try:
+ global _Cleanup
+ _Cleanup.remove(self)
+ except (AttributeError, ValueError):
+ pass
+
+ def description_set(self, description):
+ """Set the description of the functionality being tested."""
+ self.description = description
+
+ def fail_test(self, condition=True, function=None, skip=0):
+ """Cause the test to fail."""
+ if not condition:
+ return
+ self.condition = 'fail_test'
+ fail_test(self = self,
+ condition = condition,
+ function = function,
+ skip = skip)
+
+ def match(self, lines, matches):
+ """Compare actual and expected file contents."""
+ return self.match_func(lines, matches)
+
+ def match_exact(self, lines, matches):
+ """Compare actual and expected file content exactly."""
+ return match_exact(lines, matches)
+
+ def match_re(self, lines, res):
+ """Compare file content with a regular expression."""
+ return match_re(lines, res)
+
+ def no_result(self, condition=True, function=None, skip=0):
+ """Report that the test could not be run."""
+ if not condition:
+ return
+ self.condition = 'no_result'
+ no_result(self = self,
+ condition = condition,
+ function = function,
+ skip = skip)
+
+ def pass_test(self, condition=True, function=None):
+ """Cause the test to pass."""
+ if not condition:
+ return
+ self.condition = 'pass_test'
+ pass_test(self, condition, function)
+
+ def preserve(self, *conditions):
+ """
+ Arrange for the temporary working directories for the specified
+ TestCmd environment to be preserved for one or more conditions. If no
+ conditions are specified, arranges for the temporary working
+ directories to be preserved for all conditions.
+
+ """
+ if conditions is ():
+ conditions = ('pass_test', 'fail_test', 'no_result')
+ for cond in conditions:
+ self._preserve[cond] = 1
+
+ def program_set(self, program, inpath):
+ """Set the executable program or script to be tested."""
+ if not inpath and program and not os.path.isabs(program[0]):
+ program[0] = os.path.join(self._cwd, program[0])
+ self.program = program
+
+ def read(self, file, mode='rb'):
+ """
+ Reads and returns the contents of the specified file name. The file
+ name may be a list, in which case the elements are concatenated with
+ the os.path.join() method. The file is assumed to be under the
+ temporary working directory unless it is an absolute path name. The I/O
+ mode for the file may be specified and must begin with an 'r'. The
+ default is 'rb' (binary read).
+
+ """
+ if type(file) is ListType:
+ file = apply(os.path.join, tuple(file))
+ if not os.path.isabs(file):
+ file = os.path.join(self.workdir, file)
+ if mode[0] != 'r':
+ raise ValueError, "mode must begin with 'r'"
+ return open(file, mode).read()
+
+ def run(self, program=None, arguments=None, chdir=None, stdin=None,
+ universal_newlines=True):
+ """
+ Runs a test of the program or script for the test environment.
+ Standard output and error output are saved for future retrieval via the
+ stdout() and stderr() methods.
+
+ 'universal_newlines' parameter controls how the child process
+ input/output streams are opened as defined for the same named Python
+ subprocess.POpen constructor parameter.
+
+ """
+ if chdir:
+ if not os.path.isabs(chdir):
+ chdir = os.path.join(self.workpath(chdir))
+ if self.verbose:
+ sys.stderr.write("chdir(" + chdir + ")\n")
+ else:
+ chdir = self.workdir
+
+ cmd = []
+ if program and program[0]:
+ if program[0] != self.program[0] and not os.path.isabs(program[0]):
+ program[0] = os.path.join(self._cwd, program[0])
+ cmd += program
+ else:
+ cmd += self.program
+ if arguments:
+ cmd += arguments.split(" ")
+ if self.verbose:
+ sys.stderr.write(join(cmd, " ") + "\n")
+ p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=chdir,
+ universal_newlines=universal_newlines)
+
+ if stdin:
+ if type(stdin) is ListType:
+ for line in stdin:
+ p.tochild.write(line)
+ else:
+ p.tochild.write(stdin)
+ out, err = p.communicate()
+ self._stdout.append(out)
+ self._stderr.append(err)
+ self.status = p.returncode
+
+ if self.verbose:
+ sys.stdout.write(self._stdout[-1])
+ sys.stderr.write(self._stderr[-1])
+
+ def stderr(self, run=None):
+ """
+ Returns the error output from the specified run number. If there is
+ no specified run number, then returns the error output of the last run.
+ If the run number is less than zero, then returns the error output from
+ that many runs back from the current run.
+
+ """
+ if not run:
+ run = len(self._stderr)
+ elif run < 0:
+ run = len(self._stderr) + run
+ run -= 1
+ if run < 0:
+ return ''
+ return self._stderr[run]
+
+ def stdout(self, run=None):
+ """
+ Returns the standard output from the specified run number. If there
+ is no specified run number, then returns the standard output of the
+ last run. If the run number is less than zero, then returns the
+ standard output from that many runs back from the current run.
+
+ """
+ if not run:
+ run = len(self._stdout)
+ elif run < 0:
+ run = len(self._stdout) + run
+ run -= 1
+ if run < 0:
+ return ''
+ return self._stdout[run]
+
+ def subdir(self, *subdirs):
+ """
+ Create new subdirectories under the temporary working directory, one
+ for each argument. An argument may be a list, in which case the list
+ elements are concatenated using the os.path.join() method.
+ Subdirectories multiple levels deep must be created using a separate
+ argument for each level:
+
+ test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
+
+ Returns the number of subdirectories actually created.
+
+ """
+ count = 0
+ for sub in subdirs:
+ if sub is None:
+ continue
+ if type(sub) is ListType:
+ sub = apply(os.path.join, tuple(sub))
+ new = os.path.join(self.workdir, sub)
+ try:
+ os.mkdir(new)
+ except:
+ pass
+ else:
+ count += 1
+ return count
+
+ def unlink(self, file):
+ """
+ Unlinks the specified file name. The file name may be a list, in
+ which case the elements are concatenated using the os.path.join()
+ method. The file is assumed to be under the temporary working directory
+ unless it is an absolute path name.
+
+ """
+ if type(file) is ListType:
+ file = apply(os.path.join, tuple(file))
+ if not os.path.isabs(file):
+ file = os.path.join(self.workdir, file)
+ os.unlink(file)
+
+ def verbose_set(self, verbose):
+ """Set the verbose level."""
+ self.verbose = verbose
+
+ def workdir_set(self, path):
+ """
+ Creates a temporary working directory with the specified path name.
+ If the path is a null string (''), a unique directory name is created.
+
+ """
+ if os.path.isabs(path):
+ self.workdir = path
+ else:
+ if path != None:
+ if path == '':
+ path = tempfile.mktemp()
+ if path != None:
+ os.mkdir(path)
+ self._dirlist.append(path)
+ global _Cleanup
+ try:
+ _Cleanup.index(self)
+ except ValueError:
+ _Cleanup.append(self)
+ # We would like to set self.workdir like this:
+ # self.workdir = path
+ # But symlinks in the path will report things differently from
+ # os.getcwd(), so chdir there and back to fetch the canonical
+ # path.
+ cwd = os.getcwd()
+ os.chdir(path)
+ self.workdir = os.getcwd()
+ os.chdir(cwd)
+ else:
+ self.workdir = None
+
+ def workpath(self, *args):
+ """
+ Returns the absolute path name to a subdirectory or file within the
+ current temporary working directory. Concatenates the temporary working
+ directory name with the specified arguments using os.path.join().
+
+ """
+ return apply(os.path.join, (self.workdir,) + tuple(args))
+
+ def writable(self, top, write):
+ """
+ Make the specified directory tree writable (write == 1) or not
+ (write == None).
+
+ """
+ def _walk_chmod(arg, dirname, names):
+ st = os.stat(dirname)
+ os.chmod(dirname, arg(st[stat.ST_MODE]))
+ for name in names:
+ fullname = os.path.join(dirname, name)
+ st = os.stat(fullname)
+ os.chmod(fullname, arg(st[stat.ST_MODE]))
+
+ _mode_writable = lambda mode: stat.S_IMODE(mode|0200)
+ _mode_non_writable = lambda mode: stat.S_IMODE(mode&~0200)
+
+ if write:
+ f = _mode_writable
+ else:
+ f = _mode_non_writable
+ try:
+ os.path.walk(top, _walk_chmod, f)
+ except:
+ pass # Ignore any problems changing modes.
+
+ def write(self, file, content, mode='wb'):
+ """
+ Writes the specified content text (second argument) to the specified
+ file name (first argument). The file name may be a list, in which case
+ the elements are concatenated using the os.path.join() method. The file
+ is created under the temporary working directory. Any subdirectories in
+ the path must already exist. The I/O mode for the file may be specified
+ and must begin with a 'w'. The default is 'wb' (binary write).
+
+ """
+ if type(file) is ListType:
+ file = apply(os.path.join, tuple(file))
+ if not os.path.isabs(file):
+ file = os.path.join(self.workdir, file)
+ if mode[0] != 'w':
+ raise ValueError, "mode must begin with 'w'"
+ open(file, mode).write(content)