summaryrefslogtreecommitdiff
path: root/testlib.py
diff options
context:
space:
mode:
authorAdrien Di Mascio <Adrien.DiMascio@logilab.fr>2006-08-11 10:38:42 +0200
committerAdrien Di Mascio <Adrien.DiMascio@logilab.fr>2006-08-11 10:38:42 +0200
commit6fb543b4225ca8d51a00e5a3a801f4f700f59e57 (patch)
treef648c39134d4a819ca71b0c7e201d27058f0f8db /testlib.py
parent79ccce632009824128bcc83df825e0bb0f5d0dae (diff)
downloadlogilab-common-6fb543b4225ca8d51a00e5a3a801f4f700f59e57.tar.gz
added start/stop capture wherever needed. This needs refactoring (maybe use the quiet_run() method)
Diffstat (limited to 'testlib.py')
-rw-r--r--testlib.py169
1 files changed, 138 insertions, 31 deletions
diff --git a/testlib.py b/testlib.py
index 7b5dd65..2abc3f9 100644
--- a/testlib.py
+++ b/testlib.py
@@ -357,14 +357,20 @@ class SkipAwareTestResult(unittest._TextTestResult):
self.stream.writeln("%s" % err)
if self.capture:
output, errput = test.captured_output()
- self.stream.writeln(self.separator2)
- self.stream.writeln("captured stdout".center(len(self.separator2)))
- self.stream.writeln(self.separator2)
- self.stream.writeln(output)
- self.stream.writeln(self.separator2)
- self.stream.writeln("captured stderr".center(len(self.separator2)))
- self.stream.writeln(self.separator2)
- self.stream.writeln(errput)
+ if output:
+ self.stream.writeln(self.separator2)
+ self.stream.writeln("captured stdout".center(len(self.separator2)))
+ self.stream.writeln(self.separator2)
+ self.stream.writeln(output)
+ else:
+ self.stream.writeln('no stdout'.center(len(self.separator2)))
+ if errput:
+ self.stream.writeln(self.separator2)
+ self.stream.writeln("captured stderr".center(len(self.separator2)))
+ self.stream.writeln(self.separator2)
+ self.stream.writeln(errput)
+ else:
+ self.stream.writeln('no stderr'.center(len(self.separator2)))
class SkipAwareTextTestRunner(unittest.TextTestRunner):
@@ -548,15 +554,87 @@ Examples:
sys.exit(not result.wasSuccessful())
-from cStringIO import StringIO
+
+
+class FDCapture:
+ """adapted from py lib (http://codespeak.net/py)
+ Capture IO to/from a given os-level filedescriptor.
+ """
+ def __init__(self, fd, attr='stdout'):
+ self.targetfd = fd
+ self.tmpfile = os.tmpfile() # self.maketempfile()
+ # save original file descriptor
+ self._savefd = os.dup(fd)
+ # override original file descriptor
+ os.dup2(self.tmpfile.fileno(), fd)
+ # also modify sys module directly
+ self.oldval = getattr(sys, attr)
+ setattr(sys, attr, self.tmpfile)
+ self.attr = attr
+
+## def maketempfile(self):
+## tmpf = os.tmpfile()
+## fd = os.dup(tmpf.fileno())
+## newf = os.fdopen(fd, tmpf.mode, 0) # No buffering
+## tmpf.close()
+## return newf
+
+ def restore(self):
+ """restore original fd and returns captured output"""
+ # hack hack hack
+ self.tmpfile.flush()
+ try:
+ ref_file = getattr(sys, '__%s__' % self.attr)
+ ref_file.flush()
+ except AttributeError:
+ pass
+ if hasattr(self.oldval, 'flush'):
+ self.oldval.flush()
+ # restore original file descriptor
+ os.dup2(self._savefd, self.targetfd)
+ # restore sys module
+ setattr(sys, self.attr, self.oldval)
+ # close backup descriptor
+ os.close(self._savefd)
+ # go to beginning of file and read it
+ self.tmpfile.seek(0)
+ return self.tmpfile.read()
+
+
+def _capture(which='stdout'):
+ """private method, should not be called directly
+ (cf. capture_stdout() and capture_stderr())
+ """
+ assert which in ('stdout', 'stderr'), "Can only capture stdout or stderr, not %s" % which
+ if which == 'stdout':
+ fd = 1
+ else:
+ fd = 2
+ return FDCapture(fd, which)
+
+def capture_stdout():
+ """captures the standard output
+
+ returns a handle object which has a `restore()` method.
+ The restore() method returns the captured stdout and restores it
+ """
+ return _capture('stdout')
+
+def capture_stderr():
+ """captures the standard error output
+
+ returns a handle object which has a `restore()` method.
+ The restore() method returns the captured stderr and restores it
+ """
+ return _capture('stderr')
+
def quiet_run(function, *args, **kwargs):
- stdout, stderr = StringIO(), StringIO()
- sys.stdout, sys.stderr = stdout, stderr
+ output = capture_stdout()
+ errput = capture_stderr()
try:
result = function(*args, **kwargs)
finally:
- sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__
- return result, stdout.getvalue(), stderr.getvalue()
+ return result, output.restore(), errput.restore()
def unittest_main():
@@ -610,17 +688,52 @@ class TestCase(unittest.TestCase):
def captured_output(self):
return self._captured_stdout, self._captured_stderr
-
- def __run__(self, result=None):
+ def _start_capture(self):
+ if self.capture:
+ self._out, self._err = capture_stdout(), capture_stderr()
+
+ def _stop_capture(self):
+ if self.capture:
+ out, err = self._out.restore(), self._err.restore()
+ self._captured_stdout += out
+ self._captured_stderr += err
+
+
+## def quiet_run(self, func, *args, **kwargs):
+## self._start_capture()
+## try:
+## func(*args, **kwargs)
+## except KeyboardInterrupt:
+## self._stop_capture()
+## raise
+## except:
+## self._stop_capture()
+## result.addError(self, self.__exc_info())
+## return
+## self._stop_capture()
+
+
+ def __call__(self, result=None):
+ """rewrite TestCase.__call__ to support generative tests
+ This is mostly a copy/paste from unittest.py (i.e same
+ variable names, same logic, except for the generative tests part)
+ """
+ if result is None:
+ result = self.defaultTestResult()
+ self.capture = getattr(result, 'capture', False)
result.startTest(self)
testMethod = getattr(self, self.__testMethodName)
+ self._start_capture()
try:
try:
self.setUp()
+ self._stop_capture()
except KeyboardInterrupt:
+ self._stop_capture()
raise
except:
+ self._stop_capture()
result.addError(self, self.__exc_info())
return
# generative tests
@@ -630,32 +743,21 @@ class TestCase(unittest.TestCase):
status = self._proceed(result, testMethod)
success = (status == 0)
try:
+ self._start_capture()
self.tearDown()
+ self._stop_capture()
except KeyboardInterrupt:
+ self._stop_capture()
raise
except:
+ self._stop_capture()
result.addError(self, self.__exc_info())
success = False
if success:
result.addSuccess(self)
finally:
result.stopTest(self)
-
- def __call__(self, result=None):
- """rewrite TestCase.__call__ to support generative tests
- This is mostly a copy/paste from unittest.py (i.e same
- variable names, same logic, except for the generative tests part)
- """
- if result is None:
- result = self.defaultTestResult()
- capture = getattr(result, 'capture', False)
- if capture:
- retval, out, err = quiet_run(self.__run__, result)
- self._captured_stdout = out
- self._captured_stderr = err
- return retval
- else:
- return self.__run__(result)
+
def _proceed_generative(self, result, testfunc, args=()):
@@ -693,15 +795,20 @@ class TestCase(unittest.TestCase):
for tearDown to be successfully executed to declare the test as
successful
"""
+ self._start_capture()
kwargs = kwargs or {}
try:
testfunc(*args, **kwargs)
+ self._stop_capture()
except self.failureException:
+ self._stop_capture()
result.addFailure(self, self.__exc_info())
return 1
except KeyboardInterrupt:
+ self._stop_capture()
raise
except:
+ self._stop_capture()
result.addError(self, self.__exc_info())
return 2
return 0