summary | refs | log | tree | commit | diff
path: root/bzrlib/url_policy_open.py
diff options
context:
space:
mode:
Diffstat (limited to 'bzrlib/url_policy_open.py')
-rw-r--r-- bzrlib/url_policy_open.py 314
1 files changed, 314 insertions, 0 deletions
diff --git a/bzrlib/url_policy_open.py b/bzrlib/url_policy_open.py
new file mode 100644
index 0000000..88dfde0
--- /dev/null
+++ b/bzrlib/url_policy_open.py
@@ -0,0 +1,314 @@
+# Copyright (C) 2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Branch opening with URL-based restrictions."""
+
+from __future__ import absolute_import
+
+import threading
+
+from bzrlib import (
+ errors,
+ urlutils,
+ )
+from bzrlib.branch import Branch
+from bzrlib.controldir import (
+ ControlDir,
+ )
+
+
+class BadUrl(errors.BzrError):
+    """A policy refused to open a branch at the given URL.
+
+    :ivar url: The URL that was rejected.
+    """
+
+    _fmt = "Tried to access a branch from bad URL %(url)s."
+
+    def __init__(self, url):
+        """Create the error for a rejected URL.
+
+        :param url: The URL that the policy refused.
+        """
+        # Raise sites in this module call BadUrl(url) positionally.
+        # NOTE(review): bzrlib.errors.BzrError is understood to treat a
+        # positional argument as a preformatted message, which would bypass
+        # _fmt and leave no 'url' attribute; forwarding by keyword lets
+        # %(url)s be interpolated -- confirm against bzrlib.errors.
+        errors.BzrError.__init__(self, url=url)
+
+
+class BranchReferenceForbidden(errors.BzrError):
+    """A branch reference was found but the policy forbids following it."""
+
+    _fmt = (
+        "Trying to mirror a branch reference and the branch type "
+        "does not allow references."
+        )
+
+
+class BranchLoopError(errors.BzrError):
+    """Encountered a branch cycle.
+
+    A URL may point to a branch reference or it may point to a stacked branch.
+    In either case, it's possible for there to be a cycle in these references,
+    and this exception is raised when we detect such a cycle.
+    """
+
+    # The original literal ended in a stray '""' (an empty string that was
+    # implicitly concatenated); the resulting message is unchanged.
+    _fmt = "Encountered a branch cycle"
+
+
+class BranchOpenPolicy(object):
+    """Abstract policy deciding which branches may be opened.
+
+    A policy vets branch URLs, says whether branch references may be
+    followed, and chooses the URL to use for a stacked-on location.  Every
+    method here raises NotImplementedError; subclasses supply the behaviour.
+    """
+
+    def should_follow_references(self):
+        """Say whether branch references may be traversed.
+
+        Subclasses must override this method.
+
+        When a branch reference is encountered and this returns false, an
+        error is raised by the opener.
+
+        :returns: A boolean; true if a branch reference should be followed.
+        """
+        raise NotImplementedError(self.should_follow_references)
+
+    def transform_fallback_location(self, branch, url):
+        """Validate, and possibly rewrite, a stacked-on location.
+
+        Subclasses must override this method.
+
+        :param branch: The branch that is being opened.
+        :param url: The stacked-on URL as recorded by that branch.
+        :return: A (new_url, check) pair.  'new_url' is the URL of the branch
+            to actually open; 'check' is true if 'new_url' still has to be
+            validated by check_and_follow_branch_reference.
+        """
+        raise NotImplementedError(self.transform_fallback_location)
+
+    def check_one_url(self, url):
+        """Decide whether a single URL is acceptable.
+
+        Subclasses must override this method.
+
+        :param url: The source URL to check.
+        :raise BadUrl: subclasses are expected to raise this, or a subclass
+            of it, for a URL they deem unacceptable.
+        """
+        raise NotImplementedError(self.check_one_url)
+
+
+class _BlacklistPolicy(BranchOpenPolicy):
+    """Policy that rejects an explicit collection of URLs.
+
+    URLs are compared exactly as given, so alternative spellings (for
+    example url-encoded forms) are not caught.  Mostly useful for tests.
+    """
+
+    def __init__(self, should_follow_references, bad_urls=None):
+        """Create the policy.
+
+        :param should_follow_references: Value to return from
+            `should_follow_references`.
+        :param bad_urls: Container of forbidden URLs; an empty set is used
+            when omitted.
+        """
+        self._should_follow_references = should_follow_references
+        self._bad_urls = set() if bad_urls is None else bad_urls
+
+    def should_follow_references(self):
+        """See `BranchOpenPolicy.should_follow_references`."""
+        return self._should_follow_references
+
+    def check_one_url(self, url):
+        """Raise BadUrl if `url` is one of the forbidden URLs."""
+        if url not in self._bad_urls:
+            return
+        raise BadUrl(url)
+
+    def transform_fallback_location(self, branch, url):
+        """See `BranchOpenPolicy.transform_fallback_location`.
+
+        The smarter stacking features are not exercised through this class,
+        so the URL is simply resolved against the branch base and returned
+        with no further check requested.
+        """
+        stacked_on_url = urlutils.join(branch.base, url)
+        return stacked_on_url, False
+
+
+class AcceptAnythingPolicy(_BlacklistPolicy):
+    """Blacklist policy with nothing forbidden; convenient for tests."""
+
+    def __init__(self):
+        """Follow references and start with an empty set of bad URLs."""
+        super(AcceptAnythingPolicy, self).__init__(
+            should_follow_references=True, bad_urls=set())
+
+
+class WhitelistPolicy(BranchOpenPolicy):
+    """Branch policy that only allows certain URLs."""
+
+    def __init__(self, should_follow_references, allowed_urls=None,
+                 check=False):
+        """Create the policy.
+
+        :param should_follow_references: Value to return from
+            `should_follow_references`.
+        :param allowed_urls: Iterable of permitted URLs.  Trailing slashes
+            are ignored when comparing.  Nothing is permitted when omitted.
+        :param check: Whether stacked-on locations must also be validated;
+            returned as the 'check' flag of `transform_fallback_location`.
+        """
+        if allowed_urls is None:
+            allowed_urls = []
+        # Previously this argument was dropped, so should_follow_references()
+        # failed with AttributeError.
+        self._should_follow_references = should_follow_references
+        self.allowed_urls = set(url.rstrip('/') for url in allowed_urls)
+        self.check = check
+
+    def should_follow_references(self):
+        """See `BranchOpenPolicy.should_follow_references`."""
+        return self._should_follow_references
+
+    def check_one_url(self, url):
+        """Raise BadUrl unless `url`, minus trailing slashes, is allowed."""
+        if url.rstrip('/') not in self.allowed_urls:
+            raise BadUrl(url)
+
+    def transform_fallback_location(self, branch, url):
+        """See `BranchOpenPolicy.transform_fallback_location`.
+
+        Here we return the URL that would be used anyway and optionally check
+        it.
+        """
+        return urlutils.join(branch.base, url), self.check
+
+
+class SingleSchemePolicy(BranchOpenPolicy):
+    """Policy that only accepts URLs whose scheme is the one given."""
+
+    def __init__(self, allowed_scheme):
+        """Remember the one URL scheme that may be opened."""
+        self.allowed_scheme = allowed_scheme
+
+    def should_follow_references(self):
+        """Branch references are always followed."""
+        return True
+
+    def transform_fallback_location(self, branch, url):
+        """Resolve `url` against the branch base and request a check."""
+        stacked_on_url = urlutils.join(branch.base, url)
+        return stacked_on_url, True
+
+    def check_one_url(self, url):
+        """Raise BadUrl unless `url` uses the allowed scheme."""
+        parsed = urlutils.URL.from_string(str(url))
+        if parsed.scheme == self.allowed_scheme:
+            return
+        raise BadUrl(url)
+
+
+class BranchOpener(object):
+    """Branch opener which uses a URL policy.
+
+    All locations that are opened (stacked-on branches, references) are
+    checked against a policy object.
+
+    The policy object is expected to have the following methods:
+     * check_one_url
+     * should_follow_references
+     * transform_fallback_location
+    """
+
+    # Shared threading.local; an 'opener' attribute is set on it only for the
+    # duration of run_with_transform_fallback_location_hook_installed.
+    _threading_data = threading.local()
+
+    def __init__(self, policy, probers=None):
+        """Create a new BranchOpener.
+
+        :param policy: The opener policy to use.
+        :param probers: Optional list of probers to allow.
+            Defaults to local and remote bzr probers.
+        """
+        self.policy = policy
+        # URLs already visited during the current open; used to detect loops.
+        self._seen_urls = set()
+        self.probers = probers
+
+    @classmethod
+    def install_hook(cls):
+        """Install the ``transform_fallback_location`` hook.
+
+        This is done at module import time, but transform_fallback_locationHook
+        doesn't do anything unless the `_threading_data` threading.local
+        object has an 'opener' attribute in this thread.
+
+        This is in a class method rather than performed at module level so
+        that it can be called in setUp for testing `BranchOpener`, as
+        bzrlib.tests.TestCase.setUp clears hooks.
+        """
+        Branch.hooks.install_named_hook(
+            'transform_fallback_location',
+            cls.transform_fallback_locationHook,
+            'BranchOpener.transform_fallback_locationHook')
+
+    def check_and_follow_branch_reference(self, url):
+        """Check URL (and possibly the referenced URL).
+
+        This method checks that `url` passes the policy's `check_one_url`
+        method, and if `url` refers to a branch reference, it checks whether
+        references are allowed and whether the reference's URL passes muster
+        also -- recursively, until a real branch is found.
+
+        :param url: URL to check
+        :return: The URL of the real branch that was eventually reached.
+        :raise BranchLoopError: If the branch references form a loop.
+        :raise BranchReferenceForbidden: If this opener forbids branch
+            references.
+        """
+        while True:
+            if url in self._seen_urls:
+                raise BranchLoopError()
+            self._seen_urls.add(url)
+            self.policy.check_one_url(url)
+            next_url = self.follow_reference(url)
+            if next_url is None:
+                # Not a reference: this is the branch to open.
+                return url
+            url = next_url
+            if not self.policy.should_follow_references():
+                # NOTE(review): the referenced URL is passed positionally,
+                # but BranchReferenceForbidden._fmt has no placeholder for
+                # it -- confirm how BzrError renders this.
+                raise BranchReferenceForbidden(url)
+
+    @classmethod
+    def transform_fallback_locationHook(cls, branch, url):
+        """Installed as the 'transform_fallback_location' Branch hook.
+
+        This method calls `transform_fallback_location` on the policy object
+        and either returns the url it provides or passes it back to
+        check_and_follow_branch_reference.
+
+        :param branch: The branch whose stacked-on location is being opened.
+        :param url: The stacked-on URL provided by `branch`.
+        :return: The URL to use; `url` unchanged when no opener is active in
+            this thread.
+        """
+        try:
+            opener = cls._threading_data.opener
+        except AttributeError:
+            # No BranchOpener is running in this thread; leave the URL alone.
+            return url
+        new_url, check = opener.policy.transform_fallback_location(branch, url)
+        if check:
+            return opener.check_and_follow_branch_reference(new_url)
+        else:
+            return new_url
+
+    def run_with_transform_fallback_location_hook_installed(
+            self, callable, *args, **kw):
+        """Call `callable` with this opener active for the current thread.
+
+        :param callable: The function to call.
+        :param args: Positional arguments for `callable`.
+        :param kw: Keyword arguments for `callable`.
+        :return: Whatever `callable` returns.
+        :raise AssertionError: If the hook has not been installed with
+            `install_hook`.
+        """
+        if (self.transform_fallback_locationHook not in
+                Branch.hooks['transform_fallback_location']):
+            raise AssertionError("hook not installed")
+        self._threading_data.opener = self
+        try:
+            return callable(*args, **kw)
+        finally:
+            del self._threading_data.opener
+            # We reset _seen_urls here to avoid multiple calls to open giving
+            # spurious loop exceptions.
+            self._seen_urls = set()
+
+    def follow_reference(self, url):
+        """Get the branch-reference value at the specified url.
+
+        This exists as a separate method only to be overridden in unit tests.
+
+        :param url: The URL of the control directory to inspect.
+        :return: The result of `get_branch_reference` on that control
+            directory; None is taken by the caller to mean "not a reference".
+        """
+        bzrdir = ControlDir.open(url, probers=self.probers)
+        return bzrdir.get_branch_reference()
+
+    def open(self, url):
+        """Open the Bazaar branch at url, first checking it.
+
+        What is acceptable is defined by the policy's
+        `should_follow_references` and `check_one_url` methods.
+
+        :param url: The URL of the branch to open; must be a str.
+        :return: The opened branch.
+        :raise TypeError: If `url` is not a str.
+        """
+        if not isinstance(url, str):
+            raise TypeError(
+                "url must be a str, not %s" % (type(url).__name__,))
+
+        url = self.check_and_follow_branch_reference(url)
+
+        def open_branch(url):
+            control_dir = ControlDir.open(url, probers=self.probers)
+            return control_dir.open_branch()
+        return self.run_with_transform_fallback_location_hook_installed(
+            open_branch, url)
+
+
+def open_only_scheme(allowed_scheme, url):
+    """Open the branch at `url`, refusing any URL not on `allowed_scheme`.
+
+    :param allowed_scheme: The only URL scheme that may be accessed.
+    :param url: The URL of the branch to open.
+    :return: The result of `BranchOpener.open` for `url`.
+    :raises BadUrl: An attempt was made to open a URL that was not on
+        `allowed_scheme`.
+    """
+    policy = SingleSchemePolicy(allowed_scheme)
+    opener = BranchOpener(policy)
+    return opener.open(url)
+
+
+# Register the hook when this module is imported.  The hook returns the URL
+# unchanged unless a BranchOpener is active in the current thread.
+BranchOpener.install_hook()