diff options
Diffstat (limited to 'bzrlib/fetch.py')
-rw-r--r-- | bzrlib/fetch.py | 430 |
1 files changed, 430 insertions, 0 deletions
diff --git a/bzrlib/fetch.py b/bzrlib/fetch.py new file mode 100644 index 0000000..8459d01 --- /dev/null +++ b/bzrlib/fetch.py @@ -0,0 +1,430 @@ +# Copyright (C) 2005-2011 Canonical Ltd +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +"""Copying of history from one branch to another. + +The basic plan is that every branch knows the history of everything +that has merged into it. As the first step of a merge, pull, or +branch operation we copy history from the source into the destination +branch. +""" + +from __future__ import absolute_import + +import operator + +from bzrlib.lazy_import import lazy_import +lazy_import(globals(), """ +from bzrlib import ( + tsort, + versionedfile, + vf_search, + ) +""") +from bzrlib import ( + errors, + ui, + ) +from bzrlib.i18n import gettext +from bzrlib.revision import NULL_REVISION +from bzrlib.trace import mutter + + +class RepoFetcher(object): + """Pull revisions and texts from one repository to another. + + This should not be used directly, it's essential a object to encapsulate + the logic in InterRepository.fetch(). + """ + + def __init__(self, to_repository, from_repository, last_revision=None, + find_ghosts=True, fetch_spec=None): + """Create a repo fetcher. + + :param last_revision: If set, try to limit to the data this revision + references. + :param fetch_spec: A SearchResult specifying which revisions to fetch. + If set, this overrides last_revision. + :param find_ghosts: If True search the entire history for ghosts. + """ + # repository.fetch has the responsibility for short-circuiting + # attempts to copy between a repository and itself. + self.to_repository = to_repository + self.from_repository = from_repository + self.sink = to_repository._get_sink() + # must not mutate self._last_revision as its potentially a shared instance + self._last_revision = last_revision + self._fetch_spec = fetch_spec + self.find_ghosts = find_ghosts + self.from_repository.lock_read() + mutter("Using fetch logic to copy between %s(%s) and %s(%s)", + self.from_repository, self.from_repository._format, + self.to_repository, self.to_repository._format) + try: + self.__fetch() + finally: + self.from_repository.unlock() + + def __fetch(self): + """Primary worker function. + + This initialises all the needed variables, and then fetches the + requested revisions, finally clearing the progress bar. + """ + # Roughly this is what we're aiming for fetch to become: + # + # missing = self.sink.insert_stream(self.source.get_stream(search)) + # if missing: + # missing = self.sink.insert_stream(self.source.get_items(missing)) + # assert not missing + self.count_total = 0 + self.file_ids_names = {} + pb = ui.ui_factory.nested_progress_bar() + pb.show_pct = pb.show_count = False + try: + pb.update(gettext("Finding revisions"), 0, 2) + search_result = self._revids_to_fetch() + mutter('fetching: %s', search_result) + if search_result.is_empty(): + return + pb.update(gettext("Fetching revisions"), 1, 2) + self._fetch_everything_for_search(search_result) + finally: + pb.finished() + + def _fetch_everything_for_search(self, search): + """Fetch all data for the given set of revisions.""" + # The first phase is "file". We pass the progress bar for it directly + # into item_keys_introduced_by, which has more information about how + # that phase is progressing than we do. Progress updates for the other + # phases are taken care of in this function. + # XXX: there should be a clear owner of the progress reporting. Perhaps + # item_keys_introduced_by should have a richer API than it does at the + # moment, so that it can feed the progress information back to this + # function? + if (self.from_repository._format.rich_root_data and + not self.to_repository._format.rich_root_data): + raise errors.IncompatibleRepositories( + self.from_repository, self.to_repository, + "different rich-root support") + pb = ui.ui_factory.nested_progress_bar() + try: + pb.update("Get stream source") + source = self.from_repository._get_source( + self.to_repository._format) + stream = source.get_stream(search) + from_format = self.from_repository._format + pb.update("Inserting stream") + resume_tokens, missing_keys = self.sink.insert_stream( + stream, from_format, []) + if missing_keys: + pb.update("Missing keys") + stream = source.get_stream_for_missing_keys(missing_keys) + pb.update("Inserting missing keys") + resume_tokens, missing_keys = self.sink.insert_stream( + stream, from_format, resume_tokens) + if missing_keys: + raise AssertionError( + "second push failed to complete a fetch %r." % ( + missing_keys,)) + if resume_tokens: + raise AssertionError( + "second push failed to commit the fetch %r." % ( + resume_tokens,)) + pb.update("Finishing stream") + self.sink.finished() + finally: + pb.finished() + + def _revids_to_fetch(self): + """Determines the exact revisions needed from self.from_repository to + install self._last_revision in self.to_repository. + + :returns: A SearchResult of some sort. (Possibly a + PendingAncestryResult, EmptySearchResult, etc.) + """ + if self._fetch_spec is not None: + # The fetch spec is already a concrete search result. + return self._fetch_spec + elif self._last_revision == NULL_REVISION: + # fetch_spec is None + last_revision is null => empty fetch. + # explicit limit of no revisions needed + return vf_search.EmptySearchResult() + elif self._last_revision is not None: + return vf_search.NotInOtherForRevs(self.to_repository, + self.from_repository, [self._last_revision], + find_ghosts=self.find_ghosts).execute() + else: # self._last_revision is None: + return vf_search.EverythingNotInOther(self.to_repository, + self.from_repository, + find_ghosts=self.find_ghosts).execute() + + +class Inter1and2Helper(object): + """Helper for operations that convert data from model 1 and 2 + + This is for use by fetchers and converters. + """ + + # This is a class variable so that the test suite can override it. + known_graph_threshold = 100 + + def __init__(self, source): + """Constructor. + + :param source: The repository data comes from + """ + self.source = source + + def iter_rev_trees(self, revs): + """Iterate through RevisionTrees efficiently. + + Additionally, the inventory's revision_id is set if unset. + + Trees are retrieved in batches of 100, and then yielded in the order + they were requested. + + :param revs: A list of revision ids + """ + # In case that revs is not a list. + revs = list(revs) + while revs: + for tree in self.source.revision_trees(revs[:100]): + if tree.root_inventory.revision_id is None: + tree.root_inventory.revision_id = tree.get_revision_id() + yield tree + revs = revs[100:] + + def _find_root_ids(self, revs, parent_map, graph): + revision_root = {} + for tree in self.iter_rev_trees(revs): + root_id = tree.get_root_id() + revision_id = tree.get_file_revision(root_id, u"") + revision_root[revision_id] = root_id + # Find out which parents we don't already know root ids for + parents = set() + for revision_parents in parent_map.itervalues(): + parents.update(revision_parents) + parents.difference_update(revision_root.keys() + [NULL_REVISION]) + # Limit to revisions present in the versionedfile + parents = graph.get_parent_map(parents).keys() + for tree in self.iter_rev_trees(parents): + root_id = tree.get_root_id() + revision_root[tree.get_revision_id()] = root_id + return revision_root + + def generate_root_texts(self, revs): + """Generate VersionedFiles for all root ids. + + :param revs: the revisions to include + """ + graph = self.source.get_graph() + parent_map = graph.get_parent_map(revs) + rev_order = tsort.topo_sort(parent_map) + rev_id_to_root_id = self._find_root_ids(revs, parent_map, graph) + root_id_order = [(rev_id_to_root_id[rev_id], rev_id) for rev_id in + rev_order] + # Guaranteed stable, this groups all the file id operations together + # retaining topological order within the revisions of a file id. + # File id splits and joins would invalidate this, but they don't exist + # yet, and are unlikely to in non-rich-root environments anyway. + root_id_order.sort(key=operator.itemgetter(0)) + # Create a record stream containing the roots to create. + if len(revs) > self.known_graph_threshold: + graph = self.source.get_known_graph_ancestry(revs) + new_roots_stream = _new_root_data_stream( + root_id_order, rev_id_to_root_id, parent_map, self.source, graph) + return [('texts', new_roots_stream)] + + +def _new_root_data_stream( + root_keys_to_create, rev_id_to_root_id_map, parent_map, repo, graph=None): + """Generate a texts substream of synthesised root entries. + + Used in fetches that do rich-root upgrades. + + :param root_keys_to_create: iterable of (root_id, rev_id) pairs describing + the root entries to create. + :param rev_id_to_root_id_map: dict of known rev_id -> root_id mappings for + calculating the parents. If a parent rev_id is not found here then it + will be recalculated. + :param parent_map: a parent map for all the revisions in + root_keys_to_create. + :param graph: a graph to use instead of repo.get_graph(). + """ + for root_key in root_keys_to_create: + root_id, rev_id = root_key + parent_keys = _parent_keys_for_root_version( + root_id, rev_id, rev_id_to_root_id_map, parent_map, repo, graph) + yield versionedfile.FulltextContentFactory( + root_key, parent_keys, None, '') + + +def _parent_keys_for_root_version( + root_id, rev_id, rev_id_to_root_id_map, parent_map, repo, graph=None): + """Get the parent keys for a given root id. + + A helper function for _new_root_data_stream. + """ + # Include direct parents of the revision, but only if they used the same + # root_id and are heads. + rev_parents = parent_map[rev_id] + parent_ids = [] + for parent_id in rev_parents: + if parent_id == NULL_REVISION: + continue + if parent_id not in rev_id_to_root_id_map: + # We probably didn't read this revision, go spend the extra effort + # to actually check + try: + tree = repo.revision_tree(parent_id) + except errors.NoSuchRevision: + # Ghost, fill out rev_id_to_root_id in case we encounter this + # again. + # But set parent_root_id to None since we don't really know + parent_root_id = None + else: + parent_root_id = tree.get_root_id() + rev_id_to_root_id_map[parent_id] = None + # XXX: why not: + # rev_id_to_root_id_map[parent_id] = parent_root_id + # memory consumption maybe? + else: + parent_root_id = rev_id_to_root_id_map[parent_id] + if root_id == parent_root_id: + # With stacking we _might_ want to refer to a non-local revision, + # but this code path only applies when we have the full content + # available, so ghosts really are ghosts, not just the edge of + # local data. + parent_ids.append(parent_id) + else: + # root_id may be in the parent anyway. + try: + tree = repo.revision_tree(parent_id) + except errors.NoSuchRevision: + # ghost, can't refer to it. + pass + else: + try: + parent_ids.append(tree.get_file_revision(root_id)) + except errors.NoSuchId: + # not in the tree + pass + # Drop non-head parents + if graph is None: + graph = repo.get_graph() + heads = graph.heads(parent_ids) + selected_ids = [] + for parent_id in parent_ids: + if parent_id in heads and parent_id not in selected_ids: + selected_ids.append(parent_id) + parent_keys = [(root_id, parent_id) for parent_id in selected_ids] + return parent_keys + + +class TargetRepoKinds(object): + """An enum-like set of constants. + + They are the possible values of FetchSpecFactory.target_repo_kinds. + """ + + PREEXISTING = 'preexisting' + STACKED = 'stacked' + EMPTY = 'empty' + + +class FetchSpecFactory(object): + """A helper for building the best fetch spec for a sprout call. + + Factors that go into determining the sort of fetch to perform: + * did the caller specify any revision IDs? + * did the caller specify a source branch (need to fetch its + heads_to_fetch(), usually the tip + tags) + * is there an existing target repo (don't need to refetch revs it + already has) + * target is stacked? (similar to pre-existing target repo: even if + the target itself is new don't want to refetch existing revs) + + :ivar source_branch: the source branch if one specified, else None. + :ivar source_branch_stop_revision_id: fetch up to this revision of + source_branch, rather than its tip. + :ivar source_repo: the source repository if one found, else None. + :ivar target_repo: the target repository acquired by sprout. + :ivar target_repo_kind: one of the TargetRepoKinds constants. + """ + + def __init__(self): + self._explicit_rev_ids = set() + self.source_branch = None + self.source_branch_stop_revision_id = None + self.source_repo = None + self.target_repo = None + self.target_repo_kind = None + self.limit = None + + def add_revision_ids(self, revision_ids): + """Add revision_ids to the set of revision_ids to be fetched.""" + self._explicit_rev_ids.update(revision_ids) + + def make_fetch_spec(self): + """Build a SearchResult or PendingAncestryResult or etc.""" + if self.target_repo_kind is None or self.source_repo is None: + raise AssertionError( + 'Incomplete FetchSpecFactory: %r' % (self.__dict__,)) + if len(self._explicit_rev_ids) == 0 and self.source_branch is None: + if self.limit is not None: + raise NotImplementedError( + "limit is only supported with a source branch set") + # Caller hasn't specified any revisions or source branch + if self.target_repo_kind == TargetRepoKinds.EMPTY: + return vf_search.EverythingResult(self.source_repo) + else: + # We want everything not already in the target (or target's + # fallbacks). + return vf_search.EverythingNotInOther( + self.target_repo, self.source_repo).execute() + heads_to_fetch = set(self._explicit_rev_ids) + if self.source_branch is not None: + must_fetch, if_present_fetch = self.source_branch.heads_to_fetch() + if self.source_branch_stop_revision_id is not None: + # Replace the tip rev from must_fetch with the stop revision + # XXX: this might be wrong if the tip rev is also in the + # must_fetch set for other reasons (e.g. it's the tip of + # multiple loom threads?), but then it's pretty unclear what it + # should mean to specify a stop_revision in that case anyway. + must_fetch.discard(self.source_branch.last_revision()) + must_fetch.add(self.source_branch_stop_revision_id) + heads_to_fetch.update(must_fetch) + else: + if_present_fetch = set() + if self.target_repo_kind == TargetRepoKinds.EMPTY: + # PendingAncestryResult does not raise errors if a requested head + # is absent. Ideally it would support the + # required_ids/if_present_ids distinction, but in practice + # heads_to_fetch will almost certainly be present so this doesn't + # matter much. + all_heads = heads_to_fetch.union(if_present_fetch) + ret = vf_search.PendingAncestryResult(all_heads, self.source_repo) + if self.limit is not None: + graph = self.source_repo.get_graph() + topo_order = list(graph.iter_topo_order(ret.get_keys())) + result_set = topo_order[:self.limit] + ret = self.source_repo.revision_ids_to_search_result(result_set) + return ret + else: + return vf_search.NotInOtherForRevs(self.target_repo, self.source_repo, + required_ids=heads_to_fetch, if_present_ids=if_present_fetch, + limit=self.limit).execute() |