From e74ebf98430fbf8020b92951b87875d5edf59c0c Mon Sep 17 00:00:00 2001 From: Emile Anclin Date: Mon, 6 Dec 2010 11:28:36 +0100 Subject: cleanup: remove DFCapture --- test/unittest_testlib.py | 75 ++------------------------ testlib.py | 138 ----------------------------------------------- 2 files changed, 3 insertions(+), 210 deletions(-) diff --git a/test/unittest_testlib.py b/test/unittest_testlib.py index b506a7b..a118620 100644 --- a/test/unittest_testlib.py +++ b/test/unittest_testlib.py @@ -29,11 +29,9 @@ try: except NameError: __file__ = sys.argv[0] -from logilab.common.testlib import unittest, TestSuite, unittest_main -from logilab.common.testlib import TestCase, Tags -from logilab.common.testlib import mock_object, create_files -from logilab.common.testlib import capture_stdout, InnerTest, with_tempdir, tag -from logilab.common.testlib import require_version, require_module +from logilab.common.testlib import (unittest, TestSuite, unittest_main, Tags, + TestCase, mock_object, create_files, InnerTest, with_tempdir, tag, + require_version, require_module) from logilab.common.pytest import SkipAwareTextTestRunner, NonStrictTestLoader @@ -629,11 +627,6 @@ class TestLoaderTC(TestCase): self.assertRunCount(None, MyMod, 2) -def bootstrap_print(msg, output=sys.stdout): - """sys.stdout will be evaluated at function parsing time""" - # print msg - output.write(msg) - class OutErrCaptureTC(TestCase): def setUp(self): @@ -644,68 +637,6 @@ class OutErrCaptureTC(TestCase): sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ - @unittest.skipIf(not sys.stdout.isatty(), "need stdout") - def test_stdout_capture(self): - class FooTC(TestCase): - def test_stdout(self): - print "foo" - self.assert_(False) - test = FooTC('test_stdout') - result = self.runner.run(test) - captured_out, captured_err = test.captured_output() - self.assertEqual(captured_out.strip(), "foo") - self.assertEqual(captured_err.strip(), "") - - @unittest.skipIf(not sys.stderr.isatty(), "need stderr") - def test_stderr_capture(self): - class FooTC(TestCase): - def test_stderr(self): - print >> sys.stderr, "foo" - self.assert_(False) - test = FooTC('test_stderr') - result = self.runner.run(test) - captured_out, captured_err = test.captured_output() - self.assertEqual(captured_out.strip(), "") - self.assertEqual(captured_err.strip(), "foo") - - @unittest.skipIf(not sys.stderr.isatty(), "need stderr") - @unittest.skipIf(not sys.stdout.isatty(), "need stdout") - def test_both_capture(self): - class FooTC(TestCase): - def test_stderr(self): - print >> sys.stderr, "foo" - print "bar" - self.assert_(False) - test = FooTC('test_stderr') - result = self.runner.run(test) - captured_out, captured_err = test.captured_output() - self.assertEqual(captured_out.strip(), "bar") - self.assertEqual(captured_err.strip(), "foo") - - @unittest.skipIf(not sys.stderr.isatty(), "need stderr") - @unittest.skipIf(not sys.stdout.isatty(), "need stdout") - def test_no_capture(self): - class FooTC(TestCase): - def test_stderr(self): - print >> sys.stderr, "foo" - print "bar" - self.assert_(False) - test = FooTC('test_stderr') - # this runner should not capture stdout / stderr - runner = SkipAwareTextTestRunner(stream=StringIO(), exitfirst=True) - result = runner.run(test) - captured_out, captured_err = test.captured_output() - self.assertEqual(captured_out.strip(), "") - self.assertEqual(captured_err.strip(), "") - - @unittest.skipIf(not sys.stdout.isatty(), "need stdout") - def test_capture_core(self): - # output = capture_stdout() - # bootstrap_print("hello", output=sys.stdout) - # self.assertEqual(output.restore(), "hello") - output = capture_stdout() - bootstrap_print("hello") - self.assertEqual(output.restore(), "hello") def test_unicode_non_ascii_messages(self): class FooTC(TestCase): diff --git a/testlib.py b/testlib.py index 6750f7b..d10db6c 100644 --- a/testlib.py +++ b/testlib.py @@ -377,91 +377,6 @@ class starargs(tuple): def __new__(cls, *args): return tuple.__new__(cls, args) - -class FDCapture: - """adapted from py lib (http://codespeak.net/py) - Capture IO to/from a given os-level filedescriptor. - """ - def __init__(self, fd, attr='stdout', printonly=None): - self.targetfd = fd - self.tmpfile = os.tmpfile() # self.maketempfile() - self.printonly = printonly - # save original file descriptor - self._savefd = os.dup(fd) - # override original file descriptor - os.dup2(self.tmpfile.fileno(), fd) - # also modify sys module directly - self.oldval = getattr(sys, attr) - setattr(sys, attr, self) # self.tmpfile) - self.attr = attr - - def write(self, msg): - # msg might be composed of several lines - for line in msg.splitlines(): - line += '\n' # keepdend=True is not enough - if self.printonly is None or self.printonly.search(line) is None: - self.tmpfile.write(line) - else: - os.write(self._savefd, line) - -## def maketempfile(self): -## tmpf = os.tmpfile() -## fd = os.dup(tmpf.fileno()) -## newf = os.fdopen(fd, tmpf.mode, 0) # No buffering -## tmpf.close() -## return newf - - def restore(self): - """restore original fd and returns captured output""" - #XXX: hack hack hack - self.tmpfile.flush() - try: - ref_file = getattr(sys, '__%s__' % self.attr) - ref_file.flush() - except AttributeError: - pass - if hasattr(self.oldval, 'flush'): - self.oldval.flush() - # restore original file descriptor - os.dup2(self._savefd, self.targetfd) - # restore sys module - setattr(sys, self.attr, self.oldval) - # close backup descriptor - os.close(self._savefd) - # go to beginning of file and read it - self.tmpfile.seek(0) - return self.tmpfile.read() - - -def _capture(which='stdout', printonly=None): - """private method, should not be called directly - (cf. capture_stdout() and capture_stderr()) - """ - assert which in ('stdout', 'stderr' - ), "Can only capture stdout or stderr, not %s" % which - if which == 'stdout': - fd = 1 - else: - fd = 2 - return FDCapture(fd, which, printonly) - -def capture_stdout(printonly=None): - """captures the standard output - - returns a handle object which has a `restore()` method. - The restore() method returns the captured stdout and restores it - """ - return _capture('stdout', printonly) - -def capture_stderr(printonly=None): - """captures the standard error output - - returns a handle object which has a `restore()` method. - The restore() method returns the captured stderr and restores it - """ - return _capture('stderr', printonly) - - unittest_main = unittest.main @@ -547,8 +462,6 @@ class TestCase(unittest.TestCase): self._testMethodName = self.__testMethodName self._captured_stdout = "" self._captured_stderr = "" - self._out = [] - self._err = [] self._current_test_descr = None self._options_ = None @@ -590,57 +503,14 @@ class TestCase(unittest.TestCase): """return a two tuple with standard output and error stripped""" return self._captured_stdout.strip(), self._captured_stderr.strip() - def _start_capture(self): - """start_capture if enable""" - if self.capture: - warnings.simplefilter('ignore', DeprecationWarning) - self.start_capture() - - def _stop_capture(self): - """stop_capture and restore previous output""" - if self.capture: - self._force_output_restore() - - def start_capture(self, printonly=None): - """start_capture""" - self._out.append(capture_stdout(printonly or self._printonly)) - self._err.append(capture_stderr(printonly or self._printonly)) - - def printonly(self, pattern, flags=0): - """set the pattern of line to print""" - rgx = re.compile(pattern, flags) - if self._out: - self._out[-1].printonly = rgx - self._err[-1].printonly = rgx - else: - self.start_capture(printonly=rgx) - - def stop_capture(self): - """stop output and error capture""" - if self._out: - _out = self._out.pop() - _err = self._err.pop() - return _out.restore(), _err.restore() - return '', '' - - def _force_output_restore(self): - """remove all capture set""" - while self._out: - self._captured_stdout += self._out.pop().restore() - self._captured_stderr += self._err.pop().restore() - def quiet_run(self, result, func, *args, **kwargs): - self._start_capture() try: func(*args, **kwargs) except (KeyboardInterrupt, SystemExit): - self._stop_capture() raise except: - self._stop_capture() result.addError(self, self.__exc_info()) return False - self._stop_capture() return True def _get_test_method(self): @@ -664,7 +534,6 @@ class TestCase(unittest.TestCase): # in the user's TestCase class. If not, do what was asked on cmd line self.capture = self.capture or getattr(result, 'capture', False) self._options_ = options - self._printonly = getattr(result, 'printonly', None) # if result.cvg: # result.cvg.start() testMethod = self._get_test_method() @@ -709,7 +578,6 @@ succeeded test into", osp.join(os.getcwd(), FILE_RESTART) def _proceed_generative(self, result, testfunc, runcondition=None): # cancel startTest()'s increment result.testsRun -= 1 - self._start_capture() success = True try: for params in testfunc(): @@ -738,7 +606,6 @@ succeeded test into", osp.join(os.getcwd(), FILE_RESTART) # if an error occurs between two yield result.addError(self, self.__exc_info()) success = False - self._stop_capture() return success def _proceed(self, result, testfunc, args=(), kwargs=None): @@ -749,23 +616,18 @@ succeeded test into", osp.join(os.getcwd(), FILE_RESTART) for tearDown to be successfully executed to declare the test as successful """ - self._start_capture() kwargs = kwargs or {} try: testfunc(*args, **kwargs) - self._stop_capture() except self.failureException: - self._stop_capture() result.addFailure(self, self.__exc_info()) return 1 except KeyboardInterrupt: - self._stop_capture() raise except InnerTestSkipped, e: result.addSkip(self, e) return 1 except: - self._stop_capture() result.addError(self, self.__exc_info()) return 2 return 0 -- cgit v1.2.1