summaryrefslogtreecommitdiff
path: root/bzrlib/tests/test_url_policy_open.py
diff options
context:
space:
mode:
Diffstat (limited to 'bzrlib/tests/test_url_policy_open.py')
-rw-r--r--bzrlib/tests/test_url_policy_open.py359
1 files changed, 359 insertions, 0 deletions
diff --git a/bzrlib/tests/test_url_policy_open.py b/bzrlib/tests/test_url_policy_open.py
new file mode 100644
index 0000000..9bad00f
--- /dev/null
+++ b/bzrlib/tests/test_url_policy_open.py
@@ -0,0 +1,359 @@
+# Copyright (C) 2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Tests for the branch open with specific URL policy code."""
+
+from bzrlib import urlutils
+from bzrlib.branch import (
+ Branch,
+ BranchReferenceFormat,
+ )
+from bzrlib.bzrdir import (
+ BzrProber,
+ )
+from bzrlib.controldir import (
+ ControlDir,
+ ControlDirFormat,
+ )
+from bzrlib.errors import NotBranchError
+from bzrlib.url_policy_open import (
+ BadUrl,
+ _BlacklistPolicy,
+ BranchLoopError,
+ BranchReferenceForbidden,
+ open_only_scheme,
+ BranchOpener,
+ WhitelistPolicy,
+ )
+from bzrlib.tests import (
+ TestCase,
+ TestCaseWithTransport,
+ )
+from bzrlib.transport import chroot
+
+
+class TestBranchOpenerCheckAndFollowBranchReference(TestCase):
+ """Unit tests for `BranchOpener.check_and_follow_branch_reference`."""
+
+ def setUp(self):
+ super(TestBranchOpenerCheckAndFollowBranchReference, self).setUp()
+ BranchOpener.install_hook()
+
+ class StubbedBranchOpener(BranchOpener):
+ """BranchOpener that provides canned answers.
+
+ We implement the methods we need to to be able to control all the
+ inputs to the `follow_reference` method, which is what is
+ being tested in this class.
+ """
+
+ def __init__(self, references, policy):
+ parent_cls = TestBranchOpenerCheckAndFollowBranchReference
+ super(parent_cls.StubbedBranchOpener, self).__init__(policy)
+ self._reference_values = {}
+ for i in range(len(references) - 1):
+ self._reference_values[references[i]] = references[i + 1]
+ self.follow_reference_calls = []
+
+ def follow_reference(self, url):
+ self.follow_reference_calls.append(url)
+ return self._reference_values[url]
+
+ def make_branch_opener(self, should_follow_references, references,
+ unsafe_urls=None):
+ policy = _BlacklistPolicy(should_follow_references, unsafe_urls)
+ opener = self.StubbedBranchOpener(references, policy)
+ return opener
+
+ def test_check_initial_url(self):
+ # check_and_follow_branch_reference rejects all URLs that are not
+ # allowed.
+ opener = self.make_branch_opener(None, [], set(['a']))
+ self.assertRaises(
+ BadUrl, opener.check_and_follow_branch_reference, 'a')
+
+ def test_not_reference(self):
+ # When branch references are forbidden, check_and_follow_branch_reference
+ # does not raise on non-references.
+ opener = self.make_branch_opener(False, ['a', None])
+ self.assertEquals(
+ 'a', opener.check_and_follow_branch_reference('a'))
+ self.assertEquals(['a'], opener.follow_reference_calls)
+
+ def test_branch_reference_forbidden(self):
+ # check_and_follow_branch_reference raises BranchReferenceForbidden if
+ # branch references are forbidden and the source URL points to a
+ # branch reference.
+ opener = self.make_branch_opener(False, ['a', 'b'])
+ self.assertRaises(
+ BranchReferenceForbidden,
+ opener.check_and_follow_branch_reference, 'a')
+ self.assertEquals(['a'], opener.follow_reference_calls)
+
+ def test_allowed_reference(self):
+ # check_and_follow_branch_reference does not raise if following references
+ # is allowed and the source URL points to a branch reference to a
+ # permitted location.
+ opener = self.make_branch_opener(True, ['a', 'b', None])
+ self.assertEquals(
+ 'b', opener.check_and_follow_branch_reference('a'))
+ self.assertEquals(['a', 'b'], opener.follow_reference_calls)
+
+ def test_check_referenced_urls(self):
+ # check_and_follow_branch_reference checks if the URL a reference points
+ # to is safe.
+ opener = self.make_branch_opener(
+ True, ['a', 'b', None], unsafe_urls=set('b'))
+ self.assertRaises(
+ BadUrl, opener.check_and_follow_branch_reference, 'a')
+ self.assertEquals(['a'], opener.follow_reference_calls)
+
+ def test_self_referencing_branch(self):
+ # check_and_follow_branch_reference raises BranchReferenceLoopError if
+ # following references is allowed and the source url points to a
+ # self-referencing branch reference.
+ opener = self.make_branch_opener(True, ['a', 'a'])
+ self.assertRaises(
+ BranchLoopError, opener.check_and_follow_branch_reference, 'a')
+ self.assertEquals(['a'], opener.follow_reference_calls)
+
+ def test_branch_reference_loop(self):
+ # check_and_follow_branch_reference raises BranchReferenceLoopError if
+ # following references is allowed and the source url points to a loop
+ # of branch references.
+ references = ['a', 'b', 'a']
+ opener = self.make_branch_opener(True, references)
+ self.assertRaises(
+ BranchLoopError, opener.check_and_follow_branch_reference, 'a')
+ self.assertEquals(['a', 'b'], opener.follow_reference_calls)
+
+
+class TrackingProber(BzrProber):
+ """Subclass of BzrProber which tracks URLs it has been asked to open."""
+
+ seen_urls = []
+
+ @classmethod
+ def probe_transport(klass, transport):
+ klass.seen_urls.append(transport.base)
+ return BzrProber.probe_transport(transport)
+
+
+class TestBranchOpenerStacking(TestCaseWithTransport):
+
+ def setUp(self):
+ super(TestBranchOpenerStacking, self).setUp()
+ BranchOpener.install_hook()
+
+ def make_branch_opener(self, allowed_urls, probers=None):
+ policy = WhitelistPolicy(True, allowed_urls, True)
+ return BranchOpener(policy, probers)
+
+ def test_probers(self):
+ # Only the specified probers should be used
+ b = self.make_branch('branch')
+ opener = self.make_branch_opener([b.base], probers=[])
+ self.assertRaises(NotBranchError, opener.open, b.base)
+ opener = self.make_branch_opener([b.base], probers=[BzrProber])
+ self.assertEquals(b.base, opener.open(b.base).base)
+
+ def test_default_probers(self):
+ # If no probers are specified to the constructor
+ # of BranchOpener, then a safe set will be used,
+ # rather than all probers registered in bzr.
+ self.addCleanup(ControlDirFormat.unregister_prober, TrackingProber)
+ ControlDirFormat.register_prober(TrackingProber)
+ # Open a location without any branches, so that all probers are
+ # tried.
+ # First, check that the TrackingProber tracks correctly.
+ TrackingProber.seen_urls = []
+ opener = self.make_branch_opener(["."], probers=[TrackingProber])
+ self.assertRaises(NotBranchError, opener.open, ".")
+ self.assertEquals(1, len(TrackingProber.seen_urls))
+ TrackingProber.seen_urls = []
+ # And make sure it's registered in such a way that ControlDir.open would
+ # use it.
+ self.assertRaises(NotBranchError, ControlDir.open, ".")
+ self.assertEquals(1, len(TrackingProber.seen_urls))
+
+ def test_allowed_url(self):
+ # the opener does not raise an exception for branches stacked on
+ # branches with allowed URLs.
+ stacked_on_branch = self.make_branch('base-branch', format='1.6')
+ stacked_branch = self.make_branch('stacked-branch', format='1.6')
+ stacked_branch.set_stacked_on_url(stacked_on_branch.base)
+ opener = self.make_branch_opener(
+ [stacked_branch.base, stacked_on_branch.base])
+ # This doesn't raise an exception.
+ opener.open(stacked_branch.base)
+
+ def test_nstackable_repository(self):
+ # treats branches with UnstackableRepositoryFormats as
+ # being not stacked.
+ branch = self.make_branch('unstacked', format='knit')
+ opener = self.make_branch_opener([branch.base])
+ # This doesn't raise an exception.
+ opener.open(branch.base)
+
+ def test_allowed_relative_url(self):
+ # passes on absolute urls to check_one_url, even if the
+ # value of stacked_on_location in the config is set to a relative URL.
+ stacked_on_branch = self.make_branch('base-branch', format='1.6')
+ stacked_branch = self.make_branch('stacked-branch', format='1.6')
+ stacked_branch.set_stacked_on_url('../base-branch')
+ opener = self.make_branch_opener(
+ [stacked_branch.base, stacked_on_branch.base])
+ # Note that stacked_on_branch.base is not '../base-branch', it's an
+ # absolute URL.
+ self.assertNotEqual('../base-branch', stacked_on_branch.base)
+ # This doesn't raise an exception.
+ opener.open(stacked_branch.base)
+
+ def test_allowed_relative_nested(self):
+ # Relative URLs are resolved relative to the stacked branch.
+ self.get_transport().mkdir('subdir')
+ a = self.make_branch('subdir/a', format='1.6')
+ b = self.make_branch('b', format='1.6')
+ b.set_stacked_on_url('../subdir/a')
+ c = self.make_branch('subdir/c', format='1.6')
+ c.set_stacked_on_url('../../b')
+ opener = self.make_branch_opener([c.base, b.base, a.base])
+ # This doesn't raise an exception.
+ opener.open(c.base)
+
+ def test_forbidden_url(self):
+ # raises a BadUrl exception if a branch is stacked on a
+ # branch with a forbidden URL.
+ stacked_on_branch = self.make_branch('base-branch', format='1.6')
+ stacked_branch = self.make_branch('stacked-branch', format='1.6')
+ stacked_branch.set_stacked_on_url(stacked_on_branch.base)
+ opener = self.make_branch_opener([stacked_branch.base])
+ self.assertRaises(BadUrl, opener.open, stacked_branch.base)
+
+ def test_forbidden_url_nested(self):
+ # raises a BadUrl exception if a branch is stacked on a
+ # branch that is in turn stacked on a branch with a forbidden URL.
+ a = self.make_branch('a', format='1.6')
+ b = self.make_branch('b', format='1.6')
+ b.set_stacked_on_url(a.base)
+ c = self.make_branch('c', format='1.6')
+ c.set_stacked_on_url(b.base)
+ opener = self.make_branch_opener([c.base, b.base])
+ self.assertRaises(BadUrl, opener.open, c.base)
+
+ def test_self_stacked_branch(self):
+ # raises StackingLoopError if a branch is stacked on
+ # itself. This avoids infinite recursion errors.
+ a = self.make_branch('a', format='1.6')
+ # Bazaar 1.17 and up make it harder to create branches like this.
+ # It's still worth testing that we don't blow up in the face of them,
+ # so we grovel around a bit to create one anyway.
+ a.get_config().set_user_option('stacked_on_location', a.base)
+ opener = self.make_branch_opener([a.base])
+ self.assertRaises(BranchLoopError, opener.open, a.base)
+
+ def test_loop_stacked_branch(self):
+ # raises StackingLoopError if a branch is stacked in such
+ # a way so that it is ultimately stacked on itself. e.g. a stacked on
+ # b stacked on a.
+ a = self.make_branch('a', format='1.6')
+ b = self.make_branch('b', format='1.6')
+ a.set_stacked_on_url(b.base)
+ b.set_stacked_on_url(a.base)
+ opener = self.make_branch_opener([a.base, b.base])
+ self.assertRaises(BranchLoopError, opener.open, a.base)
+ self.assertRaises(BranchLoopError, opener.open, b.base)
+
+ def test_custom_opener(self):
+ # A custom function for opening a control dir can be specified.
+ a = self.make_branch('a', format='2a')
+ b = self.make_branch('b', format='2a')
+ b.set_stacked_on_url(a.base)
+
+ TrackingProber.seen_urls = []
+ opener = self.make_branch_opener(
+ [a.base, b.base], probers=[TrackingProber])
+ opener.open(b.base)
+ self.assertEquals(
+ set(TrackingProber.seen_urls), set([b.base, a.base]))
+
+ def test_custom_opener_with_branch_reference(self):
+ # A custom function for opening a control dir can be specified.
+ a = self.make_branch('a', format='2a')
+ b_dir = self.make_bzrdir('b')
+ b = BranchReferenceFormat().initialize(b_dir, target_branch=a)
+ TrackingProber.seen_urls = []
+ opener = self.make_branch_opener(
+ [a.base, b.base], probers=[TrackingProber])
+ opener.open(b.base)
+ self.assertEquals(
+ set(TrackingProber.seen_urls), set([b.base, a.base]))
+
+
+class TestOpenOnlyScheme(TestCaseWithTransport):
+ """Tests for `open_only_scheme`."""
+
+ def setUp(self):
+ super(TestOpenOnlyScheme, self).setUp()
+ BranchOpener.install_hook()
+
+ def test_hook_does_not_interfere(self):
+ # The transform_fallback_location hook does not interfere with regular
+ # stacked branch access outside of open_only_scheme.
+ self.make_branch('stacked')
+ self.make_branch('stacked-on')
+ Branch.open('stacked').set_stacked_on_url('../stacked-on')
+ Branch.open('stacked')
+
+ def get_chrooted_scheme(self, relpath):
+ """Create a server that is chrooted to `relpath`.
+
+ :return: ``(scheme, get_url)`` where ``scheme`` is the scheme of the
+ chroot server and ``get_url`` returns URLs on said server.
+ """
+ transport = self.get_transport(relpath)
+ chroot_server = chroot.ChrootServer(transport)
+ chroot_server.start_server()
+ self.addCleanup(chroot_server.stop_server)
+
+ def get_url(relpath):
+ return chroot_server.get_url() + relpath
+
+ return urlutils.URL.from_string(chroot_server.get_url()).scheme, get_url
+
+ def test_stacked_within_scheme(self):
+ # A branch that is stacked on a URL of the same scheme is safe to
+ # open.
+ self.get_transport().mkdir('inside')
+ self.make_branch('inside/stacked')
+ self.make_branch('inside/stacked-on')
+ scheme, get_chrooted_url = self.get_chrooted_scheme('inside')
+ Branch.open(get_chrooted_url('stacked')).set_stacked_on_url(
+ get_chrooted_url('stacked-on'))
+ open_only_scheme(scheme, get_chrooted_url('stacked'))
+
+ def test_stacked_outside_scheme(self):
+ # A branch that is stacked on a URL that is not of the same scheme is
+ # not safe to open.
+ self.get_transport().mkdir('inside')
+ self.get_transport().mkdir('outside')
+ self.make_branch('inside/stacked')
+ self.make_branch('outside/stacked-on')
+ scheme, get_chrooted_url = self.get_chrooted_scheme('inside')
+ Branch.open(get_chrooted_url('stacked')).set_stacked_on_url(
+ self.get_url('outside/stacked-on'))
+ self.assertRaises(
+ BadUrl, open_only_scheme, scheme, get_chrooted_url('stacked'))