diff options
author | Adrien Di Mascio <Adrien.DiMascio@logilab.fr> | 2006-08-11 10:38:42 +0200 |
---|---|---|
committer | Adrien Di Mascio <Adrien.DiMascio@logilab.fr> | 2006-08-11 10:38:42 +0200 |
commit | 6fb543b4225ca8d51a00e5a3a801f4f700f59e57 (patch) | |
tree | f648c39134d4a819ca71b0c7e201d27058f0f8db /testlib.py | |
parent | 79ccce632009824128bcc83df825e0bb0f5d0dae (diff) | |
download | logilab-common-6fb543b4225ca8d51a00e5a3a801f4f700f59e57.tar.gz |
added start/stop capture wherever needed. This needs refactoring (maybe use the quiet_run() method)
Diffstat (limited to 'testlib.py')
-rw-r--r-- | testlib.py | 169 |
1 files changed, 138 insertions, 31 deletions
@@ -357,14 +357,20 @@ class SkipAwareTestResult(unittest._TextTestResult): self.stream.writeln("%s" % err) if self.capture: output, errput = test.captured_output() - self.stream.writeln(self.separator2) - self.stream.writeln("captured stdout".center(len(self.separator2))) - self.stream.writeln(self.separator2) - self.stream.writeln(output) - self.stream.writeln(self.separator2) - self.stream.writeln("captured stderr".center(len(self.separator2))) - self.stream.writeln(self.separator2) - self.stream.writeln(errput) + if output: + self.stream.writeln(self.separator2) + self.stream.writeln("captured stdout".center(len(self.separator2))) + self.stream.writeln(self.separator2) + self.stream.writeln(output) + else: + self.stream.writeln('no stdout'.center(len(self.separator2))) + if errput: + self.stream.writeln(self.separator2) + self.stream.writeln("captured stderr".center(len(self.separator2))) + self.stream.writeln(self.separator2) + self.stream.writeln(errput) + else: + self.stream.writeln('no stderr'.center(len(self.separator2))) class SkipAwareTextTestRunner(unittest.TextTestRunner): @@ -548,15 +554,87 @@ Examples: sys.exit(not result.wasSuccessful()) -from cStringIO import StringIO + + +class FDCapture: + """adapted from py lib (http://codespeak.net/py) + Capture IO to/from a given os-level filedescriptor. + """ + def __init__(self, fd, attr='stdout'): + self.targetfd = fd + self.tmpfile = os.tmpfile() # self.maketempfile() + # save original file descriptor + self._savefd = os.dup(fd) + # override original file descriptor + os.dup2(self.tmpfile.fileno(), fd) + # also modify sys module directly + self.oldval = getattr(sys, attr) + setattr(sys, attr, self.tmpfile) + self.attr = attr + +## def maketempfile(self): +## tmpf = os.tmpfile() +## fd = os.dup(tmpf.fileno()) +## newf = os.fdopen(fd, tmpf.mode, 0) # No buffering +## tmpf.close() +## return newf + + def restore(self): + """restore original fd and returns captured output""" + # hack hack hack + self.tmpfile.flush() + try: + ref_file = getattr(sys, '__%s__' % self.attr) + ref_file.flush() + except AttributeError: + pass + if hasattr(self.oldval, 'flush'): + self.oldval.flush() + # restore original file descriptor + os.dup2(self._savefd, self.targetfd) + # restore sys module + setattr(sys, self.attr, self.oldval) + # close backup descriptor + os.close(self._savefd) + # go to beginning of file and read it + self.tmpfile.seek(0) + return self.tmpfile.read() + + +def _capture(which='stdout'): + """private method, should not be called directly + (cf. capture_stdout() and capture_stderr()) + """ + assert which in ('stdout', 'stderr'), "Can only capture stdout or stderr, not %s" % which + if which == 'stdout': + fd = 1 + else: + fd = 2 + return FDCapture(fd, which) + +def capture_stdout(): + """captures the standard output + + returns a handle object which has a `restore()` method. + The restore() method returns the captured stdout and restores it + """ + return _capture('stdout') + +def capture_stderr(): + """captures the standard error output + + returns a handle object which has a `restore()` method. + The restore() method returns the captured stderr and restores it + """ + return _capture('stderr') + def quiet_run(function, *args, **kwargs): - stdout, stderr = StringIO(), StringIO() - sys.stdout, sys.stderr = stdout, stderr + output = capture_stdout() + errput = capture_stderr() try: result = function(*args, **kwargs) finally: - sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__ - return result, stdout.getvalue(), stderr.getvalue() + return result, output.restore(), errput.restore() def unittest_main(): @@ -610,17 +688,52 @@ class TestCase(unittest.TestCase): def captured_output(self): return self._captured_stdout, self._captured_stderr - - def __run__(self, result=None): + def _start_capture(self): + if self.capture: + self._out, self._err = capture_stdout(), capture_stderr() + + def _stop_capture(self): + if self.capture: + out, err = self._out.restore(), self._err.restore() + self._captured_stdout += out + self._captured_stderr += err + + +## def quiet_run(self, func, *args, **kwargs): +## self._start_capture() +## try: +## func(*args, **kwargs) +## except KeyboardInterrupt: +## self._stop_capture() +## raise +## except: +## self._stop_capture() +## result.addError(self, self.__exc_info()) +## return +## self._stop_capture() + + + def __call__(self, result=None): + """rewrite TestCase.__call__ to support generative tests + This is mostly a copy/paste from unittest.py (i.e same + variable names, same logic, except for the generative tests part) + """ + if result is None: + result = self.defaultTestResult() + self.capture = getattr(result, 'capture', False) result.startTest(self) testMethod = getattr(self, self.__testMethodName) + self._start_capture() try: try: self.setUp() + self._stop_capture() except KeyboardInterrupt: + self._stop_capture() raise except: + self._stop_capture() result.addError(self, self.__exc_info()) return # generative tests @@ -630,32 +743,21 @@ class TestCase(unittest.TestCase): status = self._proceed(result, testMethod) success = (status == 0) try: + self._start_capture() self.tearDown() + self._stop_capture() except KeyboardInterrupt: + self._stop_capture() raise except: + self._stop_capture() result.addError(self, self.__exc_info()) success = False if success: result.addSuccess(self) finally: result.stopTest(self) - - def __call__(self, result=None): - """rewrite TestCase.__call__ to support generative tests - This is mostly a copy/paste from unittest.py (i.e same - variable names, same logic, except for the generative tests part) - """ - if result is None: - result = self.defaultTestResult() - capture = getattr(result, 'capture', False) - if capture: - retval, out, err = quiet_run(self.__run__, result) - self._captured_stdout = out - self._captured_stderr = err - return retval - else: - return self.__run__(result) + def _proceed_generative(self, result, testfunc, args=()): @@ -693,15 +795,20 @@ class TestCase(unittest.TestCase): for tearDown to be successfully executed to declare the test as successful """ + self._start_capture() kwargs = kwargs or {} try: testfunc(*args, **kwargs) + self._stop_capture() except self.failureException: + self._stop_capture() result.addFailure(self, self.__exc_info()) return 1 except KeyboardInterrupt: + self._stop_capture() raise except: + self._stop_capture() result.addError(self, self.__exc_info()) return 2 return 0 |