summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Anclin <emile.anclin@logilab.fr>2010-12-06 11:28:36 +0100
committerEmile Anclin <emile.anclin@logilab.fr>2010-12-06 11:28:36 +0100
commite74ebf98430fbf8020b92951b87875d5edf59c0c (patch)
tree1914c6c27c7e2e244fa560c4c60e722a7cfe4679
parent07982f1384a38583546b0b7d451ab4b1ad985c22 (diff)
downloadlogilab-common-e74ebf98430fbf8020b92951b87875d5edf59c0c.tar.gz
cleanup: remove DFCapture
-rw-r--r--test/unittest_testlib.py75
-rw-r--r--testlib.py138
2 files changed, 3 insertions, 210 deletions
diff --git a/test/unittest_testlib.py b/test/unittest_testlib.py
index b506a7b..a118620 100644
--- a/test/unittest_testlib.py
+++ b/test/unittest_testlib.py
@@ -29,11 +29,9 @@ try:
except NameError:
__file__ = sys.argv[0]
-from logilab.common.testlib import unittest, TestSuite, unittest_main
-from logilab.common.testlib import TestCase, Tags
-from logilab.common.testlib import mock_object, create_files
-from logilab.common.testlib import capture_stdout, InnerTest, with_tempdir, tag
-from logilab.common.testlib import require_version, require_module
+from logilab.common.testlib import (unittest, TestSuite, unittest_main, Tags,
+ TestCase, mock_object, create_files, InnerTest, with_tempdir, tag,
+ require_version, require_module)
from logilab.common.pytest import SkipAwareTextTestRunner, NonStrictTestLoader
@@ -629,11 +627,6 @@ class TestLoaderTC(TestCase):
self.assertRunCount(None, MyMod, 2)
-def bootstrap_print(msg, output=sys.stdout):
- """sys.stdout will be evaluated at function parsing time"""
- # print msg
- output.write(msg)
-
class OutErrCaptureTC(TestCase):
def setUp(self):
@@ -644,68 +637,6 @@ class OutErrCaptureTC(TestCase):
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
- @unittest.skipIf(not sys.stdout.isatty(), "need stdout")
- def test_stdout_capture(self):
- class FooTC(TestCase):
- def test_stdout(self):
- print "foo"
- self.assert_(False)
- test = FooTC('test_stdout')
- result = self.runner.run(test)
- captured_out, captured_err = test.captured_output()
- self.assertEqual(captured_out.strip(), "foo")
- self.assertEqual(captured_err.strip(), "")
-
- @unittest.skipIf(not sys.stderr.isatty(), "need stderr")
- def test_stderr_capture(self):
- class FooTC(TestCase):
- def test_stderr(self):
- print >> sys.stderr, "foo"
- self.assert_(False)
- test = FooTC('test_stderr')
- result = self.runner.run(test)
- captured_out, captured_err = test.captured_output()
- self.assertEqual(captured_out.strip(), "")
- self.assertEqual(captured_err.strip(), "foo")
-
- @unittest.skipIf(not sys.stderr.isatty(), "need stderr")
- @unittest.skipIf(not sys.stdout.isatty(), "need stdout")
- def test_both_capture(self):
- class FooTC(TestCase):
- def test_stderr(self):
- print >> sys.stderr, "foo"
- print "bar"
- self.assert_(False)
- test = FooTC('test_stderr')
- result = self.runner.run(test)
- captured_out, captured_err = test.captured_output()
- self.assertEqual(captured_out.strip(), "bar")
- self.assertEqual(captured_err.strip(), "foo")
-
- @unittest.skipIf(not sys.stderr.isatty(), "need stderr")
- @unittest.skipIf(not sys.stdout.isatty(), "need stdout")
- def test_no_capture(self):
- class FooTC(TestCase):
- def test_stderr(self):
- print >> sys.stderr, "foo"
- print "bar"
- self.assert_(False)
- test = FooTC('test_stderr')
- # this runner should not capture stdout / stderr
- runner = SkipAwareTextTestRunner(stream=StringIO(), exitfirst=True)
- result = runner.run(test)
- captured_out, captured_err = test.captured_output()
- self.assertEqual(captured_out.strip(), "")
- self.assertEqual(captured_err.strip(), "")
-
- @unittest.skipIf(not sys.stdout.isatty(), "need stdout")
- def test_capture_core(self):
- # output = capture_stdout()
- # bootstrap_print("hello", output=sys.stdout)
- # self.assertEqual(output.restore(), "hello")
- output = capture_stdout()
- bootstrap_print("hello")
- self.assertEqual(output.restore(), "hello")
def test_unicode_non_ascii_messages(self):
class FooTC(TestCase):
diff --git a/testlib.py b/testlib.py
index 6750f7b..d10db6c 100644
--- a/testlib.py
+++ b/testlib.py
@@ -377,91 +377,6 @@ class starargs(tuple):
def __new__(cls, *args):
return tuple.__new__(cls, args)
-
-class FDCapture:
- """adapted from py lib (http://codespeak.net/py)
- Capture IO to/from a given os-level filedescriptor.
- """
- def __init__(self, fd, attr='stdout', printonly=None):
- self.targetfd = fd
- self.tmpfile = os.tmpfile() # self.maketempfile()
- self.printonly = printonly
- # save original file descriptor
- self._savefd = os.dup(fd)
- # override original file descriptor
- os.dup2(self.tmpfile.fileno(), fd)
- # also modify sys module directly
- self.oldval = getattr(sys, attr)
- setattr(sys, attr, self) # self.tmpfile)
- self.attr = attr
-
- def write(self, msg):
- # msg might be composed of several lines
- for line in msg.splitlines():
- line += '\n' # keepdend=True is not enough
- if self.printonly is None or self.printonly.search(line) is None:
- self.tmpfile.write(line)
- else:
- os.write(self._savefd, line)
-
-## def maketempfile(self):
-## tmpf = os.tmpfile()
-## fd = os.dup(tmpf.fileno())
-## newf = os.fdopen(fd, tmpf.mode, 0) # No buffering
-## tmpf.close()
-## return newf
-
- def restore(self):
- """restore original fd and returns captured output"""
- #XXX: hack hack hack
- self.tmpfile.flush()
- try:
- ref_file = getattr(sys, '__%s__' % self.attr)
- ref_file.flush()
- except AttributeError:
- pass
- if hasattr(self.oldval, 'flush'):
- self.oldval.flush()
- # restore original file descriptor
- os.dup2(self._savefd, self.targetfd)
- # restore sys module
- setattr(sys, self.attr, self.oldval)
- # close backup descriptor
- os.close(self._savefd)
- # go to beginning of file and read it
- self.tmpfile.seek(0)
- return self.tmpfile.read()
-
-
-def _capture(which='stdout', printonly=None):
- """private method, should not be called directly
- (cf. capture_stdout() and capture_stderr())
- """
- assert which in ('stdout', 'stderr'
- ), "Can only capture stdout or stderr, not %s" % which
- if which == 'stdout':
- fd = 1
- else:
- fd = 2
- return FDCapture(fd, which, printonly)
-
-def capture_stdout(printonly=None):
- """captures the standard output
-
- returns a handle object which has a `restore()` method.
- The restore() method returns the captured stdout and restores it
- """
- return _capture('stdout', printonly)
-
-def capture_stderr(printonly=None):
- """captures the standard error output
-
- returns a handle object which has a `restore()` method.
- The restore() method returns the captured stderr and restores it
- """
- return _capture('stderr', printonly)
-
-
unittest_main = unittest.main
@@ -547,8 +462,6 @@ class TestCase(unittest.TestCase):
self._testMethodName = self.__testMethodName
self._captured_stdout = ""
self._captured_stderr = ""
- self._out = []
- self._err = []
self._current_test_descr = None
self._options_ = None
@@ -590,57 +503,14 @@ class TestCase(unittest.TestCase):
"""return a two tuple with standard output and error stripped"""
return self._captured_stdout.strip(), self._captured_stderr.strip()
- def _start_capture(self):
- """start_capture if enable"""
- if self.capture:
- warnings.simplefilter('ignore', DeprecationWarning)
- self.start_capture()
-
- def _stop_capture(self):
- """stop_capture and restore previous output"""
- if self.capture:
- self._force_output_restore()
-
- def start_capture(self, printonly=None):
- """start_capture"""
- self._out.append(capture_stdout(printonly or self._printonly))
- self._err.append(capture_stderr(printonly or self._printonly))
-
- def printonly(self, pattern, flags=0):
- """set the pattern of line to print"""
- rgx = re.compile(pattern, flags)
- if self._out:
- self._out[-1].printonly = rgx
- self._err[-1].printonly = rgx
- else:
- self.start_capture(printonly=rgx)
-
- def stop_capture(self):
- """stop output and error capture"""
- if self._out:
- _out = self._out.pop()
- _err = self._err.pop()
- return _out.restore(), _err.restore()
- return '', ''
-
- def _force_output_restore(self):
- """remove all capture set"""
- while self._out:
- self._captured_stdout += self._out.pop().restore()
- self._captured_stderr += self._err.pop().restore()
-
def quiet_run(self, result, func, *args, **kwargs):
- self._start_capture()
try:
func(*args, **kwargs)
except (KeyboardInterrupt, SystemExit):
- self._stop_capture()
raise
except:
- self._stop_capture()
result.addError(self, self.__exc_info())
return False
- self._stop_capture()
return True
def _get_test_method(self):
@@ -664,7 +534,6 @@ class TestCase(unittest.TestCase):
# in the user's TestCase class. If not, do what was asked on cmd line
self.capture = self.capture or getattr(result, 'capture', False)
self._options_ = options
- self._printonly = getattr(result, 'printonly', None)
# if result.cvg:
# result.cvg.start()
testMethod = self._get_test_method()
@@ -709,7 +578,6 @@ succeeded test into", osp.join(os.getcwd(), FILE_RESTART)
def _proceed_generative(self, result, testfunc, runcondition=None):
# cancel startTest()'s increment
result.testsRun -= 1
- self._start_capture()
success = True
try:
for params in testfunc():
@@ -738,7 +606,6 @@ succeeded test into", osp.join(os.getcwd(), FILE_RESTART)
# if an error occurs between two yield
result.addError(self, self.__exc_info())
success = False
- self._stop_capture()
return success
def _proceed(self, result, testfunc, args=(), kwargs=None):
@@ -749,23 +616,18 @@ succeeded test into", osp.join(os.getcwd(), FILE_RESTART)
for tearDown to be successfully executed to declare the test as
successful
"""
- self._start_capture()
kwargs = kwargs or {}
try:
testfunc(*args, **kwargs)
- self._stop_capture()
except self.failureException:
- self._stop_capture()
result.addFailure(self, self.__exc_info())
return 1
except KeyboardInterrupt:
- self._stop_capture()
raise
except InnerTestSkipped, e:
result.addSkip(self, e)
return 1
except:
- self._stop_capture()
result.addError(self, self.__exc_info())
return 2
return 0