diff options
Diffstat (limited to 'bzrlib/tests/test_selftest.py')
-rw-r--r-- | bzrlib/tests/test_selftest.py | 3664 |
1 files changed, 3664 insertions, 0 deletions
diff --git a/bzrlib/tests/test_selftest.py b/bzrlib/tests/test_selftest.py new file mode 100644 index 0000000..b213297 --- /dev/null +++ b/bzrlib/tests/test_selftest.py @@ -0,0 +1,3664 @@ +# Copyright (C) 2005-2011 Canonical Ltd +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +"""Tests for the test framework.""" + +from cStringIO import StringIO +import gc +import doctest +import os +import signal +import sys +import threading +import time +import unittest +import warnings + +from testtools import ( + ExtendedToOriginalDecorator, + MultiTestResult, + ) +from testtools.content import Content +from testtools.content_type import ContentType +from testtools.matchers import ( + DocTestMatches, + Equals, + ) +import testtools.testresult.doubles + +import bzrlib +from bzrlib import ( + branchbuilder, + bzrdir, + controldir, + errors, + hooks, + lockdir, + memorytree, + osutils, + remote, + repository, + symbol_versioning, + tests, + transport, + workingtree, + workingtree_3, + workingtree_4, + ) +from bzrlib.repofmt import ( + groupcompress_repo, + ) +from bzrlib.symbol_versioning import ( + deprecated_function, + deprecated_in, + deprecated_method, + ) +from bzrlib.tests import ( + features, + test_lsprof, + test_server, + TestUtil, + ) +from bzrlib.trace import note, mutter +from bzrlib.transport import memory + + +def _test_ids(test_suite): + """Get the ids for the tests in a test suite.""" + return [t.id() for t in tests.iter_suite_tests(test_suite)] + + +class MetaTestLog(tests.TestCase): + + def test_logging(self): + """Test logs are captured when a test fails.""" + self.log('a test message') + details = self.getDetails() + log = details['log'] + self.assertThat(log.content_type, Equals(ContentType( + "text", "plain", {"charset": "utf8"}))) + self.assertThat(u"".join(log.iter_text()), Equals(self.get_log())) + self.assertThat(self.get_log(), + DocTestMatches(u"...a test message\n", doctest.ELLIPSIS)) + + +class TestTreeShape(tests.TestCaseInTempDir): + + def test_unicode_paths(self): + self.requireFeature(features.UnicodeFilenameFeature) + + filename = u'hell\u00d8' + self.build_tree_contents([(filename, 'contents of hello')]) + self.assertPathExists(filename) + + +class TestClassesAvailable(tests.TestCase): + """As a convenience we expose Test* classes from bzrlib.tests""" + + def test_test_case(self): + from bzrlib.tests import TestCase + + def test_test_loader(self): + from bzrlib.tests import TestLoader + + def test_test_suite(self): + from bzrlib.tests import TestSuite + + +class TestTransportScenarios(tests.TestCase): + """A group of tests that test the transport implementation adaption core. + + This is a meta test that the tests are applied to all available + transports. + + This will be generalised in the future which is why it is in this + test file even though it is specific to transport tests at the moment. + """ + + def test_get_transport_permutations(self): + # this checks that get_test_permutations defined by the module is + # called by the get_transport_test_permutations function. + class MockModule(object): + def get_test_permutations(self): + return sample_permutation + sample_permutation = [(1,2), (3,4)] + from bzrlib.tests.per_transport import get_transport_test_permutations + self.assertEqual(sample_permutation, + get_transport_test_permutations(MockModule())) + + def test_scenarios_include_all_modules(self): + # this checks that the scenario generator returns as many permutations + # as there are in all the registered transport modules - we assume if + # this matches its probably doing the right thing especially in + # combination with the tests for setting the right classes below. + from bzrlib.tests.per_transport import transport_test_permutations + from bzrlib.transport import _get_transport_modules + modules = _get_transport_modules() + permutation_count = 0 + for module in modules: + try: + permutation_count += len(reduce(getattr, + (module + ".get_test_permutations").split('.')[1:], + __import__(module))()) + except errors.DependencyNotPresent: + pass + scenarios = transport_test_permutations() + self.assertEqual(permutation_count, len(scenarios)) + + def test_scenarios_include_transport_class(self): + # This test used to know about all the possible transports and the + # order they were returned but that seems overly brittle (mbp + # 20060307) + from bzrlib.tests.per_transport import transport_test_permutations + scenarios = transport_test_permutations() + # there are at least that many builtin transports + self.assertTrue(len(scenarios) > 6) + one_scenario = scenarios[0] + self.assertIsInstance(one_scenario[0], str) + self.assertTrue(issubclass(one_scenario[1]["transport_class"], + bzrlib.transport.Transport)) + self.assertTrue(issubclass(one_scenario[1]["transport_server"], + bzrlib.transport.Server)) + + +class TestBranchScenarios(tests.TestCase): + + def test_scenarios(self): + # check that constructor parameters are passed through to the adapted + # test. + from bzrlib.tests.per_branch import make_scenarios + server1 = "a" + server2 = "b" + formats = [("c", "C"), ("d", "D")] + scenarios = make_scenarios(server1, server2, formats) + self.assertEqual(2, len(scenarios)) + self.assertEqual([ + ('str', + {'branch_format': 'c', + 'bzrdir_format': 'C', + 'transport_readonly_server': 'b', + 'transport_server': 'a'}), + ('str', + {'branch_format': 'd', + 'bzrdir_format': 'D', + 'transport_readonly_server': 'b', + 'transport_server': 'a'})], + scenarios) + + +class TestBzrDirScenarios(tests.TestCase): + + def test_scenarios(self): + # check that constructor parameters are passed through to the adapted + # test. + from bzrlib.tests.per_controldir import make_scenarios + vfs_factory = "v" + server1 = "a" + server2 = "b" + formats = ["c", "d"] + scenarios = make_scenarios(vfs_factory, server1, server2, formats) + self.assertEqual([ + ('str', + {'bzrdir_format': 'c', + 'transport_readonly_server': 'b', + 'transport_server': 'a', + 'vfs_transport_factory': 'v'}), + ('str', + {'bzrdir_format': 'd', + 'transport_readonly_server': 'b', + 'transport_server': 'a', + 'vfs_transport_factory': 'v'})], + scenarios) + + +class TestRepositoryScenarios(tests.TestCase): + + def test_formats_to_scenarios(self): + from bzrlib.tests.per_repository import formats_to_scenarios + formats = [("(c)", remote.RemoteRepositoryFormat()), + ("(d)", repository.format_registry.get( + 'Bazaar repository format 2a (needs bzr 1.16 or later)\n'))] + no_vfs_scenarios = formats_to_scenarios(formats, "server", "readonly", + None) + vfs_scenarios = formats_to_scenarios(formats, "server", "readonly", + vfs_transport_factory="vfs") + # no_vfs generate scenarios without vfs_transport_factory + expected = [ + ('RemoteRepositoryFormat(c)', + {'bzrdir_format': remote.RemoteBzrDirFormat(), + 'repository_format': remote.RemoteRepositoryFormat(), + 'transport_readonly_server': 'readonly', + 'transport_server': 'server'}), + ('RepositoryFormat2a(d)', + {'bzrdir_format': bzrdir.BzrDirMetaFormat1(), + 'repository_format': groupcompress_repo.RepositoryFormat2a(), + 'transport_readonly_server': 'readonly', + 'transport_server': 'server'})] + self.assertEqual(expected, no_vfs_scenarios) + self.assertEqual([ + ('RemoteRepositoryFormat(c)', + {'bzrdir_format': remote.RemoteBzrDirFormat(), + 'repository_format': remote.RemoteRepositoryFormat(), + 'transport_readonly_server': 'readonly', + 'transport_server': 'server', + 'vfs_transport_factory': 'vfs'}), + ('RepositoryFormat2a(d)', + {'bzrdir_format': bzrdir.BzrDirMetaFormat1(), + 'repository_format': groupcompress_repo.RepositoryFormat2a(), + 'transport_readonly_server': 'readonly', + 'transport_server': 'server', + 'vfs_transport_factory': 'vfs'})], + vfs_scenarios) + + +class TestTestScenarioApplication(tests.TestCase): + """Tests for the test adaption facilities.""" + + def test_apply_scenario(self): + from bzrlib.tests import apply_scenario + input_test = TestTestScenarioApplication("test_apply_scenario") + # setup two adapted tests + adapted_test1 = apply_scenario(input_test, + ("new id", + {"bzrdir_format":"bzr_format", + "repository_format":"repo_fmt", + "transport_server":"transport_server", + "transport_readonly_server":"readonly-server"})) + adapted_test2 = apply_scenario(input_test, + ("new id 2", {"bzrdir_format":None})) + # input_test should have been altered. + self.assertRaises(AttributeError, getattr, input_test, "bzrdir_format") + # the new tests are mutually incompatible, ensuring it has + # made new ones, and unspecified elements in the scenario + # should not have been altered. + self.assertEqual("bzr_format", adapted_test1.bzrdir_format) + self.assertEqual("repo_fmt", adapted_test1.repository_format) + self.assertEqual("transport_server", adapted_test1.transport_server) + self.assertEqual("readonly-server", + adapted_test1.transport_readonly_server) + self.assertEqual( + "bzrlib.tests.test_selftest.TestTestScenarioApplication." + "test_apply_scenario(new id)", + adapted_test1.id()) + self.assertEqual(None, adapted_test2.bzrdir_format) + self.assertEqual( + "bzrlib.tests.test_selftest.TestTestScenarioApplication." + "test_apply_scenario(new id 2)", + adapted_test2.id()) + + +class TestInterRepositoryScenarios(tests.TestCase): + + def test_scenarios(self): + # check that constructor parameters are passed through to the adapted + # test. + from bzrlib.tests.per_interrepository import make_scenarios + server1 = "a" + server2 = "b" + formats = [("C0", "C1", "C2", "C3"), ("D0", "D1", "D2", "D3")] + scenarios = make_scenarios(server1, server2, formats) + self.assertEqual([ + ('C0,str,str', + {'repository_format': 'C1', + 'repository_format_to': 'C2', + 'transport_readonly_server': 'b', + 'transport_server': 'a', + 'extra_setup': 'C3'}), + ('D0,str,str', + {'repository_format': 'D1', + 'repository_format_to': 'D2', + 'transport_readonly_server': 'b', + 'transport_server': 'a', + 'extra_setup': 'D3'})], + scenarios) + + +class TestWorkingTreeScenarios(tests.TestCase): + + def test_scenarios(self): + # check that constructor parameters are passed through to the adapted + # test. + from bzrlib.tests.per_workingtree import make_scenarios + server1 = "a" + server2 = "b" + formats = [workingtree_4.WorkingTreeFormat4(), + workingtree_3.WorkingTreeFormat3(),] + scenarios = make_scenarios(server1, server2, formats) + self.assertEqual([ + ('WorkingTreeFormat4', + {'bzrdir_format': formats[0]._matchingbzrdir, + 'transport_readonly_server': 'b', + 'transport_server': 'a', + 'workingtree_format': formats[0]}), + ('WorkingTreeFormat3', + {'bzrdir_format': formats[1]._matchingbzrdir, + 'transport_readonly_server': 'b', + 'transport_server': 'a', + 'workingtree_format': formats[1]})], + scenarios) + + +class TestTreeScenarios(tests.TestCase): + + def test_scenarios(self): + # the tree implementation scenario generator is meant to setup one + # instance for each working tree format, and one additional instance + # that will use the default wt format, but create a revision tree for + # the tests. this means that the wt ones should have the + # workingtree_to_test_tree attribute set to 'return_parameter' and the + # revision one set to revision_tree_from_workingtree. + + from bzrlib.tests.per_tree import ( + _dirstate_tree_from_workingtree, + make_scenarios, + preview_tree_pre, + preview_tree_post, + return_parameter, + revision_tree_from_workingtree + ) + server1 = "a" + server2 = "b" + formats = [workingtree_4.WorkingTreeFormat4(), + workingtree_3.WorkingTreeFormat3(),] + scenarios = make_scenarios(server1, server2, formats) + self.assertEqual(7, len(scenarios)) + default_wt_format = workingtree.format_registry.get_default() + wt4_format = workingtree_4.WorkingTreeFormat4() + wt5_format = workingtree_4.WorkingTreeFormat5() + expected_scenarios = [ + ('WorkingTreeFormat4', + {'bzrdir_format': formats[0]._matchingbzrdir, + 'transport_readonly_server': 'b', + 'transport_server': 'a', + 'workingtree_format': formats[0], + '_workingtree_to_test_tree': return_parameter, + }), + ('WorkingTreeFormat3', + {'bzrdir_format': formats[1]._matchingbzrdir, + 'transport_readonly_server': 'b', + 'transport_server': 'a', + 'workingtree_format': formats[1], + '_workingtree_to_test_tree': return_parameter, + }), + ('RevisionTree', + {'_workingtree_to_test_tree': revision_tree_from_workingtree, + 'bzrdir_format': default_wt_format._matchingbzrdir, + 'transport_readonly_server': 'b', + 'transport_server': 'a', + 'workingtree_format': default_wt_format, + }), + ('DirStateRevisionTree,WT4', + {'_workingtree_to_test_tree': _dirstate_tree_from_workingtree, + 'bzrdir_format': wt4_format._matchingbzrdir, + 'transport_readonly_server': 'b', + 'transport_server': 'a', + 'workingtree_format': wt4_format, + }), + ('DirStateRevisionTree,WT5', + {'_workingtree_to_test_tree': _dirstate_tree_from_workingtree, + 'bzrdir_format': wt5_format._matchingbzrdir, + 'transport_readonly_server': 'b', + 'transport_server': 'a', + 'workingtree_format': wt5_format, + }), + ('PreviewTree', + {'_workingtree_to_test_tree': preview_tree_pre, + 'bzrdir_format': default_wt_format._matchingbzrdir, + 'transport_readonly_server': 'b', + 'transport_server': 'a', + 'workingtree_format': default_wt_format}), + ('PreviewTreePost', + {'_workingtree_to_test_tree': preview_tree_post, + 'bzrdir_format': default_wt_format._matchingbzrdir, + 'transport_readonly_server': 'b', + 'transport_server': 'a', + 'workingtree_format': default_wt_format}), + ] + self.assertEqual(expected_scenarios, scenarios) + + +class TestInterTreeScenarios(tests.TestCase): + """A group of tests that test the InterTreeTestAdapter.""" + + def test_scenarios(self): + # check that constructor parameters are passed through to the adapted + # test. + # for InterTree tests we want the machinery to bring up two trees in + # each instance: the base one, and the one we are interacting with. + # because each optimiser can be direction specific, we need to test + # each optimiser in its chosen direction. + # unlike the TestProviderAdapter we dont want to automatically add a + # parameterized one for WorkingTree - the optimisers will tell us what + # ones to add. + from bzrlib.tests.per_tree import ( + return_parameter, + ) + from bzrlib.tests.per_intertree import ( + make_scenarios, + ) + from bzrlib.workingtree_3 import WorkingTreeFormat3 + from bzrlib.workingtree_4 import WorkingTreeFormat4 + input_test = TestInterTreeScenarios( + "test_scenarios") + server1 = "a" + server2 = "b" + format1 = WorkingTreeFormat4() + format2 = WorkingTreeFormat3() + formats = [("1", str, format1, format2, "converter1"), + ("2", int, format2, format1, "converter2")] + scenarios = make_scenarios(server1, server2, formats) + self.assertEqual(2, len(scenarios)) + expected_scenarios = [ + ("1", { + "bzrdir_format": format1._matchingbzrdir, + "intertree_class": formats[0][1], + "workingtree_format": formats[0][2], + "workingtree_format_to": formats[0][3], + "mutable_trees_to_test_trees": formats[0][4], + "_workingtree_to_test_tree": return_parameter, + "transport_server": server1, + "transport_readonly_server": server2, + }), + ("2", { + "bzrdir_format": format2._matchingbzrdir, + "intertree_class": formats[1][1], + "workingtree_format": formats[1][2], + "workingtree_format_to": formats[1][3], + "mutable_trees_to_test_trees": formats[1][4], + "_workingtree_to_test_tree": return_parameter, + "transport_server": server1, + "transport_readonly_server": server2, + }), + ] + self.assertEqual(scenarios, expected_scenarios) + + +class TestTestCaseInTempDir(tests.TestCaseInTempDir): + + def test_home_is_not_working(self): + self.assertNotEqual(self.test_dir, self.test_home_dir) + cwd = osutils.getcwd() + self.assertIsSameRealPath(self.test_dir, cwd) + self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME']) + + def test_assertEqualStat_equal(self): + from bzrlib.tests.test_dirstate import _FakeStat + self.build_tree(["foo"]) + real = os.lstat("foo") + fake = _FakeStat(real.st_size, real.st_mtime, real.st_ctime, + real.st_dev, real.st_ino, real.st_mode) + self.assertEqualStat(real, fake) + + def test_assertEqualStat_notequal(self): + self.build_tree(["foo", "longname"]) + self.assertRaises(AssertionError, self.assertEqualStat, + os.lstat("foo"), os.lstat("longname")) + + def test_failUnlessExists(self): + """Deprecated failUnlessExists and failIfExists""" + self.applyDeprecated( + deprecated_in((2, 4)), + self.failUnlessExists, '.') + self.build_tree(['foo/', 'foo/bar']) + self.applyDeprecated( + deprecated_in((2, 4)), + self.failUnlessExists, 'foo/bar') + self.applyDeprecated( + deprecated_in((2, 4)), + self.failIfExists, 'foo/foo') + + def test_assertPathExists(self): + self.assertPathExists('.') + self.build_tree(['foo/', 'foo/bar']) + self.assertPathExists('foo/bar') + self.assertPathDoesNotExist('foo/foo') + + +class TestTestCaseWithMemoryTransport(tests.TestCaseWithMemoryTransport): + + def test_home_is_non_existant_dir_under_root(self): + """The test_home_dir for TestCaseWithMemoryTransport is missing. + + This is because TestCaseWithMemoryTransport is for tests that do not + need any disk resources: they should be hooked into bzrlib in such a + way that no global settings are being changed by the test (only a + few tests should need to do that), and having a missing dir as home is + an effective way to ensure that this is the case. + """ + self.assertIsSameRealPath( + self.TEST_ROOT + "/MemoryTransportMissingHomeDir", + self.test_home_dir) + self.assertIsSameRealPath(self.test_home_dir, os.environ['HOME']) + + def test_cwd_is_TEST_ROOT(self): + self.assertIsSameRealPath(self.test_dir, self.TEST_ROOT) + cwd = osutils.getcwd() + self.assertIsSameRealPath(self.test_dir, cwd) + + def test_BZR_HOME_and_HOME_are_bytestrings(self): + """The $BZR_HOME and $HOME environment variables should not be unicode. + + See https://bugs.launchpad.net/bzr/+bug/464174 + """ + self.assertIsInstance(os.environ['BZR_HOME'], str) + self.assertIsInstance(os.environ['HOME'], str) + + def test_make_branch_and_memory_tree(self): + """In TestCaseWithMemoryTransport we should not make the branch on disk. + + This is hard to comprehensively robustly test, so we settle for making + a branch and checking no directory was created at its relpath. + """ + tree = self.make_branch_and_memory_tree('dir') + # Guard against regression into MemoryTransport leaking + # files to disk instead of keeping them in memory. + self.assertFalse(osutils.lexists('dir')) + self.assertIsInstance(tree, memorytree.MemoryTree) + + def test_make_branch_and_memory_tree_with_format(self): + """make_branch_and_memory_tree should accept a format option.""" + format = bzrdir.BzrDirMetaFormat1() + format.repository_format = repository.format_registry.get_default() + tree = self.make_branch_and_memory_tree('dir', format=format) + # Guard against regression into MemoryTransport leaking + # files to disk instead of keeping them in memory. + self.assertFalse(osutils.lexists('dir')) + self.assertIsInstance(tree, memorytree.MemoryTree) + self.assertEqual(format.repository_format.__class__, + tree.branch.repository._format.__class__) + + def test_make_branch_builder(self): + builder = self.make_branch_builder('dir') + self.assertIsInstance(builder, branchbuilder.BranchBuilder) + # Guard against regression into MemoryTransport leaking + # files to disk instead of keeping them in memory. + self.assertFalse(osutils.lexists('dir')) + + def test_make_branch_builder_with_format(self): + # Use a repo layout that doesn't conform to a 'named' layout, to ensure + # that the format objects are used. + format = bzrdir.BzrDirMetaFormat1() + repo_format = repository.format_registry.get_default() + format.repository_format = repo_format + builder = self.make_branch_builder('dir', format=format) + the_branch = builder.get_branch() + # Guard against regression into MemoryTransport leaking + # files to disk instead of keeping them in memory. + self.assertFalse(osutils.lexists('dir')) + self.assertEqual(format.repository_format.__class__, + the_branch.repository._format.__class__) + self.assertEqual(repo_format.get_format_string(), + self.get_transport().get_bytes( + 'dir/.bzr/repository/format')) + + def test_make_branch_builder_with_format_name(self): + builder = self.make_branch_builder('dir', format='knit') + the_branch = builder.get_branch() + # Guard against regression into MemoryTransport leaking + # files to disk instead of keeping them in memory. + self.assertFalse(osutils.lexists('dir')) + dir_format = controldir.format_registry.make_bzrdir('knit') + self.assertEqual(dir_format.repository_format.__class__, + the_branch.repository._format.__class__) + self.assertEqual('Bazaar-NG Knit Repository Format 1', + self.get_transport().get_bytes( + 'dir/.bzr/repository/format')) + + def test_dangling_locks_cause_failures(self): + class TestDanglingLock(tests.TestCaseWithMemoryTransport): + def test_function(self): + t = self.get_transport_from_path('.') + l = lockdir.LockDir(t, 'lock') + l.create() + l.attempt_lock() + test = TestDanglingLock('test_function') + result = test.run() + total_failures = result.errors + result.failures + if self._lock_check_thorough: + self.assertEqual(1, len(total_failures)) + else: + # When _lock_check_thorough is disabled, then we don't trigger a + # failure + self.assertEqual(0, len(total_failures)) + + +class TestTestCaseWithTransport(tests.TestCaseWithTransport): + """Tests for the convenience functions TestCaseWithTransport introduces.""" + + def test_get_readonly_url_none(self): + from bzrlib.transport.readonly import ReadonlyTransportDecorator + self.vfs_transport_factory = memory.MemoryServer + self.transport_readonly_server = None + # calling get_readonly_transport() constructs a decorator on the url + # for the server + url = self.get_readonly_url() + url2 = self.get_readonly_url('foo/bar') + t = transport.get_transport_from_url(url) + t2 = transport.get_transport_from_url(url2) + self.assertIsInstance(t, ReadonlyTransportDecorator) + self.assertIsInstance(t2, ReadonlyTransportDecorator) + self.assertEqual(t2.base[:-1], t.abspath('foo/bar')) + + def test_get_readonly_url_http(self): + from bzrlib.tests.http_server import HttpServer + from bzrlib.transport.http import HttpTransportBase + self.transport_server = test_server.LocalURLServer + self.transport_readonly_server = HttpServer + # calling get_readonly_transport() gives us a HTTP server instance. + url = self.get_readonly_url() + url2 = self.get_readonly_url('foo/bar') + # the transport returned may be any HttpTransportBase subclass + t = transport.get_transport_from_url(url) + t2 = transport.get_transport_from_url(url2) + self.assertIsInstance(t, HttpTransportBase) + self.assertIsInstance(t2, HttpTransportBase) + self.assertEqual(t2.base[:-1], t.abspath('foo/bar')) + + def test_is_directory(self): + """Test assertIsDirectory assertion""" + t = self.get_transport() + self.build_tree(['a_dir/', 'a_file'], transport=t) + self.assertIsDirectory('a_dir', t) + self.assertRaises(AssertionError, self.assertIsDirectory, 'a_file', t) + self.assertRaises(AssertionError, self.assertIsDirectory, 'not_here', t) + + def test_make_branch_builder(self): + builder = self.make_branch_builder('dir') + rev_id = builder.build_commit() + self.assertPathExists('dir') + a_dir = controldir.ControlDir.open('dir') + self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree) + a_branch = a_dir.open_branch() + builder_branch = builder.get_branch() + self.assertEqual(a_branch.base, builder_branch.base) + self.assertEqual((1, rev_id), builder_branch.last_revision_info()) + self.assertEqual((1, rev_id), a_branch.last_revision_info()) + + +class TestTestCaseTransports(tests.TestCaseWithTransport): + + def setUp(self): + super(TestTestCaseTransports, self).setUp() + self.vfs_transport_factory = memory.MemoryServer + + def test_make_bzrdir_preserves_transport(self): + t = self.get_transport() + result_bzrdir = self.make_bzrdir('subdir') + self.assertIsInstance(result_bzrdir.transport, + memory.MemoryTransport) + # should not be on disk, should only be in memory + self.assertPathDoesNotExist('subdir') + + +class TestChrootedTest(tests.ChrootedTestCase): + + def test_root_is_root(self): + t = transport.get_transport_from_url(self.get_readonly_url()) + url = t.base + self.assertEqual(url, t.clone('..').base) + + +class TestProfileResult(tests.TestCase): + + def test_profiles_tests(self): + self.requireFeature(features.lsprof_feature) + terminal = testtools.testresult.doubles.ExtendedTestResult() + result = tests.ProfileResult(terminal) + class Sample(tests.TestCase): + def a(self): + self.sample_function() + def sample_function(self): + pass + test = Sample("a") + test.run(result) + case = terminal._events[0][1] + self.assertLength(1, case._benchcalls) + # We must be able to unpack it as the test reporting code wants + (_, _, _), stats = case._benchcalls[0] + self.assertTrue(callable(stats.pprint)) + + +class TestTestResult(tests.TestCase): + + def check_timing(self, test_case, expected_re): + result = bzrlib.tests.TextTestResult(self._log_file, + descriptions=0, + verbosity=1, + ) + capture = testtools.testresult.doubles.ExtendedTestResult() + test_case.run(MultiTestResult(result, capture)) + run_case = capture._events[0][1] + timed_string = result._testTimeString(run_case) + self.assertContainsRe(timed_string, expected_re) + + def test_test_reporting(self): + class ShortDelayTestCase(tests.TestCase): + def test_short_delay(self): + time.sleep(0.003) + def test_short_benchmark(self): + self.time(time.sleep, 0.003) + self.check_timing(ShortDelayTestCase('test_short_delay'), + r"^ +[0-9]+ms$") + # if a benchmark time is given, we now show just that time followed by + # a star + self.check_timing(ShortDelayTestCase('test_short_benchmark'), + r"^ +[0-9]+ms\*$") + + def test_unittest_reporting_unittest_class(self): + # getting the time from a non-bzrlib test works ok + class ShortDelayTestCase(unittest.TestCase): + def test_short_delay(self): + time.sleep(0.003) + self.check_timing(ShortDelayTestCase('test_short_delay'), + r"^ +[0-9]+ms$") + + def _time_hello_world_encoding(self): + """Profile two sleep calls + + This is used to exercise the test framework. + """ + self.time(unicode, 'hello', errors='replace') + self.time(unicode, 'world', errors='replace') + + def test_lsprofiling(self): + """Verbose test result prints lsprof statistics from test cases.""" + self.requireFeature(features.lsprof_feature) + result_stream = StringIO() + result = bzrlib.tests.VerboseTestResult( + result_stream, + descriptions=0, + verbosity=2, + ) + # we want profile a call of some sort and check it is output by + # addSuccess. We dont care about addError or addFailure as they + # are not that interesting for performance tuning. + # make a new test instance that when run will generate a profile + example_test_case = TestTestResult("_time_hello_world_encoding") + example_test_case._gather_lsprof_in_benchmarks = True + # execute the test, which should succeed and record profiles + example_test_case.run(result) + # lsprofile_something() + # if this worked we want + # LSProf output for <built in function unicode> (['hello'], {'errors': 'replace'}) + # CallCount Recursive Total(ms) Inline(ms) module:lineno(function) + # (the lsprof header) + # ... an arbitrary number of lines + # and the function call which is time.sleep. + # 1 0 ??? ??? ???(sleep) + # and then repeated but with 'world', rather than 'hello'. + # this should appear in the output stream of our test result. + output = result_stream.getvalue() + self.assertContainsRe(output, + r"LSProf output for <type 'unicode'>\(\('hello',\), {'errors': 'replace'}\)") + self.assertContainsRe(output, + r" *CallCount *Recursive *Total\(ms\) *Inline\(ms\) *module:lineno\(function\)\n") + self.assertContainsRe(output, + r"( +1 +0 +0\.\d+ +0\.\d+ +<method 'disable' of '_lsprof\.Profiler' objects>\n)?") + self.assertContainsRe(output, + r"LSProf output for <type 'unicode'>\(\('world',\), {'errors': 'replace'}\)\n") + + def test_uses_time_from_testtools(self): + """Test case timings in verbose results should use testtools times""" + import datetime + class TimeAddedVerboseTestResult(tests.VerboseTestResult): + def startTest(self, test): + self.time(datetime.datetime.utcfromtimestamp(1.145)) + super(TimeAddedVerboseTestResult, self).startTest(test) + def addSuccess(self, test): + self.time(datetime.datetime.utcfromtimestamp(51.147)) + super(TimeAddedVerboseTestResult, self).addSuccess(test) + def report_tests_starting(self): pass + sio = StringIO() + self.get_passing_test().run(TimeAddedVerboseTestResult(sio, 0, 2)) + self.assertEndsWith(sio.getvalue(), "OK 50002ms\n") + + def test_known_failure(self): + """Using knownFailure should trigger several result actions.""" + class InstrumentedTestResult(tests.ExtendedTestResult): + def stopTestRun(self): pass + def report_tests_starting(self): pass + def report_known_failure(self, test, err=None, details=None): + self._call = test, 'known failure' + result = InstrumentedTestResult(None, None, None, None) + class Test(tests.TestCase): + def test_function(self): + self.knownFailure('failed!') + test = Test("test_function") + test.run(result) + # it should invoke 'report_known_failure'. + self.assertEqual(2, len(result._call)) + self.assertEqual(test.id(), result._call[0].id()) + self.assertEqual('known failure', result._call[1]) + # we dont introspec the traceback, if the rest is ok, it would be + # exceptional for it not to be. + # it should update the known_failure_count on the object. + self.assertEqual(1, result.known_failure_count) + # the result should be successful. + self.assertTrue(result.wasSuccessful()) + + def test_verbose_report_known_failure(self): + # verbose test output formatting + result_stream = StringIO() + result = bzrlib.tests.VerboseTestResult( + result_stream, + descriptions=0, + verbosity=2, + ) + _get_test("test_xfail").run(result) + self.assertContainsRe(result_stream.getvalue(), + "\n\\S+\\.test_xfail\\s+XFAIL\\s+\\d+ms\n" + "\\s*(?:Text attachment: )?reason" + "(?:\n-+\n|: {{{)" + "this_fails" + "(?:\n-+\n|}}}\n)") + + def get_passing_test(self): + """Return a test object that can't be run usefully.""" + def passing_test(): + pass + return unittest.FunctionTestCase(passing_test) + + def test_add_not_supported(self): + """Test the behaviour of invoking addNotSupported.""" + class InstrumentedTestResult(tests.ExtendedTestResult): + def stopTestRun(self): pass + def report_tests_starting(self): pass + def report_unsupported(self, test, feature): + self._call = test, feature + result = InstrumentedTestResult(None, None, None, None) + test = SampleTestCase('_test_pass') + feature = features.Feature() + result.startTest(test) + result.addNotSupported(test, feature) + # it should invoke 'report_unsupported'. + self.assertEqual(2, len(result._call)) + self.assertEqual(test, result._call[0]) + self.assertEqual(feature, result._call[1]) + # the result should be successful. + self.assertTrue(result.wasSuccessful()) + # it should record the test against a count of tests not run due to + # this feature. + self.assertEqual(1, result.unsupported['Feature']) + # and invoking it again should increment that counter + result.addNotSupported(test, feature) + self.assertEqual(2, result.unsupported['Feature']) + + def test_verbose_report_unsupported(self): + # verbose test output formatting + result_stream = StringIO() + result = bzrlib.tests.VerboseTestResult( + result_stream, + descriptions=0, + verbosity=2, + ) + test = self.get_passing_test() + feature = features.Feature() + result.startTest(test) + prefix = len(result_stream.getvalue()) + result.report_unsupported(test, feature) + output = result_stream.getvalue()[prefix:] + lines = output.splitlines() + # We don't check for the final '0ms' since it may fail on slow hosts + self.assertStartsWith(lines[0], 'NODEP') + self.assertEqual(lines[1], + " The feature 'Feature' is not available.") + + def test_unavailable_exception(self): + """An UnavailableFeature being raised should invoke addNotSupported.""" + class InstrumentedTestResult(tests.ExtendedTestResult): + def stopTestRun(self): pass + def report_tests_starting(self): pass + def addNotSupported(self, test, feature): + self._call = test, feature + result = InstrumentedTestResult(None, None, None, None) + feature = features.Feature() + class Test(tests.TestCase): + def test_function(self): + raise tests.UnavailableFeature(feature) + test = Test("test_function") + test.run(result) + # it should invoke 'addNotSupported'. + self.assertEqual(2, len(result._call)) + self.assertEqual(test.id(), result._call[0].id()) + self.assertEqual(feature, result._call[1]) + # and not count as an error + self.assertEqual(0, result.error_count) + + def test_strict_with_unsupported_feature(self): + result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0, + verbosity=1) + test = self.get_passing_test() + feature = "Unsupported Feature" + result.addNotSupported(test, feature) + self.assertFalse(result.wasStrictlySuccessful()) + self.assertEqual(None, result._extractBenchmarkTime(test)) + + def test_strict_with_known_failure(self): + result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0, + verbosity=1) + test = _get_test("test_xfail") + test.run(result) + self.assertFalse(result.wasStrictlySuccessful()) + self.assertEqual(None, result._extractBenchmarkTime(test)) + + def test_strict_with_success(self): + result = bzrlib.tests.TextTestResult(self._log_file, descriptions=0, + verbosity=1) + test = self.get_passing_test() + result.addSuccess(test) + self.assertTrue(result.wasStrictlySuccessful()) + self.assertEqual(None, result._extractBenchmarkTime(test)) + + def test_startTests(self): + """Starting the first test should trigger startTests.""" + class InstrumentedTestResult(tests.ExtendedTestResult): + calls = 0 + def startTests(self): self.calls += 1 + result = InstrumentedTestResult(None, None, None, None) + def test_function(): + pass + test = unittest.FunctionTestCase(test_function) + test.run(result) + self.assertEquals(1, result.calls) + + def test_startTests_only_once(self): + """With multiple tests startTests should still only be called once""" + class InstrumentedTestResult(tests.ExtendedTestResult): + calls = 0 + def startTests(self): self.calls += 1 + result = InstrumentedTestResult(None, None, None, None) + suite = unittest.TestSuite([ + unittest.FunctionTestCase(lambda: None), + unittest.FunctionTestCase(lambda: None)]) + suite.run(result) + self.assertEquals(1, result.calls) + self.assertEquals(2, result.count) + + +class TestRunner(tests.TestCase): + + def dummy_test(self): + pass + + def run_test_runner(self, testrunner, test): + """Run suite in testrunner, saving global state and restoring it. + + This current saves and restores: + TestCaseInTempDir.TEST_ROOT + + There should be no tests in this file that use + bzrlib.tests.TextTestRunner without using this convenience method, + because of our use of global state. + """ + old_root = tests.TestCaseInTempDir.TEST_ROOT + try: + tests.TestCaseInTempDir.TEST_ROOT = None + return testrunner.run(test) + finally: + tests.TestCaseInTempDir.TEST_ROOT = old_root + + def test_known_failure_failed_run(self): + # run a test that generates a known failure which should be printed in + # the final output when real failures occur. + class Test(tests.TestCase): + def known_failure_test(self): + self.expectFailure('failed', self.assertTrue, False) + test = unittest.TestSuite() + test.addTest(Test("known_failure_test")) + def failing_test(): + raise AssertionError('foo') + test.addTest(unittest.FunctionTestCase(failing_test)) + stream = StringIO() + runner = tests.TextTestRunner(stream=stream) + result = self.run_test_runner(runner, test) + lines = stream.getvalue().splitlines() + self.assertContainsRe(stream.getvalue(), + '(?sm)^bzr selftest.*$' + '.*' + '^======================================================================\n' + '^FAIL: failing_test\n' + '^----------------------------------------------------------------------\n' + 'Traceback \\(most recent call last\\):\n' + ' .*' # File .*, line .*, in failing_test' - but maybe not from .pyc + ' raise AssertionError\\(\'foo\'\\)\n' + '.*' + '^----------------------------------------------------------------------\n' + '.*' + 'FAILED \\(failures=1, known_failure_count=1\\)' + ) + + def test_known_failure_ok_run(self): + # run a test that generates a known failure which should be printed in + # the final output. + class Test(tests.TestCase): + def known_failure_test(self): + self.knownFailure("Never works...") + test = Test("known_failure_test") + stream = StringIO() + runner = tests.TextTestRunner(stream=stream) + result = self.run_test_runner(runner, test) + self.assertContainsRe(stream.getvalue(), + '\n' + '-*\n' + 'Ran 1 test in .*\n' + '\n' + 'OK \\(known_failures=1\\)\n') + + def test_unexpected_success_bad(self): + class Test(tests.TestCase): + def test_truth(self): + self.expectFailure("No absolute truth", self.assertTrue, True) + runner = tests.TextTestRunner(stream=StringIO()) + result = self.run_test_runner(runner, Test("test_truth")) + self.assertContainsRe(runner.stream.getvalue(), + "=+\n" + "FAIL: \\S+\.test_truth\n" + "-+\n" + "(?:.*\n)*" + "\\s*(?:Text attachment: )?reason" + "(?:\n-+\n|: {{{)" + "No absolute truth" + "(?:\n-+\n|}}}\n)" + "(?:.*\n)*" + "-+\n" + "Ran 1 test in .*\n" + "\n" + "FAILED \\(failures=1\\)\n\\Z") + + def test_result_decorator(self): + # decorate results + calls = [] + class LoggingDecorator(ExtendedToOriginalDecorator): + def startTest(self, test): + ExtendedToOriginalDecorator.startTest(self, test) + calls.append('start') + test = unittest.FunctionTestCase(lambda:None) + stream = StringIO() + runner = tests.TextTestRunner(stream=stream, + result_decorators=[LoggingDecorator]) + result = self.run_test_runner(runner, test) + self.assertLength(1, calls) + + def test_skipped_test(self): + # run a test that is skipped, and check the suite as a whole still + # succeeds. + # skipping_test must be hidden in here so it's not run as a real test + class SkippingTest(tests.TestCase): + def skipping_test(self): + raise tests.TestSkipped('test intentionally skipped') + runner = tests.TextTestRunner(stream=self._log_file) + test = SkippingTest("skipping_test") + result = self.run_test_runner(runner, test) + self.assertTrue(result.wasSuccessful()) + + def test_skipped_from_setup(self): + calls = [] + class SkippedSetupTest(tests.TestCase): + + def setUp(self): + calls.append('setUp') + self.addCleanup(self.cleanup) + raise tests.TestSkipped('skipped setup') + + def test_skip(self): + self.fail('test reached') + + def cleanup(self): + calls.append('cleanup') + + runner = tests.TextTestRunner(stream=self._log_file) + test = SkippedSetupTest('test_skip') + result = self.run_test_runner(runner, test) + self.assertTrue(result.wasSuccessful()) + # Check if cleanup was called the right number of times. + self.assertEqual(['setUp', 'cleanup'], calls) + + def test_skipped_from_test(self): + calls = [] + class SkippedTest(tests.TestCase): + + def setUp(self): + tests.TestCase.setUp(self) + calls.append('setUp') + self.addCleanup(self.cleanup) + + def test_skip(self): + raise tests.TestSkipped('skipped test') + + def cleanup(self): + calls.append('cleanup') + + runner = tests.TextTestRunner(stream=self._log_file) + test = SkippedTest('test_skip') + result = self.run_test_runner(runner, test) + self.assertTrue(result.wasSuccessful()) + # Check if cleanup was called the right number of times. + self.assertEqual(['setUp', 'cleanup'], calls) + + def test_not_applicable(self): + # run a test that is skipped because it's not applicable + class Test(tests.TestCase): + def not_applicable_test(self): + raise tests.TestNotApplicable('this test never runs') + out = StringIO() + runner = tests.TextTestRunner(stream=out, verbosity=2) + test = Test("not_applicable_test") + result = self.run_test_runner(runner, test) + self._log_file.write(out.getvalue()) + self.assertTrue(result.wasSuccessful()) + self.assertTrue(result.wasStrictlySuccessful()) + self.assertContainsRe(out.getvalue(), + r'(?m)not_applicable_test * N/A') + self.assertContainsRe(out.getvalue(), + r'(?m)^ this test never runs') + + def test_unsupported_features_listed(self): + """When unsupported features are encountered they are detailed.""" + class Feature1(features.Feature): + def _probe(self): return False + class Feature2(features.Feature): + def _probe(self): return False + # create sample tests + test1 = SampleTestCase('_test_pass') + test1._test_needs_features = [Feature1()] + test2 = SampleTestCase('_test_pass') + test2._test_needs_features = [Feature2()] + test = unittest.TestSuite() + test.addTest(test1) + test.addTest(test2) + stream = StringIO() + runner = tests.TextTestRunner(stream=stream) + result = self.run_test_runner(runner, test) + lines = stream.getvalue().splitlines() + self.assertEqual([ + 'OK', + "Missing feature 'Feature1' skipped 1 tests.", + "Missing feature 'Feature2' skipped 1 tests.", + ], + lines[-3:]) + + def test_verbose_test_count(self): + """A verbose test run reports the right test count at the start""" + suite = TestUtil.TestSuite([ + unittest.FunctionTestCase(lambda:None), + unittest.FunctionTestCase(lambda:None)]) + self.assertEqual(suite.countTestCases(), 2) + stream = StringIO() + runner = tests.TextTestRunner(stream=stream, verbosity=2) + # Need to use the CountingDecorator as that's what sets num_tests + result = self.run_test_runner(runner, tests.CountingDecorator(suite)) + self.assertStartsWith(stream.getvalue(), "running 2 tests") + + def test_startTestRun(self): + """run should call result.startTestRun()""" + calls = [] + class LoggingDecorator(ExtendedToOriginalDecorator): + def startTestRun(self): + ExtendedToOriginalDecorator.startTestRun(self) + calls.append('startTestRun') + test = unittest.FunctionTestCase(lambda:None) + stream = StringIO() + runner = tests.TextTestRunner(stream=stream, + result_decorators=[LoggingDecorator]) + result = self.run_test_runner(runner, test) + self.assertLength(1, calls) + + def test_stopTestRun(self): + """run should call result.stopTestRun()""" + calls = [] + class LoggingDecorator(ExtendedToOriginalDecorator): + def stopTestRun(self): + ExtendedToOriginalDecorator.stopTestRun(self) + calls.append('stopTestRun') + test = unittest.FunctionTestCase(lambda:None) + stream = StringIO() + runner = tests.TextTestRunner(stream=stream, + result_decorators=[LoggingDecorator]) + result = self.run_test_runner(runner, test) + self.assertLength(1, calls) + + def test_unicode_test_output_on_ascii_stream(self): + """Showing results should always succeed even on an ascii console""" + class FailureWithUnicode(tests.TestCase): + def test_log_unicode(self): + self.log(u"\u2606") + self.fail("Now print that log!") + out = StringIO() + self.overrideAttr(osutils, "get_terminal_encoding", + lambda trace=False: "ascii") + result = self.run_test_runner(tests.TextTestRunner(stream=out), + FailureWithUnicode("test_log_unicode")) + self.assertContainsRe(out.getvalue(), + "(?:Text attachment: )?log" + "(?:\n-+\n|: {{{)" + "\d+\.\d+ \\\\u2606" + "(?:\n-+\n|}}}\n)") + + +class SampleTestCase(tests.TestCase): + + def _test_pass(self): + pass + +class _TestException(Exception): + pass + + +class TestTestCase(tests.TestCase): + """Tests that test the core bzrlib TestCase.""" + + def test_assertLength_matches_empty(self): + a_list = [] + self.assertLength(0, a_list) + + def test_assertLength_matches_nonempty(self): + a_list = [1, 2, 3] + self.assertLength(3, a_list) + + def test_assertLength_fails_different(self): + a_list = [] + self.assertRaises(AssertionError, self.assertLength, 1, a_list) + + def test_assertLength_shows_sequence_in_failure(self): + a_list = [1, 2, 3] + exception = self.assertRaises(AssertionError, self.assertLength, 2, + a_list) + self.assertEqual('Incorrect length: wanted 2, got 3 for [1, 2, 3]', + exception.args[0]) + + def test_base_setUp_not_called_causes_failure(self): + class TestCaseWithBrokenSetUp(tests.TestCase): + def setUp(self): + pass # does not call TestCase.setUp + def test_foo(self): + pass + test = TestCaseWithBrokenSetUp('test_foo') + result = unittest.TestResult() + test.run(result) + self.assertFalse(result.wasSuccessful()) + self.assertEqual(1, result.testsRun) + + def test_base_tearDown_not_called_causes_failure(self): + class TestCaseWithBrokenTearDown(tests.TestCase): + def tearDown(self): + pass # does not call TestCase.tearDown + def test_foo(self): + pass + test = TestCaseWithBrokenTearDown('test_foo') + result = unittest.TestResult() + test.run(result) + self.assertFalse(result.wasSuccessful()) + self.assertEqual(1, result.testsRun) + + def test_debug_flags_sanitised(self): + """The bzrlib debug flags should be sanitised by setUp.""" + if 'allow_debug' in tests.selftest_debug_flags: + raise tests.TestNotApplicable( + '-Eallow_debug option prevents debug flag sanitisation') + # we could set something and run a test that will check + # it gets santised, but this is probably sufficient for now: + # if someone runs the test with -Dsomething it will error. + flags = set() + if self._lock_check_thorough: + flags.add('strict_locks') + self.assertEqual(flags, bzrlib.debug.debug_flags) + + def change_selftest_debug_flags(self, new_flags): + self.overrideAttr(tests, 'selftest_debug_flags', set(new_flags)) + + def test_allow_debug_flag(self): + """The -Eallow_debug flag prevents bzrlib.debug.debug_flags from being + sanitised (i.e. cleared) before running a test. + """ + self.change_selftest_debug_flags(set(['allow_debug'])) + bzrlib.debug.debug_flags = set(['a-flag']) + class TestThatRecordsFlags(tests.TestCase): + def test_foo(nested_self): + self.flags = set(bzrlib.debug.debug_flags) + test = TestThatRecordsFlags('test_foo') + test.run(self.make_test_result()) + flags = set(['a-flag']) + if 'disable_lock_checks' not in tests.selftest_debug_flags: + flags.add('strict_locks') + self.assertEqual(flags, self.flags) + + def test_disable_lock_checks(self): + """The -Edisable_lock_checks flag disables thorough checks.""" + class TestThatRecordsFlags(tests.TestCase): + def test_foo(nested_self): + self.flags = set(bzrlib.debug.debug_flags) + self.test_lock_check_thorough = nested_self._lock_check_thorough + self.change_selftest_debug_flags(set()) + test = TestThatRecordsFlags('test_foo') + test.run(self.make_test_result()) + # By default we do strict lock checking and thorough lock/unlock + # tracking. + self.assertTrue(self.test_lock_check_thorough) + self.assertEqual(set(['strict_locks']), self.flags) + # Now set the disable_lock_checks flag, and show that this changed. + self.change_selftest_debug_flags(set(['disable_lock_checks'])) + test = TestThatRecordsFlags('test_foo') + test.run(self.make_test_result()) + self.assertFalse(self.test_lock_check_thorough) + self.assertEqual(set(), self.flags) + + def test_this_fails_strict_lock_check(self): + class TestThatRecordsFlags(tests.TestCase): + def test_foo(nested_self): + self.flags1 = set(bzrlib.debug.debug_flags) + self.thisFailsStrictLockCheck() + self.flags2 = set(bzrlib.debug.debug_flags) + # Make sure lock checking is active + self.change_selftest_debug_flags(set()) + test = TestThatRecordsFlags('test_foo') + test.run(self.make_test_result()) + self.assertEqual(set(['strict_locks']), self.flags1) + self.assertEqual(set(), self.flags2) + + def test_debug_flags_restored(self): + """The bzrlib debug flags should be restored to their original state + after the test was run, even if allow_debug is set. + """ + self.change_selftest_debug_flags(set(['allow_debug'])) + # Now run a test that modifies debug.debug_flags. + bzrlib.debug.debug_flags = set(['original-state']) + class TestThatModifiesFlags(tests.TestCase): + def test_foo(self): + bzrlib.debug.debug_flags = set(['modified']) + test = TestThatModifiesFlags('test_foo') + test.run(self.make_test_result()) + self.assertEqual(set(['original-state']), bzrlib.debug.debug_flags) + + def make_test_result(self): + """Get a test result that writes to the test log file.""" + return tests.TextTestResult(self._log_file, descriptions=0, verbosity=1) + + def inner_test(self): + # the inner child test + note("inner_test") + + def outer_child(self): + # the outer child test + note("outer_start") + self.inner_test = TestTestCase("inner_child") + result = self.make_test_result() + self.inner_test.run(result) + note("outer finish") + self.addCleanup(osutils.delete_any, self._log_file_name) + + def test_trace_nesting(self): + # this tests that each test case nests its trace facility correctly. + # we do this by running a test case manually. That test case (A) + # should setup a new log, log content to it, setup a child case (B), + # which should log independently, then case (A) should log a trailer + # and return. + # we do two nested children so that we can verify the state of the + # logs after the outer child finishes is correct, which a bad clean + # up routine in tearDown might trigger a fault in our test with only + # one child, we should instead see the bad result inside our test with + # the two children. + # the outer child test + original_trace = bzrlib.trace._trace_file + outer_test = TestTestCase("outer_child") + result = self.make_test_result() + outer_test.run(result) + self.assertEqual(original_trace, bzrlib.trace._trace_file) + + def method_that_times_a_bit_twice(self): + # call self.time twice to ensure it aggregates + self.time(time.sleep, 0.007) + self.time(time.sleep, 0.007) + + def test_time_creates_benchmark_in_result(self): + """Test that the TestCase.time() method accumulates a benchmark time.""" + sample_test = TestTestCase("method_that_times_a_bit_twice") + output_stream = StringIO() + result = bzrlib.tests.VerboseTestResult( + output_stream, + descriptions=0, + verbosity=2) + sample_test.run(result) + self.assertContainsRe( + output_stream.getvalue(), + r"\d+ms\*\n$") + + def test_hooks_sanitised(self): + """The bzrlib hooks should be sanitised by setUp.""" + # Note this test won't fail with hooks that the core library doesn't + # use - but it trigger with a plugin that adds hooks, so its still a + # useful warning in that case. + self.assertEqual(bzrlib.branch.BranchHooks(), bzrlib.branch.Branch.hooks) + self.assertEqual( + bzrlib.smart.server.SmartServerHooks(), + bzrlib.smart.server.SmartTCPServer.hooks) + self.assertEqual( + bzrlib.commands.CommandHooks(), bzrlib.commands.Command.hooks) + + def test__gather_lsprof_in_benchmarks(self): + """When _gather_lsprof_in_benchmarks is on, accumulate profile data. + + Each self.time() call is individually and separately profiled. + """ + self.requireFeature(features.lsprof_feature) + # overrides the class member with an instance member so no cleanup + # needed. + self._gather_lsprof_in_benchmarks = True + self.time(time.sleep, 0.000) + self.time(time.sleep, 0.003) + self.assertEqual(2, len(self._benchcalls)) + self.assertEqual((time.sleep, (0.000,), {}), self._benchcalls[0][0]) + self.assertEqual((time.sleep, (0.003,), {}), self._benchcalls[1][0]) + self.assertIsInstance(self._benchcalls[0][1], bzrlib.lsprof.Stats) + self.assertIsInstance(self._benchcalls[1][1], bzrlib.lsprof.Stats) + del self._benchcalls[:] + + def test_knownFailure(self): + """Self.knownFailure() should raise a KnownFailure exception.""" + self.assertRaises(tests.KnownFailure, self.knownFailure, "A Failure") + + def test_open_bzrdir_safe_roots(self): + # even a memory transport should fail to open when its url isn't + # permitted. + # Manually set one up (TestCase doesn't and shouldn't provide magic + # machinery) + transport_server = memory.MemoryServer() + transport_server.start_server() + self.addCleanup(transport_server.stop_server) + t = transport.get_transport_from_url(transport_server.get_url()) + controldir.ControlDir.create(t.base) + self.assertRaises(errors.BzrError, + controldir.ControlDir.open_from_transport, t) + # But if we declare this as safe, we can open the bzrdir. + self.permit_url(t.base) + self._bzr_selftest_roots.append(t.base) + controldir.ControlDir.open_from_transport(t) + + def test_requireFeature_available(self): + """self.requireFeature(available) is a no-op.""" + class Available(features.Feature): + def _probe(self):return True + feature = Available() + self.requireFeature(feature) + + def test_requireFeature_unavailable(self): + """self.requireFeature(unavailable) raises UnavailableFeature.""" + class Unavailable(features.Feature): + def _probe(self):return False + feature = Unavailable() + self.assertRaises(tests.UnavailableFeature, + self.requireFeature, feature) + + def test_run_no_parameters(self): + test = SampleTestCase('_test_pass') + test.run() + + def test_run_enabled_unittest_result(self): + """Test we revert to regular behaviour when the test is enabled.""" + test = SampleTestCase('_test_pass') + class EnabledFeature(object): + def available(self): + return True + test._test_needs_features = [EnabledFeature()] + result = unittest.TestResult() + test.run(result) + self.assertEqual(1, result.testsRun) + self.assertEqual([], result.errors) + self.assertEqual([], result.failures) + + def test_run_disabled_unittest_result(self): + """Test our compatability for disabled tests with unittest results.""" + test = SampleTestCase('_test_pass') + class DisabledFeature(object): + def available(self): + return False + test._test_needs_features = [DisabledFeature()] + result = unittest.TestResult() + test.run(result) + self.assertEqual(1, result.testsRun) + self.assertEqual([], result.errors) + self.assertEqual([], result.failures) + + def test_run_disabled_supporting_result(self): + """Test disabled tests behaviour with support aware results.""" + test = SampleTestCase('_test_pass') + class DisabledFeature(object): + def __eq__(self, other): + return isinstance(other, DisabledFeature) + def available(self): + return False + the_feature = DisabledFeature() + test._test_needs_features = [the_feature] + class InstrumentedTestResult(unittest.TestResult): + def __init__(self): + unittest.TestResult.__init__(self) + self.calls = [] + def startTest(self, test): + self.calls.append(('startTest', test)) + def stopTest(self, test): + self.calls.append(('stopTest', test)) + def addNotSupported(self, test, feature): + self.calls.append(('addNotSupported', test, feature)) + result = InstrumentedTestResult() + test.run(result) + case = result.calls[0][1] + self.assertEqual([ + ('startTest', case), + ('addNotSupported', case, the_feature), + ('stopTest', case), + ], + result.calls) + + def test_start_server_registers_url(self): + transport_server = memory.MemoryServer() + # A little strict, but unlikely to be changed soon. + self.assertEqual([], self._bzr_selftest_roots) + self.start_server(transport_server) + self.assertSubset([transport_server.get_url()], + self._bzr_selftest_roots) + + def test_assert_list_raises_on_generator(self): + def generator_which_will_raise(): + # This will not raise until after the first yield + yield 1 + raise _TestException() + + e = self.assertListRaises(_TestException, generator_which_will_raise) + self.assertIsInstance(e, _TestException) + + e = self.assertListRaises(Exception, generator_which_will_raise) + self.assertIsInstance(e, _TestException) + + def test_assert_list_raises_on_plain(self): + def plain_exception(): + raise _TestException() + return [] + + e = self.assertListRaises(_TestException, plain_exception) + self.assertIsInstance(e, _TestException) + + e = self.assertListRaises(Exception, plain_exception) + self.assertIsInstance(e, _TestException) + + def test_assert_list_raises_assert_wrong_exception(self): + class _NotTestException(Exception): + pass + + def wrong_exception(): + raise _NotTestException() + + def wrong_exception_generator(): + yield 1 + yield 2 + raise _NotTestException() + + # Wrong exceptions are not intercepted + self.assertRaises(_NotTestException, + self.assertListRaises, _TestException, wrong_exception) + self.assertRaises(_NotTestException, + self.assertListRaises, _TestException, wrong_exception_generator) + + def test_assert_list_raises_no_exception(self): + def success(): + return [] + + def success_generator(): + yield 1 + yield 2 + + self.assertRaises(AssertionError, + self.assertListRaises, _TestException, success) + + self.assertRaises(AssertionError, + self.assertListRaises, _TestException, success_generator) + + def test_overrideAttr_without_value(self): + self.test_attr = 'original' # Define a test attribute + obj = self # Make 'obj' visible to the embedded test + class Test(tests.TestCase): + + def setUp(self): + tests.TestCase.setUp(self) + self.orig = self.overrideAttr(obj, 'test_attr') + + def test_value(self): + self.assertEqual('original', self.orig) + self.assertEqual('original', obj.test_attr) + obj.test_attr = 'modified' + self.assertEqual('modified', obj.test_attr) + + test = Test('test_value') + test.run(unittest.TestResult()) + self.assertEqual('original', obj.test_attr) + + def test_overrideAttr_with_value(self): + self.test_attr = 'original' # Define a test attribute + obj = self # Make 'obj' visible to the embedded test + class Test(tests.TestCase): + + def setUp(self): + tests.TestCase.setUp(self) + self.orig = self.overrideAttr(obj, 'test_attr', new='modified') + + def test_value(self): + self.assertEqual('original', self.orig) + self.assertEqual('modified', obj.test_attr) + + test = Test('test_value') + test.run(unittest.TestResult()) + self.assertEqual('original', obj.test_attr) + + def test_recordCalls(self): + from bzrlib.tests import test_selftest + calls = self.recordCalls( + test_selftest, '_add_numbers') + self.assertEqual(test_selftest._add_numbers(2, 10), + 12) + self.assertEquals(calls, [((2, 10), {})]) + + +def _add_numbers(a, b): + return a + b + + +class _MissingFeature(features.Feature): + def _probe(self): + return False +missing_feature = _MissingFeature() + + +def _get_test(name): + """Get an instance of a specific example test. + + We protect this in a function so that they don't auto-run in the test + suite. + """ + + class ExampleTests(tests.TestCase): + + def test_fail(self): + mutter('this was a failing test') + self.fail('this test will fail') + + def test_error(self): + mutter('this test errored') + raise RuntimeError('gotcha') + + def test_missing_feature(self): + mutter('missing the feature') + self.requireFeature(missing_feature) + + def test_skip(self): + mutter('this test will be skipped') + raise tests.TestSkipped('reason') + + def test_success(self): + mutter('this test succeeds') + + def test_xfail(self): + mutter('test with expected failure') + self.knownFailure('this_fails') + + def test_unexpected_success(self): + mutter('test with unexpected success') + self.expectFailure('should_fail', lambda: None) + + return ExampleTests(name) + + +class TestTestCaseLogDetails(tests.TestCase): + + def _run_test(self, test_name): + test = _get_test(test_name) + result = testtools.TestResult() + test.run(result) + return result + + def test_fail_has_log(self): + result = self._run_test('test_fail') + self.assertEqual(1, len(result.failures)) + result_content = result.failures[0][1] + self.assertContainsRe(result_content, + '(?m)^(?:Text attachment: )?log(?:$|: )') + self.assertContainsRe(result_content, 'this was a failing test') + + def test_error_has_log(self): + result = self._run_test('test_error') + self.assertEqual(1, len(result.errors)) + result_content = result.errors[0][1] + self.assertContainsRe(result_content, + '(?m)^(?:Text attachment: )?log(?:$|: )') + self.assertContainsRe(result_content, 'this test errored') + + def test_skip_has_no_log(self): + result = self._run_test('test_skip') + self.assertEqual(['reason'], result.skip_reasons.keys()) + skips = result.skip_reasons['reason'] + self.assertEqual(1, len(skips)) + test = skips[0] + self.assertFalse('log' in test.getDetails()) + + def test_missing_feature_has_no_log(self): + # testtools doesn't know about addNotSupported, so it just gets + # considered as a skip + result = self._run_test('test_missing_feature') + self.assertEqual([missing_feature], result.skip_reasons.keys()) + skips = result.skip_reasons[missing_feature] + self.assertEqual(1, len(skips)) + test = skips[0] + self.assertFalse('log' in test.getDetails()) + + def test_xfail_has_no_log(self): + result = self._run_test('test_xfail') + self.assertEqual(1, len(result.expectedFailures)) + result_content = result.expectedFailures[0][1] + self.assertNotContainsRe(result_content, + '(?m)^(?:Text attachment: )?log(?:$|: )') + self.assertNotContainsRe(result_content, 'test with expected failure') + + def test_unexpected_success_has_log(self): + result = self._run_test('test_unexpected_success') + self.assertEqual(1, len(result.unexpectedSuccesses)) + # Inconsistency, unexpectedSuccesses is a list of tests, + # expectedFailures is a list of reasons? + test = result.unexpectedSuccesses[0] + details = test.getDetails() + self.assertTrue('log' in details) + + +class TestTestCloning(tests.TestCase): + """Tests that test cloning of TestCases (as used by multiply_tests).""" + + def test_cloned_testcase_does_not_share_details(self): + """A TestCase cloned with clone_test does not share mutable attributes + such as details or cleanups. + """ + class Test(tests.TestCase): + def test_foo(self): + self.addDetail('foo', Content('text/plain', lambda: 'foo')) + orig_test = Test('test_foo') + cloned_test = tests.clone_test(orig_test, orig_test.id() + '(cloned)') + orig_test.run(unittest.TestResult()) + self.assertEqual('foo', orig_test.getDetails()['foo'].iter_bytes()) + self.assertEqual(None, cloned_test.getDetails().get('foo')) + + def test_double_apply_scenario_preserves_first_scenario(self): + """Applying two levels of scenarios to a test preserves the attributes + added by both scenarios. + """ + class Test(tests.TestCase): + def test_foo(self): + pass + test = Test('test_foo') + scenarios_x = [('x=1', {'x': 1}), ('x=2', {'x': 2})] + scenarios_y = [('y=1', {'y': 1}), ('y=2', {'y': 2})] + suite = tests.multiply_tests(test, scenarios_x, unittest.TestSuite()) + suite = tests.multiply_tests(suite, scenarios_y, unittest.TestSuite()) + all_tests = list(tests.iter_suite_tests(suite)) + self.assertLength(4, all_tests) + all_xys = sorted((t.x, t.y) for t in all_tests) + self.assertEqual([(1, 1), (1, 2), (2, 1), (2, 2)], all_xys) + + +# NB: Don't delete this; it's not actually from 0.11! +@deprecated_function(deprecated_in((0, 11, 0))) +def sample_deprecated_function(): + """A deprecated function to test applyDeprecated with.""" + return 2 + + +def sample_undeprecated_function(a_param): + """A undeprecated function to test applyDeprecated with.""" + + +class ApplyDeprecatedHelper(object): + """A helper class for ApplyDeprecated tests.""" + + @deprecated_method(deprecated_in((0, 11, 0))) + def sample_deprecated_method(self, param_one): + """A deprecated method for testing with.""" + return param_one + + def sample_normal_method(self): + """A undeprecated method.""" + + @deprecated_method(deprecated_in((0, 10, 0))) + def sample_nested_deprecation(self): + return sample_deprecated_function() + + +class TestExtraAssertions(tests.TestCase): + """Tests for new test assertions in bzrlib test suite""" + + def test_assert_isinstance(self): + self.assertIsInstance(2, int) + self.assertIsInstance(u'', basestring) + e = self.assertRaises(AssertionError, self.assertIsInstance, None, int) + self.assertEquals(str(e), + "None is an instance of <type 'NoneType'> rather than <type 'int'>") + self.assertRaises(AssertionError, self.assertIsInstance, 23.3, int) + e = self.assertRaises(AssertionError, + self.assertIsInstance, None, int, "it's just not") + self.assertEquals(str(e), + "None is an instance of <type 'NoneType'> rather than <type 'int'>" + ": it's just not") + + def test_assertEndsWith(self): + self.assertEndsWith('foo', 'oo') + self.assertRaises(AssertionError, self.assertEndsWith, 'o', 'oo') + + def test_assertEqualDiff(self): + e = self.assertRaises(AssertionError, + self.assertEqualDiff, '', '\n') + self.assertEquals(str(e), + # Don't blink ! The '+' applies to the second string + 'first string is missing a final newline.\n+ \n') + e = self.assertRaises(AssertionError, + self.assertEqualDiff, '\n', '') + self.assertEquals(str(e), + # Don't blink ! The '-' applies to the second string + 'second string is missing a final newline.\n- \n') + + +class TestDeprecations(tests.TestCase): + + def test_applyDeprecated_not_deprecated(self): + sample_object = ApplyDeprecatedHelper() + # calling an undeprecated callable raises an assertion + self.assertRaises(AssertionError, self.applyDeprecated, + deprecated_in((0, 11, 0)), + sample_object.sample_normal_method) + self.assertRaises(AssertionError, self.applyDeprecated, + deprecated_in((0, 11, 0)), + sample_undeprecated_function, "a param value") + # calling a deprecated callable (function or method) with the wrong + # expected deprecation fails. + self.assertRaises(AssertionError, self.applyDeprecated, + deprecated_in((0, 10, 0)), + sample_object.sample_deprecated_method, "a param value") + self.assertRaises(AssertionError, self.applyDeprecated, + deprecated_in((0, 10, 0)), + sample_deprecated_function) + # calling a deprecated callable (function or method) with the right + # expected deprecation returns the functions result. + self.assertEqual("a param value", + self.applyDeprecated(deprecated_in((0, 11, 0)), + sample_object.sample_deprecated_method, "a param value")) + self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 11, 0)), + sample_deprecated_function)) + # calling a nested deprecation with the wrong deprecation version + # fails even if a deeper nested function was deprecated with the + # supplied version. + self.assertRaises(AssertionError, self.applyDeprecated, + deprecated_in((0, 11, 0)), sample_object.sample_nested_deprecation) + # calling a nested deprecation with the right deprecation value + # returns the calls result. + self.assertEqual(2, self.applyDeprecated(deprecated_in((0, 10, 0)), + sample_object.sample_nested_deprecation)) + + def test_callDeprecated(self): + def testfunc(be_deprecated, result=None): + if be_deprecated is True: + symbol_versioning.warn('i am deprecated', DeprecationWarning, + stacklevel=1) + return result + result = self.callDeprecated(['i am deprecated'], testfunc, True) + self.assertIs(None, result) + result = self.callDeprecated([], testfunc, False, 'result') + self.assertEqual('result', result) + self.callDeprecated(['i am deprecated'], testfunc, be_deprecated=True) + self.callDeprecated([], testfunc, be_deprecated=False) + + +class TestWarningTests(tests.TestCase): + """Tests for calling methods that raise warnings.""" + + def test_callCatchWarnings(self): + def meth(a, b): + warnings.warn("this is your last warning") + return a + b + wlist, result = self.callCatchWarnings(meth, 1, 2) + self.assertEquals(3, result) + # would like just to compare them, but UserWarning doesn't implement + # eq well + w0, = wlist + self.assertIsInstance(w0, UserWarning) + self.assertEquals("this is your last warning", str(w0)) + + +class TestConvenienceMakers(tests.TestCaseWithTransport): + """Test for the make_* convenience functions.""" + + def test_make_branch_and_tree_with_format(self): + # we should be able to supply a format to make_branch_and_tree + self.make_branch_and_tree('a', format=bzrlib.bzrdir.BzrDirMetaFormat1()) + self.assertIsInstance(bzrlib.controldir.ControlDir.open('a')._format, + bzrlib.bzrdir.BzrDirMetaFormat1) + + def test_make_branch_and_memory_tree(self): + # we should be able to get a new branch and a mutable tree from + # TestCaseWithTransport + tree = self.make_branch_and_memory_tree('a') + self.assertIsInstance(tree, bzrlib.memorytree.MemoryTree) + + def test_make_tree_for_local_vfs_backed_transport(self): + # make_branch_and_tree has to use local branch and repositories + # when the vfs transport and local disk are colocated, even if + # a different transport is in use for url generation. + self.transport_server = test_server.FakeVFATServer + self.assertFalse(self.get_url('t1').startswith('file://')) + tree = self.make_branch_and_tree('t1') + base = tree.bzrdir.root_transport.base + self.assertStartsWith(base, 'file://') + self.assertEquals(tree.bzrdir.root_transport, + tree.branch.bzrdir.root_transport) + self.assertEquals(tree.bzrdir.root_transport, + tree.branch.repository.bzrdir.root_transport) + + +class SelfTestHelper(object): + + def run_selftest(self, **kwargs): + """Run selftest returning its output.""" + output = StringIO() + old_transport = bzrlib.tests.default_transport + old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT + tests.TestCaseWithMemoryTransport.TEST_ROOT = None + try: + self.assertEqual(True, tests.selftest(stream=output, **kwargs)) + finally: + bzrlib.tests.default_transport = old_transport + tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root + output.seek(0) + return output + + +class TestSelftest(tests.TestCase, SelfTestHelper): + """Tests of bzrlib.tests.selftest.""" + + def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self): + factory_called = [] + def factory(): + factory_called.append(True) + return TestUtil.TestSuite() + out = StringIO() + err = StringIO() + self.apply_redirected(out, err, None, bzrlib.tests.selftest, + test_suite_factory=factory) + self.assertEqual([True], factory_called) + + def factory(self): + """A test suite factory.""" + class Test(tests.TestCase): + def a(self): + pass + def b(self): + pass + def c(self): + pass + return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")]) + + def test_list_only(self): + output = self.run_selftest(test_suite_factory=self.factory, + list_only=True) + self.assertEqual(3, len(output.readlines())) + + def test_list_only_filtered(self): + output = self.run_selftest(test_suite_factory=self.factory, + list_only=True, pattern="Test.b") + self.assertEndsWith(output.getvalue(), "Test.b\n") + self.assertLength(1, output.readlines()) + + def test_list_only_excludes(self): + output = self.run_selftest(test_suite_factory=self.factory, + list_only=True, exclude_pattern="Test.b") + self.assertNotContainsRe("Test.b", output.getvalue()) + self.assertLength(2, output.readlines()) + + def test_lsprof_tests(self): + self.requireFeature(features.lsprof_feature) + results = [] + class Test(object): + def __call__(test, result): + test.run(result) + def run(test, result): + results.append(result) + def countTestCases(self): + return 1 + self.run_selftest(test_suite_factory=Test, lsprof_tests=True) + self.assertLength(1, results) + self.assertIsInstance(results.pop(), ExtendedToOriginalDecorator) + + def test_random(self): + # test randomising by listing a number of tests. + output_123 = self.run_selftest(test_suite_factory=self.factory, + list_only=True, random_seed="123") + output_234 = self.run_selftest(test_suite_factory=self.factory, + list_only=True, random_seed="234") + self.assertNotEqual(output_123, output_234) + # "Randominzing test order..\n\n + self.assertLength(5, output_123.readlines()) + self.assertLength(5, output_234.readlines()) + + def test_random_reuse_is_same_order(self): + # test randomising by listing a number of tests. + expected = self.run_selftest(test_suite_factory=self.factory, + list_only=True, random_seed="123") + repeated = self.run_selftest(test_suite_factory=self.factory, + list_only=True, random_seed="123") + self.assertEqual(expected.getvalue(), repeated.getvalue()) + + def test_runner_class(self): + self.requireFeature(features.subunit) + from subunit import ProtocolTestCase + stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner, + test_suite_factory=self.factory) + test = ProtocolTestCase(stream) + result = unittest.TestResult() + test.run(result) + self.assertEqual(3, result.testsRun) + + def test_starting_with_single_argument(self): + output = self.run_selftest(test_suite_factory=self.factory, + starting_with=['bzrlib.tests.test_selftest.Test.a'], + list_only=True) + self.assertEqual('bzrlib.tests.test_selftest.Test.a\n', + output.getvalue()) + + def test_starting_with_multiple_argument(self): + output = self.run_selftest(test_suite_factory=self.factory, + starting_with=['bzrlib.tests.test_selftest.Test.a', + 'bzrlib.tests.test_selftest.Test.b'], + list_only=True) + self.assertEqual('bzrlib.tests.test_selftest.Test.a\n' + 'bzrlib.tests.test_selftest.Test.b\n', + output.getvalue()) + + def check_transport_set(self, transport_server): + captured_transport = [] + def seen_transport(a_transport): + captured_transport.append(a_transport) + class Capture(tests.TestCase): + def a(self): + seen_transport(bzrlib.tests.default_transport) + def factory(): + return TestUtil.TestSuite([Capture("a")]) + self.run_selftest(transport=transport_server, test_suite_factory=factory) + self.assertEqual(transport_server, captured_transport[0]) + + def test_transport_sftp(self): + self.requireFeature(features.paramiko) + from bzrlib.tests import stub_sftp + self.check_transport_set(stub_sftp.SFTPAbsoluteServer) + + def test_transport_memory(self): + self.check_transport_set(memory.MemoryServer) + + +class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper): + # Does IO: reads test.list + + def test_load_list(self): + # Provide a list with one test - this test. + test_id_line = '%s\n' % self.id() + self.build_tree_contents([('test.list', test_id_line)]) + # And generate a list of the tests in the suite. + stream = self.run_selftest(load_list='test.list', list_only=True) + self.assertEqual(test_id_line, stream.getvalue()) + + def test_load_unknown(self): + # Provide a list with one test - this test. + # And generate a list of the tests in the suite. + err = self.assertRaises(errors.NoSuchFile, self.run_selftest, + load_list='missing file name', list_only=True) + + +class TestSubunitLogDetails(tests.TestCase, SelfTestHelper): + + _test_needs_features = [features.subunit] + + def run_subunit_stream(self, test_name): + from subunit import ProtocolTestCase + def factory(): + return TestUtil.TestSuite([_get_test(test_name)]) + stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner, + test_suite_factory=factory) + test = ProtocolTestCase(stream) + result = testtools.TestResult() + test.run(result) + content = stream.getvalue() + return content, result + + def test_fail_has_log(self): + content, result = self.run_subunit_stream('test_fail') + self.assertEqual(1, len(result.failures)) + self.assertContainsRe(content, '(?m)^log$') + self.assertContainsRe(content, 'this test will fail') + + def test_error_has_log(self): + content, result = self.run_subunit_stream('test_error') + self.assertContainsRe(content, '(?m)^log$') + self.assertContainsRe(content, 'this test errored') + + def test_skip_has_no_log(self): + content, result = self.run_subunit_stream('test_skip') + self.assertNotContainsRe(content, '(?m)^log$') + self.assertNotContainsRe(content, 'this test will be skipped') + self.assertEqual(['reason'], result.skip_reasons.keys()) + skips = result.skip_reasons['reason'] + self.assertEqual(1, len(skips)) + test = skips[0] + # RemotedTestCase doesn't preserve the "details" + ## self.assertFalse('log' in test.getDetails()) + + def test_missing_feature_has_no_log(self): + content, result = self.run_subunit_stream('test_missing_feature') + self.assertNotContainsRe(content, '(?m)^log$') + self.assertNotContainsRe(content, 'missing the feature') + self.assertEqual(['_MissingFeature\n'], result.skip_reasons.keys()) + skips = result.skip_reasons['_MissingFeature\n'] + self.assertEqual(1, len(skips)) + test = skips[0] + # RemotedTestCase doesn't preserve the "details" + ## self.assertFalse('log' in test.getDetails()) + + def test_xfail_has_no_log(self): + content, result = self.run_subunit_stream('test_xfail') + self.assertNotContainsRe(content, '(?m)^log$') + self.assertNotContainsRe(content, 'test with expected failure') + self.assertEqual(1, len(result.expectedFailures)) + result_content = result.expectedFailures[0][1] + self.assertNotContainsRe(result_content, + '(?m)^(?:Text attachment: )?log(?:$|: )') + self.assertNotContainsRe(result_content, 'test with expected failure') + + def test_unexpected_success_has_log(self): + content, result = self.run_subunit_stream('test_unexpected_success') + self.assertContainsRe(content, '(?m)^log$') + self.assertContainsRe(content, 'test with unexpected success') + # GZ 2011-05-18: Old versions of subunit treat unexpected success as a + # success, if a min version check is added remove this + from subunit import TestProtocolClient as _Client + if _Client.addUnexpectedSuccess.im_func is _Client.addSuccess.im_func: + self.expectFailure('subunit treats "unexpectedSuccess"' + ' as a plain success', + self.assertEqual, 1, len(result.unexpectedSuccesses)) + self.assertEqual(1, len(result.unexpectedSuccesses)) + test = result.unexpectedSuccesses[0] + # RemotedTestCase doesn't preserve the "details" + ## self.assertTrue('log' in test.getDetails()) + + def test_success_has_no_log(self): + content, result = self.run_subunit_stream('test_success') + self.assertEqual(1, result.testsRun) + self.assertNotContainsRe(content, '(?m)^log$') + self.assertNotContainsRe(content, 'this test succeeds') + + +class TestRunBzr(tests.TestCase): + + out = '' + err = '' + + def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None, + working_dir=None): + """Override _run_bzr_core to test how it is invoked by run_bzr. + + Attempts to run bzr from inside this class don't actually run it. + + We test how run_bzr actually invokes bzr in another location. Here we + only need to test that it passes the right parameters to run_bzr. + """ + self.argv = list(argv) + self.retcode = retcode + self.encoding = encoding + self.stdin = stdin + self.working_dir = working_dir + return self.retcode, self.out, self.err + + def test_run_bzr_error(self): + self.out = "It sure does!\n" + out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34) + self.assertEqual(['rocks'], self.argv) + self.assertEqual(34, self.retcode) + self.assertEqual('It sure does!\n', out) + self.assertEquals(out, self.out) + self.assertEqual('', err) + self.assertEquals(err, self.err) + + def test_run_bzr_error_regexes(self): + self.out = '' + self.err = "bzr: ERROR: foobarbaz is not versioned" + out, err = self.run_bzr_error( + ["bzr: ERROR: foobarbaz is not versioned"], + ['file-id', 'foobarbaz']) + + def test_encoding(self): + """Test that run_bzr passes encoding to _run_bzr_core""" + self.run_bzr('foo bar') + self.assertEqual(None, self.encoding) + self.assertEqual(['foo', 'bar'], self.argv) + + self.run_bzr('foo bar', encoding='baz') + self.assertEqual('baz', self.encoding) + self.assertEqual(['foo', 'bar'], self.argv) + + def test_retcode(self): + """Test that run_bzr passes retcode to _run_bzr_core""" + # Default is retcode == 0 + self.run_bzr('foo bar') + self.assertEqual(0, self.retcode) + self.assertEqual(['foo', 'bar'], self.argv) + + self.run_bzr('foo bar', retcode=1) + self.assertEqual(1, self.retcode) + self.assertEqual(['foo', 'bar'], self.argv) + + self.run_bzr('foo bar', retcode=None) + self.assertEqual(None, self.retcode) + self.assertEqual(['foo', 'bar'], self.argv) + + self.run_bzr(['foo', 'bar'], retcode=3) + self.assertEqual(3, self.retcode) + self.assertEqual(['foo', 'bar'], self.argv) + + def test_stdin(self): + # test that the stdin keyword to run_bzr is passed through to + # _run_bzr_core as-is. We do this by overriding + # _run_bzr_core in this class, and then calling run_bzr, + # which is a convenience function for _run_bzr_core, so + # should invoke it. + self.run_bzr('foo bar', stdin='gam') + self.assertEqual('gam', self.stdin) + self.assertEqual(['foo', 'bar'], self.argv) + + self.run_bzr('foo bar', stdin='zippy') + self.assertEqual('zippy', self.stdin) + self.assertEqual(['foo', 'bar'], self.argv) + + def test_working_dir(self): + """Test that run_bzr passes working_dir to _run_bzr_core""" + self.run_bzr('foo bar') + self.assertEqual(None, self.working_dir) + self.assertEqual(['foo', 'bar'], self.argv) + + self.run_bzr('foo bar', working_dir='baz') + self.assertEqual('baz', self.working_dir) + self.assertEqual(['foo', 'bar'], self.argv) + + def test_reject_extra_keyword_arguments(self): + self.assertRaises(TypeError, self.run_bzr, "foo bar", + error_regex=['error message']) + + +class TestRunBzrCaptured(tests.TestCaseWithTransport): + # Does IO when testing the working_dir parameter. + + def apply_redirected(self, stdin=None, stdout=None, stderr=None, + a_callable=None, *args, **kwargs): + self.stdin = stdin + self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None) + self.factory = bzrlib.ui.ui_factory + self.working_dir = osutils.getcwd() + stdout.write('foo\n') + stderr.write('bar\n') + return 0 + + def test_stdin(self): + # test that the stdin keyword to _run_bzr_core is passed through to + # apply_redirected as a StringIO. We do this by overriding + # apply_redirected in this class, and then calling _run_bzr_core, + # which calls apply_redirected. + self.run_bzr(['foo', 'bar'], stdin='gam') + self.assertEqual('gam', self.stdin.read()) + self.assertTrue(self.stdin is self.factory_stdin) + self.run_bzr(['foo', 'bar'], stdin='zippy') + self.assertEqual('zippy', self.stdin.read()) + self.assertTrue(self.stdin is self.factory_stdin) + + def test_ui_factory(self): + # each invocation of self.run_bzr should get its + # own UI factory, which is an instance of TestUIFactory, + # with stdin, stdout and stderr attached to the stdin, + # stdout and stderr of the invoked run_bzr + current_factory = bzrlib.ui.ui_factory + self.run_bzr(['foo']) + self.assertFalse(current_factory is self.factory) + self.assertNotEqual(sys.stdout, self.factory.stdout) + self.assertNotEqual(sys.stderr, self.factory.stderr) + self.assertEqual('foo\n', self.factory.stdout.getvalue()) + self.assertEqual('bar\n', self.factory.stderr.getvalue()) + self.assertIsInstance(self.factory, tests.TestUIFactory) + + def test_working_dir(self): + self.build_tree(['one/', 'two/']) + cwd = osutils.getcwd() + + # Default is to work in the current directory + self.run_bzr(['foo', 'bar']) + self.assertEqual(cwd, self.working_dir) + + self.run_bzr(['foo', 'bar'], working_dir=None) + self.assertEqual(cwd, self.working_dir) + + # The function should be run in the alternative directory + # but afterwards the current working dir shouldn't be changed + self.run_bzr(['foo', 'bar'], working_dir='one') + self.assertNotEqual(cwd, self.working_dir) + self.assertEndsWith(self.working_dir, 'one') + self.assertEqual(cwd, osutils.getcwd()) + + self.run_bzr(['foo', 'bar'], working_dir='two') + self.assertNotEqual(cwd, self.working_dir) + self.assertEndsWith(self.working_dir, 'two') + self.assertEqual(cwd, osutils.getcwd()) + + +class StubProcess(object): + """A stub process for testing run_bzr_subprocess.""" + + def __init__(self, out="", err="", retcode=0): + self.out = out + self.err = err + self.returncode = retcode + + def communicate(self): + return self.out, self.err + + +class TestWithFakedStartBzrSubprocess(tests.TestCaseWithTransport): + """Base class for tests testing how we might run bzr.""" + + def setUp(self): + tests.TestCaseWithTransport.setUp(self) + self.subprocess_calls = [] + + def start_bzr_subprocess(self, process_args, env_changes=None, + skip_if_plan_to_signal=False, + working_dir=None, + allow_plugins=False): + """capture what run_bzr_subprocess tries to do.""" + self.subprocess_calls.append({'process_args':process_args, + 'env_changes':env_changes, + 'skip_if_plan_to_signal':skip_if_plan_to_signal, + 'working_dir':working_dir, 'allow_plugins':allow_plugins}) + return self.next_subprocess + + +class TestRunBzrSubprocess(TestWithFakedStartBzrSubprocess): + + def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs): + """Run run_bzr_subprocess with args and kwargs using a stubbed process. + + Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess + that will return static results. This assertion method populates those + results and also checks the arguments run_bzr_subprocess generates. + """ + self.next_subprocess = process + try: + result = self.run_bzr_subprocess(*args, **kwargs) + except: + self.next_subprocess = None + for key, expected in expected_args.iteritems(): + self.assertEqual(expected, self.subprocess_calls[-1][key]) + raise + else: + self.next_subprocess = None + for key, expected in expected_args.iteritems(): + self.assertEqual(expected, self.subprocess_calls[-1][key]) + return result + + def test_run_bzr_subprocess(self): + """The run_bzr_helper_external command behaves nicely.""" + self.assertRunBzrSubprocess({'process_args':['--version']}, + StubProcess(), '--version') + self.assertRunBzrSubprocess({'process_args':['--version']}, + StubProcess(), ['--version']) + # retcode=None disables retcode checking + result = self.assertRunBzrSubprocess({}, + StubProcess(retcode=3), '--version', retcode=None) + result = self.assertRunBzrSubprocess({}, + StubProcess(out="is free software"), '--version') + self.assertContainsRe(result[0], 'is free software') + # Running a subcommand that is missing errors + self.assertRaises(AssertionError, self.assertRunBzrSubprocess, + {'process_args':['--versionn']}, StubProcess(retcode=3), + '--versionn') + # Unless it is told to expect the error from the subprocess + result = self.assertRunBzrSubprocess({}, + StubProcess(retcode=3), '--versionn', retcode=3) + # Or to ignore retcode checking + result = self.assertRunBzrSubprocess({}, + StubProcess(err="unknown command", retcode=3), '--versionn', + retcode=None) + self.assertContainsRe(result[1], 'unknown command') + + def test_env_change_passes_through(self): + self.assertRunBzrSubprocess( + {'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}}, + StubProcess(), '', + env_changes={'new':'value', 'changed':'newvalue', 'deleted':None}) + + def test_no_working_dir_passed_as_None(self): + self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '') + + def test_no_working_dir_passed_through(self): + self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '', + working_dir='dir') + + def test_run_bzr_subprocess_no_plugins(self): + self.assertRunBzrSubprocess({'allow_plugins': False}, + StubProcess(), '') + + def test_allow_plugins(self): + self.assertRunBzrSubprocess({'allow_plugins': True}, + StubProcess(), '', allow_plugins=True) + + +class TestFinishBzrSubprocess(TestWithFakedStartBzrSubprocess): + + def test_finish_bzr_subprocess_with_error(self): + """finish_bzr_subprocess allows specification of the desired exit code. + """ + process = StubProcess(err="unknown command", retcode=3) + result = self.finish_bzr_subprocess(process, retcode=3) + self.assertEqual('', result[0]) + self.assertContainsRe(result[1], 'unknown command') + + def test_finish_bzr_subprocess_ignoring_retcode(self): + """finish_bzr_subprocess allows the exit code to be ignored.""" + process = StubProcess(err="unknown command", retcode=3) + result = self.finish_bzr_subprocess(process, retcode=None) + self.assertEqual('', result[0]) + self.assertContainsRe(result[1], 'unknown command') + + def test_finish_subprocess_with_unexpected_retcode(self): + """finish_bzr_subprocess raises self.failureException if the retcode is + not the expected one. + """ + process = StubProcess(err="unknown command", retcode=3) + self.assertRaises(self.failureException, self.finish_bzr_subprocess, + process) + + +class _DontSpawnProcess(Exception): + """A simple exception which just allows us to skip unnecessary steps""" + + +class TestStartBzrSubProcess(tests.TestCase): + """Stub test start_bzr_subprocess.""" + + def _subprocess_log_cleanup(self): + """Inhibits the base version as we don't produce a log file.""" + + def _popen(self, *args, **kwargs): + """Override the base version to record the command that is run. + + From there we can ensure it is correct without spawning a real process. + """ + self.check_popen_state() + self._popen_args = args + self._popen_kwargs = kwargs + raise _DontSpawnProcess() + + def check_popen_state(self): + """Replace to make assertions when popen is called.""" + + def test_run_bzr_subprocess_no_plugins(self): + self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, []) + command = self._popen_args[0] + self.assertEqual(sys.executable, command[0]) + self.assertEqual(self.get_bzr_path(), command[1]) + self.assertEqual(['--no-plugins'], command[2:]) + + def test_allow_plugins(self): + self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [], + allow_plugins=True) + command = self._popen_args[0] + self.assertEqual([], command[2:]) + + def test_set_env(self): + self.assertFalse('EXISTANT_ENV_VAR' in os.environ) + # set in the child + def check_environment(): + self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR']) + self.check_popen_state = check_environment + self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [], + env_changes={'EXISTANT_ENV_VAR':'set variable'}) + # not set in theparent + self.assertFalse('EXISTANT_ENV_VAR' in os.environ) + + def test_run_bzr_subprocess_env_del(self): + """run_bzr_subprocess can remove environment variables too.""" + self.assertFalse('EXISTANT_ENV_VAR' in os.environ) + def check_environment(): + self.assertFalse('EXISTANT_ENV_VAR' in os.environ) + os.environ['EXISTANT_ENV_VAR'] = 'set variable' + self.check_popen_state = check_environment + self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [], + env_changes={'EXISTANT_ENV_VAR':None}) + # Still set in parent + self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR']) + del os.environ['EXISTANT_ENV_VAR'] + + def test_env_del_missing(self): + self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ) + def check_environment(): + self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ) + self.check_popen_state = check_environment + self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [], + env_changes={'NON_EXISTANT_ENV_VAR':None}) + + def test_working_dir(self): + """Test that we can specify the working dir for the child""" + orig_getcwd = osutils.getcwd + orig_chdir = os.chdir + chdirs = [] + def chdir(path): + chdirs.append(path) + self.overrideAttr(os, 'chdir', chdir) + def getcwd(): + return 'current' + self.overrideAttr(osutils, 'getcwd', getcwd) + self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [], + working_dir='foo') + self.assertEqual(['foo', 'current'], chdirs) + + def test_get_bzr_path_with_cwd_bzrlib(self): + self.get_source_path = lambda: "" + self.overrideAttr(os.path, "isfile", lambda path: True) + self.assertEqual(self.get_bzr_path(), "bzr") + + +class TestActuallyStartBzrSubprocess(tests.TestCaseWithTransport): + """Tests that really need to do things with an external bzr.""" + + def test_start_and_stop_bzr_subprocess_send_signal(self): + """finish_bzr_subprocess raises self.failureException if the retcode is + not the expected one. + """ + self.disable_missing_extensions_warning() + process = self.start_bzr_subprocess(['wait-until-signalled'], + skip_if_plan_to_signal=True) + self.assertEqual('running\n', process.stdout.readline()) + result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT, + retcode=3) + self.assertEqual('', result[0]) + self.assertEqual('bzr: interrupted\n', result[1]) + + +class TestSelftestFiltering(tests.TestCase): + + def setUp(self): + tests.TestCase.setUp(self) + self.suite = TestUtil.TestSuite() + self.loader = TestUtil.TestLoader() + self.suite.addTest(self.loader.loadTestsFromModule( + sys.modules['bzrlib.tests.test_selftest'])) + self.all_names = _test_ids(self.suite) + + def test_condition_id_re(self): + test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.' + 'test_condition_id_re') + filtered_suite = tests.filter_suite_by_condition( + self.suite, tests.condition_id_re('test_condition_id_re')) + self.assertEqual([test_name], _test_ids(filtered_suite)) + + def test_condition_id_in_list(self): + test_names = ['bzrlib.tests.test_selftest.TestSelftestFiltering.' + 'test_condition_id_in_list'] + id_list = tests.TestIdList(test_names) + filtered_suite = tests.filter_suite_by_condition( + self.suite, tests.condition_id_in_list(id_list)) + my_pattern = 'TestSelftestFiltering.*test_condition_id_in_list' + re_filtered = tests.filter_suite_by_re(self.suite, my_pattern) + self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite)) + + def test_condition_id_startswith(self): + klass = 'bzrlib.tests.test_selftest.TestSelftestFiltering.' + start1 = klass + 'test_condition_id_starts' + start2 = klass + 'test_condition_id_in' + test_names = [ klass + 'test_condition_id_in_list', + klass + 'test_condition_id_startswith', + ] + filtered_suite = tests.filter_suite_by_condition( + self.suite, tests.condition_id_startswith([start1, start2])) + self.assertEqual(test_names, _test_ids(filtered_suite)) + + def test_condition_isinstance(self): + filtered_suite = tests.filter_suite_by_condition( + self.suite, tests.condition_isinstance(self.__class__)) + class_pattern = 'bzrlib.tests.test_selftest.TestSelftestFiltering.' + re_filtered = tests.filter_suite_by_re(self.suite, class_pattern) + self.assertEqual(_test_ids(re_filtered), _test_ids(filtered_suite)) + + def test_exclude_tests_by_condition(self): + excluded_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.' + 'test_exclude_tests_by_condition') + filtered_suite = tests.exclude_tests_by_condition(self.suite, + lambda x:x.id() == excluded_name) + self.assertEqual(len(self.all_names) - 1, + filtered_suite.countTestCases()) + self.assertFalse(excluded_name in _test_ids(filtered_suite)) + remaining_names = list(self.all_names) + remaining_names.remove(excluded_name) + self.assertEqual(remaining_names, _test_ids(filtered_suite)) + + def test_exclude_tests_by_re(self): + self.all_names = _test_ids(self.suite) + filtered_suite = tests.exclude_tests_by_re(self.suite, + 'exclude_tests_by_re') + excluded_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.' + 'test_exclude_tests_by_re') + self.assertEqual(len(self.all_names) - 1, + filtered_suite.countTestCases()) + self.assertFalse(excluded_name in _test_ids(filtered_suite)) + remaining_names = list(self.all_names) + remaining_names.remove(excluded_name) + self.assertEqual(remaining_names, _test_ids(filtered_suite)) + + def test_filter_suite_by_condition(self): + test_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.' + 'test_filter_suite_by_condition') + filtered_suite = tests.filter_suite_by_condition(self.suite, + lambda x:x.id() == test_name) + self.assertEqual([test_name], _test_ids(filtered_suite)) + + def test_filter_suite_by_re(self): + filtered_suite = tests.filter_suite_by_re(self.suite, + 'test_filter_suite_by_r') + filtered_names = _test_ids(filtered_suite) + self.assertEqual(filtered_names, ['bzrlib.tests.test_selftest.' + 'TestSelftestFiltering.test_filter_suite_by_re']) + + def test_filter_suite_by_id_list(self): + test_list = ['bzrlib.tests.test_selftest.' + 'TestSelftestFiltering.test_filter_suite_by_id_list'] + filtered_suite = tests.filter_suite_by_id_list( + self.suite, tests.TestIdList(test_list)) + filtered_names = _test_ids(filtered_suite) + self.assertEqual( + filtered_names, + ['bzrlib.tests.test_selftest.' + 'TestSelftestFiltering.test_filter_suite_by_id_list']) + + def test_filter_suite_by_id_startswith(self): + # By design this test may fail if another test is added whose name also + # begins with one of the start value used. + klass = 'bzrlib.tests.test_selftest.TestSelftestFiltering.' + start1 = klass + 'test_filter_suite_by_id_starts' + start2 = klass + 'test_filter_suite_by_id_li' + test_list = [klass + 'test_filter_suite_by_id_list', + klass + 'test_filter_suite_by_id_startswith', + ] + filtered_suite = tests.filter_suite_by_id_startswith( + self.suite, [start1, start2]) + self.assertEqual( + test_list, + _test_ids(filtered_suite), + ) + + def test_preserve_input(self): + # NB: Surely this is something in the stdlib to do this? + self.assertTrue(self.suite is tests.preserve_input(self.suite)) + self.assertTrue("@#$" is tests.preserve_input("@#$")) + + def test_randomize_suite(self): + randomized_suite = tests.randomize_suite(self.suite) + # randomizing should not add or remove test names. + self.assertEqual(set(_test_ids(self.suite)), + set(_test_ids(randomized_suite))) + # Technically, this *can* fail, because random.shuffle(list) can be + # equal to list. Trying multiple times just pushes the frequency back. + # As its len(self.all_names)!:1, the failure frequency should be low + # enough to ignore. RBC 20071021. + # It should change the order. + self.assertNotEqual(self.all_names, _test_ids(randomized_suite)) + # But not the length. (Possibly redundant with the set test, but not + # necessarily.) + self.assertEqual(len(self.all_names), len(_test_ids(randomized_suite))) + + def test_split_suit_by_condition(self): + self.all_names = _test_ids(self.suite) + condition = tests.condition_id_re('test_filter_suite_by_r') + split_suite = tests.split_suite_by_condition(self.suite, condition) + filtered_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.' + 'test_filter_suite_by_re') + self.assertEqual([filtered_name], _test_ids(split_suite[0])) + self.assertFalse(filtered_name in _test_ids(split_suite[1])) + remaining_names = list(self.all_names) + remaining_names.remove(filtered_name) + self.assertEqual(remaining_names, _test_ids(split_suite[1])) + + def test_split_suit_by_re(self): + self.all_names = _test_ids(self.suite) + split_suite = tests.split_suite_by_re(self.suite, + 'test_filter_suite_by_r') + filtered_name = ('bzrlib.tests.test_selftest.TestSelftestFiltering.' + 'test_filter_suite_by_re') + self.assertEqual([filtered_name], _test_ids(split_suite[0])) + self.assertFalse(filtered_name in _test_ids(split_suite[1])) + remaining_names = list(self.all_names) + remaining_names.remove(filtered_name) + self.assertEqual(remaining_names, _test_ids(split_suite[1])) + + +class TestCheckTreeShape(tests.TestCaseWithTransport): + + def test_check_tree_shape(self): + files = ['a', 'b/', 'b/c'] + tree = self.make_branch_and_tree('.') + self.build_tree(files) + tree.add(files) + tree.lock_read() + try: + self.check_tree_shape(tree, files) + finally: + tree.unlock() + + +class TestBlackboxSupport(tests.TestCase): + """Tests for testsuite blackbox features.""" + + def test_run_bzr_failure_not_caught(self): + # When we run bzr in blackbox mode, we want any unexpected errors to + # propagate up to the test suite so that it can show the error in the + # usual way, and we won't get a double traceback. + e = self.assertRaises( + AssertionError, + self.run_bzr, ['assert-fail']) + # make sure we got the real thing, not an error from somewhere else in + # the test framework + self.assertEquals('always fails', str(e)) + # check that there's no traceback in the test log + self.assertNotContainsRe(self.get_log(), r'Traceback') + + def test_run_bzr_user_error_caught(self): + # Running bzr in blackbox mode, normal/expected/user errors should be + # caught in the regular way and turned into an error message plus exit + # code. + transport_server = memory.MemoryServer() + transport_server.start_server() + self.addCleanup(transport_server.stop_server) + url = transport_server.get_url() + self.permit_url(url) + out, err = self.run_bzr(["log", "%s/nonexistantpath" % url], retcode=3) + self.assertEqual(out, '') + self.assertContainsRe(err, + 'bzr: ERROR: Not a branch: ".*nonexistantpath/".\n') + + +class TestTestLoader(tests.TestCase): + """Tests for the test loader.""" + + def _get_loader_and_module(self): + """Gets a TestLoader and a module with one test in it.""" + loader = TestUtil.TestLoader() + module = {} + class Stub(tests.TestCase): + def test_foo(self): + pass + class MyModule(object): + pass + MyModule.a_class = Stub + module = MyModule() + return loader, module + + def test_module_no_load_tests_attribute_loads_classes(self): + loader, module = self._get_loader_and_module() + self.assertEqual(1, loader.loadTestsFromModule(module).countTestCases()) + + def test_module_load_tests_attribute_gets_called(self): + loader, module = self._get_loader_and_module() + # 'self' is here because we're faking the module with a class. Regular + # load_tests do not need that :) + def load_tests(self, standard_tests, module, loader): + result = loader.suiteClass() + for test in tests.iter_suite_tests(standard_tests): + result.addTests([test, test]) + return result + # add a load_tests() method which multiplies the tests from the module. + module.__class__.load_tests = load_tests + self.assertEqual(2, loader.loadTestsFromModule(module).countTestCases()) + + def test_load_tests_from_module_name_smoke_test(self): + loader = TestUtil.TestLoader() + suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler') + self.assertEquals(['bzrlib.tests.test_sampler.DemoTest.test_nothing'], + _test_ids(suite)) + + def test_load_tests_from_module_name_with_bogus_module_name(self): + loader = TestUtil.TestLoader() + self.assertRaises(ImportError, loader.loadTestsFromModuleName, 'bogus') + + +class TestTestIdList(tests.TestCase): + + def _create_id_list(self, test_list): + return tests.TestIdList(test_list) + + def _create_suite(self, test_id_list): + + class Stub(tests.TestCase): + def test_foo(self): + pass + + def _create_test_id(id): + return lambda: id + + suite = TestUtil.TestSuite() + for id in test_id_list: + t = Stub('test_foo') + t.id = _create_test_id(id) + suite.addTest(t) + return suite + + def _test_ids(self, test_suite): + """Get the ids for the tests in a test suite.""" + return [t.id() for t in tests.iter_suite_tests(test_suite)] + + def test_empty_list(self): + id_list = self._create_id_list([]) + self.assertEquals({}, id_list.tests) + self.assertEquals({}, id_list.modules) + + def test_valid_list(self): + id_list = self._create_id_list( + ['mod1.cl1.meth1', 'mod1.cl1.meth2', + 'mod1.func1', 'mod1.cl2.meth2', + 'mod1.submod1', + 'mod1.submod2.cl1.meth1', 'mod1.submod2.cl2.meth2', + ]) + self.assertTrue(id_list.refers_to('mod1')) + self.assertTrue(id_list.refers_to('mod1.submod1')) + self.assertTrue(id_list.refers_to('mod1.submod2')) + self.assertTrue(id_list.includes('mod1.cl1.meth1')) + self.assertTrue(id_list.includes('mod1.submod1')) + self.assertTrue(id_list.includes('mod1.func1')) + + def test_bad_chars_in_params(self): + id_list = self._create_id_list(['mod1.cl1.meth1(xx.yy)']) + self.assertTrue(id_list.refers_to('mod1')) + self.assertTrue(id_list.includes('mod1.cl1.meth1(xx.yy)')) + + def test_module_used(self): + id_list = self._create_id_list(['mod.class.meth']) + self.assertTrue(id_list.refers_to('mod')) + self.assertTrue(id_list.refers_to('mod.class')) + self.assertTrue(id_list.refers_to('mod.class.meth')) + + def test_test_suite_matches_id_list_with_unknown(self): + loader = TestUtil.TestLoader() + suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler') + test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing', + 'bogus'] + not_found, duplicates = tests.suite_matches_id_list(suite, test_list) + self.assertEquals(['bogus'], not_found) + self.assertEquals([], duplicates) + + def test_suite_matches_id_list_with_duplicates(self): + loader = TestUtil.TestLoader() + suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler') + dupes = loader.suiteClass() + for test in tests.iter_suite_tests(suite): + dupes.addTest(test) + dupes.addTest(test) # Add it again + + test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing',] + not_found, duplicates = tests.suite_matches_id_list( + dupes, test_list) + self.assertEquals([], not_found) + self.assertEquals(['bzrlib.tests.test_sampler.DemoTest.test_nothing'], + duplicates) + + +class TestTestSuite(tests.TestCase): + + def test__test_suite_testmod_names(self): + # Test that a plausible list of test module names are returned + # by _test_suite_testmod_names. + test_list = tests._test_suite_testmod_names() + self.assertSubset([ + 'bzrlib.tests.blackbox', + 'bzrlib.tests.per_transport', + 'bzrlib.tests.test_selftest', + ], + test_list) + + def test__test_suite_modules_to_doctest(self): + # Test that a plausible list of modules to doctest is returned + # by _test_suite_modules_to_doctest. + test_list = tests._test_suite_modules_to_doctest() + if __doc__ is None: + # When docstrings are stripped, there are no modules to doctest + self.assertEqual([], test_list) + return + self.assertSubset([ + 'bzrlib.timestamp', + ], + test_list) + + def test_test_suite(self): + # test_suite() loads the entire test suite to operate. To avoid this + # overhead, and yet still be confident that things are happening, + # we temporarily replace two functions used by test_suite with + # test doubles that supply a few sample tests to load, and check they + # are loaded. + calls = [] + def testmod_names(): + calls.append("testmod_names") + return [ + 'bzrlib.tests.blackbox.test_branch', + 'bzrlib.tests.per_transport', + 'bzrlib.tests.test_selftest', + ] + self.overrideAttr(tests, '_test_suite_testmod_names', testmod_names) + def doctests(): + calls.append("modules_to_doctest") + if __doc__ is None: + return [] + return ['bzrlib.timestamp'] + self.overrideAttr(tests, '_test_suite_modules_to_doctest', doctests) + expected_test_list = [ + # testmod_names + 'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch', + ('bzrlib.tests.per_transport.TransportTests' + '.test_abspath(LocalTransport,LocalURLServer)'), + 'bzrlib.tests.test_selftest.TestTestSuite.test_test_suite', + # plugins can't be tested that way since selftest may be run with + # --no-plugins + ] + if __doc__ is not None: + expected_test_list.extend([ + # modules_to_doctest + 'bzrlib.timestamp.format_highres_date', + ]) + suite = tests.test_suite() + self.assertEqual(set(["testmod_names", "modules_to_doctest"]), + set(calls)) + self.assertSubset(expected_test_list, _test_ids(suite)) + + def test_test_suite_list_and_start(self): + # We cannot test this at the same time as the main load, because we want + # to know that starting_with == None works. So a second load is + # incurred - note that the starting_with parameter causes a partial load + # rather than a full load so this test should be pretty quick. + test_list = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite'] + suite = tests.test_suite(test_list, + ['bzrlib.tests.test_selftest.TestTestSuite']) + # test_test_suite_list_and_start is not included + self.assertEquals(test_list, _test_ids(suite)) + + +class TestLoadTestIdList(tests.TestCaseInTempDir): + + def _create_test_list_file(self, file_name, content): + fl = open(file_name, 'wt') + fl.write(content) + fl.close() + + def test_load_unknown(self): + self.assertRaises(errors.NoSuchFile, + tests.load_test_id_list, 'i_do_not_exist') + + def test_load_test_list(self): + test_list_fname = 'test.list' + self._create_test_list_file(test_list_fname, + 'mod1.cl1.meth1\nmod2.cl2.meth2\n') + tlist = tests.load_test_id_list(test_list_fname) + self.assertEquals(2, len(tlist)) + self.assertEquals('mod1.cl1.meth1', tlist[0]) + self.assertEquals('mod2.cl2.meth2', tlist[1]) + + def test_load_dirty_file(self): + test_list_fname = 'test.list' + self._create_test_list_file(test_list_fname, + ' mod1.cl1.meth1\n\nmod2.cl2.meth2 \n' + 'bar baz\n') + tlist = tests.load_test_id_list(test_list_fname) + self.assertEquals(4, len(tlist)) + self.assertEquals('mod1.cl1.meth1', tlist[0]) + self.assertEquals('', tlist[1]) + self.assertEquals('mod2.cl2.meth2', tlist[2]) + self.assertEquals('bar baz', tlist[3]) + + +class TestFilteredByModuleTestLoader(tests.TestCase): + + def _create_loader(self, test_list): + id_filter = tests.TestIdList(test_list) + loader = TestUtil.FilteredByModuleTestLoader(id_filter.refers_to) + return loader + + def test_load_tests(self): + test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing'] + loader = self._create_loader(test_list) + suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler') + self.assertEquals(test_list, _test_ids(suite)) + + def test_exclude_tests(self): + test_list = ['bogus'] + loader = self._create_loader(test_list) + suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler') + self.assertEquals([], _test_ids(suite)) + + +class TestFilteredByNameStartTestLoader(tests.TestCase): + + def _create_loader(self, name_start): + def needs_module(name): + return name.startswith(name_start) or name_start.startswith(name) + loader = TestUtil.FilteredByModuleTestLoader(needs_module) + return loader + + def test_load_tests(self): + test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing'] + loader = self._create_loader('bzrlib.tests.test_samp') + + suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler') + self.assertEquals(test_list, _test_ids(suite)) + + def test_load_tests_inside_module(self): + test_list = ['bzrlib.tests.test_sampler.DemoTest.test_nothing'] + loader = self._create_loader('bzrlib.tests.test_sampler.Demo') + + suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler') + self.assertEquals(test_list, _test_ids(suite)) + + def test_exclude_tests(self): + test_list = ['bogus'] + loader = self._create_loader('bogus') + + suite = loader.loadTestsFromModuleName('bzrlib.tests.test_sampler') + self.assertEquals([], _test_ids(suite)) + + +class TestTestPrefixRegistry(tests.TestCase): + + def _get_registry(self): + tp_registry = tests.TestPrefixAliasRegistry() + return tp_registry + + def test_register_new_prefix(self): + tpr = self._get_registry() + tpr.register('foo', 'fff.ooo.ooo') + self.assertEquals('fff.ooo.ooo', tpr.get('foo')) + + def test_register_existing_prefix(self): + tpr = self._get_registry() + tpr.register('bar', 'bbb.aaa.rrr') + tpr.register('bar', 'bBB.aAA.rRR') + self.assertEquals('bbb.aaa.rrr', tpr.get('bar')) + self.assertThat(self.get_log(), + DocTestMatches("...bar...bbb.aaa.rrr...BB.aAA.rRR", + doctest.ELLIPSIS)) + + def test_get_unknown_prefix(self): + tpr = self._get_registry() + self.assertRaises(KeyError, tpr.get, 'I am not a prefix') + + def test_resolve_prefix(self): + tpr = self._get_registry() + tpr.register('bar', 'bb.aa.rr') + self.assertEquals('bb.aa.rr', tpr.resolve_alias('bar')) + + def test_resolve_unknown_alias(self): + tpr = self._get_registry() + self.assertRaises(errors.BzrCommandError, + tpr.resolve_alias, 'I am not a prefix') + + def test_predefined_prefixes(self): + tpr = tests.test_prefix_alias_registry + self.assertEquals('bzrlib', tpr.resolve_alias('bzrlib')) + self.assertEquals('bzrlib.doc', tpr.resolve_alias('bd')) + self.assertEquals('bzrlib.utils', tpr.resolve_alias('bu')) + self.assertEquals('bzrlib.tests', tpr.resolve_alias('bt')) + self.assertEquals('bzrlib.tests.blackbox', tpr.resolve_alias('bb')) + self.assertEquals('bzrlib.plugins', tpr.resolve_alias('bp')) + + +class TestThreadLeakDetection(tests.TestCase): + """Ensure when tests leak threads we detect and report it""" + + class LeakRecordingResult(tests.ExtendedTestResult): + def __init__(self): + tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1) + self.leaks = [] + def _report_thread_leak(self, test, leaks, alive): + self.leaks.append((test, leaks)) + + def test_testcase_without_addCleanups(self): + """Check old TestCase instances don't break with leak detection""" + class Test(unittest.TestCase): + def runTest(self): + pass + result = self.LeakRecordingResult() + test = Test() + result.startTestRun() + test.run(result) + result.stopTestRun() + self.assertEqual(result._tests_leaking_threads_count, 0) + self.assertEqual(result.leaks, []) + + def test_thread_leak(self): + """Ensure a thread that outlives the running of a test is reported + + Uses a thread that blocks on an event, and is started by the inner + test case. As the thread outlives the inner case's run, it should be + detected as a leak, but the event is then set so that the thread can + be safely joined in cleanup so it's not leaked for real. + """ + event = threading.Event() + thread = threading.Thread(name="Leaker", target=event.wait) + class Test(tests.TestCase): + def test_leak(self): + thread.start() + result = self.LeakRecordingResult() + test = Test("test_leak") + self.addCleanup(thread.join) + self.addCleanup(event.set) + result.startTestRun() + test.run(result) + result.stopTestRun() + self.assertEqual(result._tests_leaking_threads_count, 1) + self.assertEqual(result._first_thread_leaker_id, test.id()) + self.assertEqual(result.leaks, [(test, set([thread]))]) + self.assertContainsString(result.stream.getvalue(), "leaking threads") + + def test_multiple_leaks(self): + """Check multiple leaks are blamed on the test cases at fault + + Same concept as the previous test, but has one inner test method that + leaks two threads, and one that doesn't leak at all. + """ + event = threading.Event() + thread_a = threading.Thread(name="LeakerA", target=event.wait) + thread_b = threading.Thread(name="LeakerB", target=event.wait) + thread_c = threading.Thread(name="LeakerC", target=event.wait) + class Test(tests.TestCase): + def test_first_leak(self): + thread_b.start() + def test_second_no_leak(self): + pass + def test_third_leak(self): + thread_c.start() + thread_a.start() + result = self.LeakRecordingResult() + first_test = Test("test_first_leak") + third_test = Test("test_third_leak") + self.addCleanup(thread_a.join) + self.addCleanup(thread_b.join) + self.addCleanup(thread_c.join) + self.addCleanup(event.set) + result.startTestRun() + unittest.TestSuite( + [first_test, Test("test_second_no_leak"), third_test] + ).run(result) + result.stopTestRun() + self.assertEqual(result._tests_leaking_threads_count, 2) + self.assertEqual(result._first_thread_leaker_id, first_test.id()) + self.assertEqual(result.leaks, [ + (first_test, set([thread_b])), + (third_test, set([thread_a, thread_c]))]) + self.assertContainsString(result.stream.getvalue(), "leaking threads") + + +class TestPostMortemDebugging(tests.TestCase): + """Check post mortem debugging works when tests fail or error""" + + class TracebackRecordingResult(tests.ExtendedTestResult): + def __init__(self): + tests.ExtendedTestResult.__init__(self, StringIO(), 0, 1) + self.postcode = None + def _post_mortem(self, tb=None): + """Record the code object at the end of the current traceback""" + tb = tb or sys.exc_info()[2] + if tb is not None: + next = tb.tb_next + while next is not None: + tb = next + next = next.tb_next + self.postcode = tb.tb_frame.f_code + def report_error(self, test, err): + pass + def report_failure(self, test, err): + pass + + def test_location_unittest_error(self): + """Needs right post mortem traceback with erroring unittest case""" + class Test(unittest.TestCase): + def runTest(self): + raise RuntimeError + result = self.TracebackRecordingResult() + Test().run(result) + self.assertEqual(result.postcode, Test.runTest.func_code) + + def test_location_unittest_failure(self): + """Needs right post mortem traceback with failing unittest case""" + class Test(unittest.TestCase): + def runTest(self): + raise self.failureException + result = self.TracebackRecordingResult() + Test().run(result) + self.assertEqual(result.postcode, Test.runTest.func_code) + + def test_location_bt_error(self): + """Needs right post mortem traceback with erroring bzrlib.tests case""" + class Test(tests.TestCase): + def test_error(self): + raise RuntimeError + result = self.TracebackRecordingResult() + Test("test_error").run(result) + self.assertEqual(result.postcode, Test.test_error.func_code) + + def test_location_bt_failure(self): + """Needs right post mortem traceback with failing bzrlib.tests case""" + class Test(tests.TestCase): + def test_failure(self): + raise self.failureException + result = self.TracebackRecordingResult() + Test("test_failure").run(result) + self.assertEqual(result.postcode, Test.test_failure.func_code) + + def test_env_var_triggers_post_mortem(self): + """Check pdb.post_mortem is called iff BZR_TEST_PDB is set""" + import pdb + result = tests.ExtendedTestResult(StringIO(), 0, 1) + post_mortem_calls = [] + self.overrideAttr(pdb, "post_mortem", post_mortem_calls.append) + self.overrideEnv('BZR_TEST_PDB', None) + result._post_mortem(1) + self.overrideEnv('BZR_TEST_PDB', 'on') + result._post_mortem(2) + self.assertEqual([2], post_mortem_calls) + + +class TestRunSuite(tests.TestCase): + + def test_runner_class(self): + """run_suite accepts and uses a runner_class keyword argument.""" + class Stub(tests.TestCase): + def test_foo(self): + pass + suite = Stub("test_foo") + calls = [] + class MyRunner(tests.TextTestRunner): + def run(self, test): + calls.append(test) + return tests.ExtendedTestResult(self.stream, self.descriptions, + self.verbosity) + tests.run_suite(suite, runner_class=MyRunner, stream=StringIO()) + self.assertLength(1, calls) + + +class _Selftest(object): + """Mixin for tests needing full selftest output""" + + def _inject_stream_into_subunit(self, stream): + """To be overridden by subclasses that run tests out of process""" + + def _run_selftest(self, **kwargs): + sio = StringIO() + self._inject_stream_into_subunit(sio) + tests.selftest(stream=sio, stop_on_failure=False, **kwargs) + return sio.getvalue() + + +class _ForkedSelftest(_Selftest): + """Mixin for tests needing full selftest output with forked children""" + + _test_needs_features = [features.subunit] + + def _inject_stream_into_subunit(self, stream): + """Monkey-patch subunit so the extra output goes to stream not stdout + + Some APIs need rewriting so this kind of bogus hackery can be replaced + by passing the stream param from run_tests down into ProtocolTestCase. + """ + from subunit import ProtocolTestCase + _original_init = ProtocolTestCase.__init__ + def _init_with_passthrough(self, *args, **kwargs): + _original_init(self, *args, **kwargs) + self._passthrough = stream + self.overrideAttr(ProtocolTestCase, "__init__", _init_with_passthrough) + + def _run_selftest(self, **kwargs): + # GZ 2011-05-26: Add a PosixSystem feature so this check can go away + if getattr(os, "fork", None) is None: + raise tests.TestNotApplicable("Platform doesn't support forking") + # Make sure the fork code is actually invoked by claiming two cores + self.overrideAttr(osutils, "local_concurrency", lambda: 2) + kwargs.setdefault("suite_decorators", []).append(tests.fork_decorator) + return super(_ForkedSelftest, self)._run_selftest(**kwargs) + + +class TestParallelFork(_ForkedSelftest, tests.TestCase): + """Check operation of --parallel=fork selftest option""" + + def test_error_in_child_during_fork(self): + """Error in a forked child during test setup should get reported""" + class Test(tests.TestCase): + def testMethod(self): + pass + # We don't care what, just break something that a child will run + self.overrideAttr(tests, "workaround_zealous_crypto_random", None) + out = self._run_selftest(test_suite_factory=Test) + # Lines from the tracebacks of the two child processes may be mixed + # together due to the way subunit parses and forwards the streams, + # so permit extra lines between each part of the error output. + self.assertContainsRe(out, + "Traceback.*:\n" + "(?:.*\n)*" + ".+ in fork_for_tests\n" + "(?:.*\n)*" + "\s*workaround_zealous_crypto_random\(\)\n" + "(?:.*\n)*" + "TypeError:") + + +class TestUncollectedWarnings(_Selftest, tests.TestCase): + """Check a test case still alive after being run emits a warning""" + + class Test(tests.TestCase): + def test_pass(self): + pass + def test_self_ref(self): + self.also_self = self.test_self_ref + def test_skip(self): + self.skip("Don't need") + + def _get_suite(self): + return TestUtil.TestSuite([ + self.Test("test_pass"), + self.Test("test_self_ref"), + self.Test("test_skip"), + ]) + + def _run_selftest_with_suite(self, **kwargs): + old_flags = tests.selftest_debug_flags + tests.selftest_debug_flags = old_flags.union(["uncollected_cases"]) + gc_on = gc.isenabled() + if gc_on: + gc.disable() + try: + output = self._run_selftest(test_suite_factory=self._get_suite, + **kwargs) + finally: + if gc_on: + gc.enable() + tests.selftest_debug_flags = old_flags + self.assertNotContainsRe(output, "Uncollected test case.*test_pass") + self.assertContainsRe(output, "Uncollected test case.*test_self_ref") + return output + + def test_testsuite(self): + self._run_selftest_with_suite() + + def test_pattern(self): + out = self._run_selftest_with_suite(pattern="test_(?:pass|self_ref)$") + self.assertNotContainsRe(out, "test_skip") + + def test_exclude_pattern(self): + out = self._run_selftest_with_suite(exclude_pattern="test_skip$") + self.assertNotContainsRe(out, "test_skip") + + def test_random_seed(self): + self._run_selftest_with_suite(random_seed="now") + + def test_matching_tests_first(self): + self._run_selftest_with_suite(matching_tests_first=True, + pattern="test_self_ref$") + + def test_starting_with_and_exclude(self): + out = self._run_selftest_with_suite(starting_with=["bt."], + exclude_pattern="test_skip$") + self.assertNotContainsRe(out, "test_skip") + + def test_additonal_decorator(self): + out = self._run_selftest_with_suite( + suite_decorators=[tests.TestDecorator]) + + +class TestUncollectedWarningsSubunit(TestUncollectedWarnings): + """Check warnings from tests staying alive are emitted with subunit""" + + _test_needs_features = [features.subunit] + + def _run_selftest_with_suite(self, **kwargs): + return TestUncollectedWarnings._run_selftest_with_suite(self, + runner_class=tests.SubUnitBzrRunner, **kwargs) + + +class TestUncollectedWarningsForked(_ForkedSelftest, TestUncollectedWarnings): + """Check warnings from tests staying alive are emitted when forking""" + + +class TestEnvironHandling(tests.TestCase): + + def test_overrideEnv_None_called_twice_doesnt_leak(self): + self.assertFalse('MYVAR' in os.environ) + self.overrideEnv('MYVAR', '42') + # We use an embedded test to make sure we fix the _captureVar bug + class Test(tests.TestCase): + def test_me(self): + # The first call save the 42 value + self.overrideEnv('MYVAR', None) + self.assertEquals(None, os.environ.get('MYVAR')) + # Make sure we can call it twice + self.overrideEnv('MYVAR', None) + self.assertEquals(None, os.environ.get('MYVAR')) + output = StringIO() + result = tests.TextTestResult(output, 0, 1) + Test('test_me').run(result) + if not result.wasStrictlySuccessful(): + self.fail(output.getvalue()) + # We get our value back + self.assertEquals('42', os.environ.get('MYVAR')) + + +class TestIsolatedEnv(tests.TestCase): + """Test isolating tests from os.environ. + + Since we use tests that are already isolated from os.environ a bit of care + should be taken when designing the tests to avoid bootstrap side-effects. + The tests start an already clean os.environ which allow doing valid + assertions about which variables are present or not and design tests around + these assertions. + """ + + class ScratchMonkey(tests.TestCase): + + def test_me(self): + pass + + def test_basics(self): + # Make sure we know the definition of BZR_HOME: not part of os.environ + # for tests.TestCase. + self.assertTrue('BZR_HOME' in tests.isolated_environ) + self.assertEquals(None, tests.isolated_environ['BZR_HOME']) + # Being part of isolated_environ, BZR_HOME should not appear here + self.assertFalse('BZR_HOME' in os.environ) + # Make sure we know the definition of LINES: part of os.environ for + # tests.TestCase + self.assertTrue('LINES' in tests.isolated_environ) + self.assertEquals('25', tests.isolated_environ['LINES']) + self.assertEquals('25', os.environ['LINES']) + + def test_injecting_unknown_variable(self): + # BZR_HOME is known to be absent from os.environ + test = self.ScratchMonkey('test_me') + tests.override_os_environ(test, {'BZR_HOME': 'foo'}) + self.assertEquals('foo', os.environ['BZR_HOME']) + tests.restore_os_environ(test) + self.assertFalse('BZR_HOME' in os.environ) + + def test_injecting_known_variable(self): + test = self.ScratchMonkey('test_me') + # LINES is known to be present in os.environ + tests.override_os_environ(test, {'LINES': '42'}) + self.assertEquals('42', os.environ['LINES']) + tests.restore_os_environ(test) + self.assertEquals('25', os.environ['LINES']) + + def test_deleting_variable(self): + test = self.ScratchMonkey('test_me') + # LINES is known to be present in os.environ + tests.override_os_environ(test, {'LINES': None}) + self.assertTrue('LINES' not in os.environ) + tests.restore_os_environ(test) + self.assertEquals('25', os.environ['LINES']) + + +class TestDocTestSuiteIsolation(tests.TestCase): + """Test that `tests.DocTestSuite` isolates doc tests from os.environ. + + Since tests.TestCase alreay provides an isolation from os.environ, we use + the clean environment as a base for testing. To precisely capture the + isolation provided by tests.DocTestSuite, we use doctest.DocTestSuite to + compare against. + + We want to make sure `tests.DocTestSuite` respect `tests.isolated_environ`, + not `os.environ` so each test overrides it to suit its needs. + + """ + + def get_doctest_suite_for_string(self, klass, string): + class Finder(doctest.DocTestFinder): + + def find(*args, **kwargs): + test = doctest.DocTestParser().get_doctest( + string, {}, 'foo', 'foo.py', 0) + return [test] + + suite = klass(test_finder=Finder()) + return suite + + def run_doctest_suite_for_string(self, klass, string): + suite = self.get_doctest_suite_for_string(klass, string) + output = StringIO() + result = tests.TextTestResult(output, 0, 1) + suite.run(result) + return result, output + + def assertDocTestStringSucceds(self, klass, string): + result, output = self.run_doctest_suite_for_string(klass, string) + if not result.wasStrictlySuccessful(): + self.fail(output.getvalue()) + + def assertDocTestStringFails(self, klass, string): + result, output = self.run_doctest_suite_for_string(klass, string) + if result.wasStrictlySuccessful(): + self.fail(output.getvalue()) + + def test_injected_variable(self): + self.overrideAttr(tests, 'isolated_environ', {'LINES': '42'}) + test = """ + >>> import os + >>> os.environ['LINES'] + '42' + """ + # doctest.DocTestSuite fails as it sees '25' + self.assertDocTestStringFails(doctest.DocTestSuite, test) + # tests.DocTestSuite sees '42' + self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test) + + def test_deleted_variable(self): + self.overrideAttr(tests, 'isolated_environ', {'LINES': None}) + test = """ + >>> import os + >>> os.environ.get('LINES') + """ + # doctest.DocTestSuite fails as it sees '25' + self.assertDocTestStringFails(doctest.DocTestSuite, test) + # tests.DocTestSuite sees None + self.assertDocTestStringSucceds(tests.IsolatedDocTestSuite, test) + + +class TestSelftestExcludePatterns(tests.TestCase): + + def setUp(self): + super(TestSelftestExcludePatterns, self).setUp() + self.overrideAttr(tests, 'test_suite', self.suite_factory) + + def suite_factory(self, keep_only=None, starting_with=None): + """A test suite factory with only a few tests.""" + class Test(tests.TestCase): + def id(self): + # We don't need the full class path + return self._testMethodName + def a(self): + pass + def b(self): + pass + def c(self): + pass + return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")]) + + def assertTestList(self, expected, *selftest_args): + # We rely on setUp installing the right test suite factory so we can + # test at the command level without loading the whole test suite + out, err = self.run_bzr(('selftest', '--list') + selftest_args) + actual = out.splitlines() + self.assertEquals(expected, actual) + + def test_full_list(self): + self.assertTestList(['a', 'b', 'c']) + + def test_single_exclude(self): + self.assertTestList(['b', 'c'], '-x', 'a') + + def test_mutiple_excludes(self): + self.assertTestList(['c'], '-x', 'a', '-x', 'b') + + +class TestCounterHooks(tests.TestCase, SelfTestHelper): + + _test_needs_features = [features.subunit] + + def setUp(self): + super(TestCounterHooks, self).setUp() + class Test(tests.TestCase): + + def setUp(self): + super(Test, self).setUp() + self.hooks = hooks.Hooks() + self.hooks.add_hook('myhook', 'Foo bar blah', (2,4)) + self.install_counter_hook(self.hooks, 'myhook') + + def no_hook(self): + pass + + def run_hook_once(self): + for hook in self.hooks['myhook']: + hook(self) + + self.test_class = Test + + def assertHookCalls(self, expected_calls, test_name): + test = self.test_class(test_name) + result = unittest.TestResult() + test.run(result) + self.assertTrue(hasattr(test, '_counters')) + self.assertTrue(test._counters.has_key('myhook')) + self.assertEquals(expected_calls, test._counters['myhook']) + + def test_no_hook(self): + self.assertHookCalls(0, 'no_hook') + + def test_run_hook_once(self): + tt = features.testtools + if tt.module.__version__ < (0, 9, 8): + raise tests.TestSkipped('testtools-0.9.8 required for addDetail') + self.assertHookCalls(1, 'run_hook_once') |