author    Lorry <lorry@roadtrain.codethink.co.uk>  2012-08-22 15:47:16 +0100
committer Lorry <lorry@roadtrain.codethink.co.uk>  2012-08-22 15:47:16 +0100
commit    25335618bf8755ce6b116ee14f47f5a1f2c821e9 (patch)
tree      d889d7ab3f9f985d0c54c534cb8052bd2e6d7163 /bzrlib/repofmt
download  bzr-tarball-25335618bf8755ce6b116ee14f47f5a1f2c821e9.tar.gz
Tarball conversion
Diffstat (limited to 'bzrlib/repofmt')
-rw-r--r--  bzrlib/repofmt/__init__.py              20
-rw-r--r--  bzrlib/repofmt/groupcompress_repo.py  1426
-rw-r--r--  bzrlib/repofmt/knitpack_repo.py       1156
-rw-r--r--  bzrlib/repofmt/knitrepo.py             522
-rw-r--r--  bzrlib/repofmt/pack_repo.py           2091
5 files changed, 5215 insertions, 0 deletions
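
Reader's note (not part of the commit): the classes added below are plain Python
classes in the bzrlib.repofmt package, so the formats they define can be inspected
directly. A minimal sketch, assuming the bzrlib tree from this tarball is importable;
the names it uses (RepositoryFormat2a, controldir.format_registry) all appear in the
diff below.

    from bzrlib import controldir
    from bzrlib.repofmt.groupcompress_repo import RepositoryFormat2a

    fmt = RepositoryFormat2a()
    # The on-disk marker string and the human-readable description, as defined
    # near the end of groupcompress_repo.py in this commit.
    print(fmt.get_format_string())
    print(fmt.get_format_description())

    # The matching control-directory format is resolved through the registry,
    # mirroring RepositoryFormat2a._get_matching_bzrdir() below.
    bzrdir_fmt = controldir.format_registry.make_bzrdir('2a')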
diff --git a/bzrlib/repofmt/__init__.py b/bzrlib/repofmt/__init__.py
new file mode 100644
index 0000000..afe2457
--- /dev/null
+++ b/bzrlib/repofmt/__init__.py
@@ -0,0 +1,20 @@
+# Copyright (C) 2007 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Repository formats"""
+
+from __future__ import absolute_import
+
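
Aside (a sketch, not from this commit): bzrlib loads the format classes in this
package lazily, keyed by their format strings, rather than importing bzrlib.repofmt
up front. The registration below is an assumption about how bzrlib.repository wires
this up in this release; the format string itself is taken verbatim from
RepositoryFormat2a.get_format_string() in the next file.

    # Hypothetical registration sketch; register_lazy defers the module import
    # until the format string is first looked up.
    from bzrlib import repository

    repository.format_registry.register_lazy(
        'Bazaar repository format 2a (needs bzr 1.16 or later)\n',
        'bzrlib.repofmt.groupcompress_repo',
        'RepositoryFormat2a',
        )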
diff --git a/bzrlib/repofmt/groupcompress_repo.py b/bzrlib/repofmt/groupcompress_repo.py
new file mode 100644
index 0000000..3a088f5
--- /dev/null
+++ b/bzrlib/repofmt/groupcompress_repo.py
@@ -0,0 +1,1426 @@
+# Copyright (C) 2008-2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Repository formats using CHK inventories and groupcompress compression."""
+
+from __future__ import absolute_import
+
+import time
+
+from bzrlib import (
+ controldir,
+ chk_map,
+ chk_serializer,
+ debug,
+ errors,
+ index as _mod_index,
+ inventory,
+ osutils,
+ pack,
+ revision as _mod_revision,
+ trace,
+ ui,
+ versionedfile,
+ )
+from bzrlib.btree_index import (
+ BTreeGraphIndex,
+ BTreeBuilder,
+ )
+from bzrlib.decorators import needs_write_lock
+from bzrlib.groupcompress import (
+ _GCGraphIndex,
+ GroupCompressVersionedFiles,
+ )
+from bzrlib.repofmt.pack_repo import (
+ _DirectPackAccess,
+ Pack,
+ NewPack,
+ PackRepository,
+ PackRootCommitBuilder,
+ RepositoryPackCollection,
+ RepositoryFormatPack,
+ ResumedPack,
+ Packer,
+ )
+from bzrlib.vf_repository import (
+ StreamSource,
+ )
+from bzrlib.static_tuple import StaticTuple
+
+
+class GCPack(NewPack):
+
+ def __init__(self, pack_collection, upload_suffix='', file_mode=None):
+ """Create a NewPack instance.
+
+ :param pack_collection: A PackCollection into which this is being
+ inserted.
+ :param upload_suffix: An optional suffix to be given to any temporary
+ files created during the pack creation, e.g. '.autopack'.
+ :param file_mode: An optional file mode to create the new files with.
+ """
+ # replaced from NewPack to:
+ # - change inventory reference list length to 1
+ # - change texts reference lists to 1
+ # TODO: patch this to be parameterised
+
+ # The relative locations of the packs are constrained, but all are
+ # passed in because the caller has them, so as to avoid object churn.
+ index_builder_class = pack_collection._index_builder_class
+ # from brisbane-core
+ if pack_collection.chk_index is not None:
+ chk_index = index_builder_class(reference_lists=0)
+ else:
+ chk_index = None
+ Pack.__init__(self,
+ # Revisions: parents list, no text compression.
+ index_builder_class(reference_lists=1),
+ # Inventory: We want to map compression only, but currently the
+ # knit code hasn't been updated enough to understand that, so we
+ # have a regular 2-list index giving parents and compression
+ # source.
+ index_builder_class(reference_lists=1),
+ # Texts: per file graph, for all fileids - so one reference list
+ # and two elements in the key tuple.
+ index_builder_class(reference_lists=1, key_elements=2),
+ # Signatures: Just blobs to store, no compression, no parents
+ # listing.
+ index_builder_class(reference_lists=0),
+ # CHK based storage - just blobs, no compression or parents.
+ chk_index=chk_index
+ )
+ self._pack_collection = pack_collection
+ # When we make readonly indices, we need this.
+ self.index_class = pack_collection._index_class
+ # where should the new pack be opened
+ self.upload_transport = pack_collection._upload_transport
+ # where are indices written out to
+ self.index_transport = pack_collection._index_transport
+ # where is the pack renamed to when it is finished?
+ self.pack_transport = pack_collection._pack_transport
+ # What file mode to upload the pack and indices with.
+ self._file_mode = file_mode
+ # tracks the content written to the .pack file.
+ self._hash = osutils.md5()
+ # a four-tuple with the length in bytes of the indices, once the pack
+ # is finalised. (rev, inv, text, sigs)
+ self.index_sizes = None
+ # How much data to cache when writing packs. Note that this is not
+ # synchronised with reads, because it's not in the transport layer, so
+ # is not safe unless the client knows it won't be reading from the pack
+ # under creation.
+ self._cache_limit = 0
+ # the temporary pack file name.
+ self.random_name = osutils.rand_chars(20) + upload_suffix
+ # when was this pack started?
+ self.start_time = time.time()
+ # open an output stream for the data added to the pack.
+ self.write_stream = self.upload_transport.open_write_stream(
+ self.random_name, mode=self._file_mode)
+ if 'pack' in debug.debug_flags:
+ trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
+ time.ctime(), self.upload_transport.base, self.random_name,
+ time.time() - self.start_time)
+ # A list of byte sequences to be written to the new pack, and the
+ # aggregate size of them. Stored as a list rather than separate
+ # variables so that the _write_data closure below can update them.
+ self._buffer = [[], 0]
+ # create a callable for adding data
+ #
+ # robertc says- this is a closure rather than a method on the object
+ # so that the variables are locals, and faster than accessing object
+ # members.
+ def _write_data(bytes, flush=False, _buffer=self._buffer,
+ _write=self.write_stream.write, _update=self._hash.update):
+ _buffer[0].append(bytes)
+ _buffer[1] += len(bytes)
+ # buffer cap
+ if _buffer[1] > self._cache_limit or flush:
+ bytes = ''.join(_buffer[0])
+ _write(bytes)
+ _update(bytes)
+ _buffer[:] = [[], 0]
+ # expose this on self, for the occasion when clients want to add data.
+ self._write_data = _write_data
+ # a pack writer object to serialise pack records.
+ self._writer = pack.ContainerWriter(self._write_data)
+ self._writer.begin()
+ # what state is the pack in? (open, finished, aborted)
+ self._state = 'open'
+ # no name until we finish writing the content
+ self.name = None
+
+ def _check_references(self):
+ """Make sure our external references are present.
+
+ Packs are allowed to have deltas whose base is not in the pack, but it
+ must be present somewhere in this collection. It is not allowed to
+ have deltas based on a fallback repository.
+ (See <https://bugs.launchpad.net/bzr/+bug/288751>)
+ """
+ # Groupcompress packs don't have any external references; arguably CHK
+ # pages have external references, but we cannot 'cheaply' determine
+ # them without actually walking all of the chk pages.
+
+
+class ResumedGCPack(ResumedPack):
+
+ def _check_references(self):
+ """Make sure our external compression parents are present."""
+ # See GCPack._check_references for why this is empty
+
+ def _get_external_refs(self, index):
+ # GC repositories don't have compression parents external to a given
+ # pack file
+ return set()
+
+
+class GCCHKPacker(Packer):
+ """This class understands what it takes to collect a GCCHK repo."""
+
+ def __init__(self, pack_collection, packs, suffix, revision_ids=None,
+ reload_func=None):
+ super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
+ revision_ids=revision_ids,
+ reload_func=reload_func)
+ self._pack_collection = pack_collection
+ # ATM, we only support this for GCCHK repositories
+ if pack_collection.chk_index is None:
+ raise AssertionError('pack_collection.chk_index should not be None')
+ self._gather_text_refs = False
+ self._chk_id_roots = []
+ self._chk_p_id_roots = []
+ self._text_refs = None
+ # set by .pack() if self.revision_ids is not None
+ self.revision_keys = None
+
+ def _get_progress_stream(self, source_vf, keys, message, pb):
+ def pb_stream():
+ substream = source_vf.get_record_stream(keys, 'groupcompress', True)
+ for idx, record in enumerate(substream):
+ if pb is not None:
+ pb.update(message, idx + 1, len(keys))
+ yield record
+ return pb_stream()
+
+ def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
+ """Filter the texts of inventories, to find the chk pages."""
+ total_keys = len(keys)
+ def _filtered_inv_stream():
+ id_roots_set = set()
+ p_id_roots_set = set()
+ stream = source_vf.get_record_stream(keys, 'groupcompress', True)
+ for idx, record in enumerate(stream):
+ # Inventories should always be with revisions; assume success.
+ bytes = record.get_bytes_as('fulltext')
+ chk_inv = inventory.CHKInventory.deserialise(None, bytes,
+ record.key)
+ if pb is not None:
+ pb.update('inv', idx, total_keys)
+ key = chk_inv.id_to_entry.key()
+ if key not in id_roots_set:
+ self._chk_id_roots.append(key)
+ id_roots_set.add(key)
+ p_id_map = chk_inv.parent_id_basename_to_file_id
+ if p_id_map is None:
+ raise AssertionError('Parent id -> file_id map not set')
+ key = p_id_map.key()
+ if key not in p_id_roots_set:
+ p_id_roots_set.add(key)
+ self._chk_p_id_roots.append(key)
+ yield record
+ # We have finished processing all of the inventory records, we
+ # don't need these sets anymore
+ id_roots_set.clear()
+ p_id_roots_set.clear()
+ return _filtered_inv_stream()
+
+ def _get_chk_streams(self, source_vf, keys, pb=None):
+ # We want to stream the keys from 'id_roots', and things they
+ # reference, and then stream things from p_id_roots and things they
+ # reference, and then any remaining keys that we didn't get to.
+
+ # We also group referenced texts together, so if one root references a
+ # text with prefix 'a', and another root references a node with prefix
+ # 'a', we want to yield those nodes before we yield the nodes for 'b'
+ # This keeps 'similar' nodes together.
+
+ # Note: We probably actually want multiple streams here, to help the
+ # client understand that the different levels won't compress well
+ # against each other.
+ # Test the difference between using one Group per level, and
+ # using 1 Group per prefix. (so '' (root) would get a group, then
+ # all the references to search-key 'a' would get a group, etc.)
+ total_keys = len(keys)
+ remaining_keys = set(keys)
+ counter = [0]
+ if self._gather_text_refs:
+ self._text_refs = set()
+ def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
+ cur_keys = root_keys
+ while cur_keys:
+ keys_by_search_prefix = {}
+ remaining_keys.difference_update(cur_keys)
+ next_keys = set()
+ def handle_internal_node(node):
+ for prefix, value in node._items.iteritems():
+ # We don't want to request the same key twice, and we
+ # want to order it by the first time it is seen.
+ # Even further, we don't want to request a key which is
+ # not in this group of pack files (it should be in the
+ # repo, but it doesn't have to be in the group being
+ # packed.)
+ # TODO: consider how to treat externally referenced chk
+ # pages as 'external_references' so that we
+ # always fill them in for stacked branches
+ if value not in next_keys and value in remaining_keys:
+ keys_by_search_prefix.setdefault(prefix,
+ []).append(value)
+ next_keys.add(value)
+ def handle_leaf_node(node):
+ # Store is None, because we know we have a LeafNode, and we
+ # just want its entries
+ for file_id, bytes in node.iteritems(None):
+ self._text_refs.add(chk_map._bytes_to_text_key(bytes))
+ def next_stream():
+ stream = source_vf.get_record_stream(cur_keys,
+ 'as-requested', True)
+ for record in stream:
+ if record.storage_kind == 'absent':
+ # An absent CHK record: we assume that the missing
+ # record is in a different pack - e.g. a page not
+ # altered by the commit we're packing.
+ continue
+ bytes = record.get_bytes_as('fulltext')
+ # We don't care about search_key_func for this code,
+ # because we only care about external references.
+ node = chk_map._deserialise(bytes, record.key,
+ search_key_func=None)
+ common_base = node._search_prefix
+ if isinstance(node, chk_map.InternalNode):
+ handle_internal_node(node)
+ elif parse_leaf_nodes:
+ handle_leaf_node(node)
+ counter[0] += 1
+ if pb is not None:
+ pb.update('chk node', counter[0], total_keys)
+ yield record
+ yield next_stream()
+ # Double check that we won't be emitting any keys twice
+ # If we get rid of the pre-calculation of all keys, we could
+ # turn this around and do
+ # next_keys.difference_update(seen_keys)
+ # However, we also may have references to chk pages in another
+ # pack file during autopack. We filter earlier, so we should no
+ # longer need to do this
+ # next_keys = next_keys.intersection(remaining_keys)
+ cur_keys = []
+ for prefix in sorted(keys_by_search_prefix):
+ cur_keys.extend(keys_by_search_prefix.pop(prefix))
+ for stream in _get_referenced_stream(self._chk_id_roots,
+ self._gather_text_refs):
+ yield stream
+ del self._chk_id_roots
+ # while it isn't really possible for chk_id_roots to not be in the
+ # local group of packs, it is possible that the tree shape has not
+ # changed recently, so we need to filter _chk_p_id_roots by the
+ # available keys
+ chk_p_id_roots = [key for key in self._chk_p_id_roots
+ if key in remaining_keys]
+ del self._chk_p_id_roots
+ for stream in _get_referenced_stream(chk_p_id_roots, False):
+ yield stream
+ if remaining_keys:
+ trace.mutter('There were %d keys in the chk index, %d of which'
+ ' were not referenced', total_keys,
+ len(remaining_keys))
+ if self.revision_ids is None:
+ stream = source_vf.get_record_stream(remaining_keys,
+ 'unordered', True)
+ yield stream
+
+ def _build_vf(self, index_name, parents, delta, for_write=False):
+ """Build a VersionedFiles instance on top of this group of packs."""
+ index_name = index_name + '_index'
+ index_to_pack = {}
+ access = _DirectPackAccess(index_to_pack,
+ reload_func=self._reload_func)
+ if for_write:
+ # Use new_pack
+ if self.new_pack is None:
+ raise AssertionError('No new pack has been set')
+ index = getattr(self.new_pack, index_name)
+ index_to_pack[index] = self.new_pack.access_tuple()
+ index.set_optimize(for_size=True)
+ access.set_writer(self.new_pack._writer, index,
+ self.new_pack.access_tuple())
+ add_callback = index.add_nodes
+ else:
+ indices = []
+ for pack in self.packs:
+ sub_index = getattr(pack, index_name)
+ index_to_pack[sub_index] = pack.access_tuple()
+ indices.append(sub_index)
+ index = _mod_index.CombinedGraphIndex(indices)
+ add_callback = None
+ vf = GroupCompressVersionedFiles(
+ _GCGraphIndex(index,
+ add_callback=add_callback,
+ parents=parents,
+ is_locked=self._pack_collection.repo.is_locked),
+ access=access,
+ delta=delta)
+ return vf
+
+ def _build_vfs(self, index_name, parents, delta):
+ """Build the source and target VersionedFiles."""
+ source_vf = self._build_vf(index_name, parents,
+ delta, for_write=False)
+ target_vf = self._build_vf(index_name, parents,
+ delta, for_write=True)
+ return source_vf, target_vf
+
+ def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
+ pb_offset):
+ trace.mutter('repacking %d %s', len(keys), message)
+ self.pb.update('repacking %s' % (message,), pb_offset)
+ child_pb = ui.ui_factory.nested_progress_bar()
+ try:
+ stream = vf_to_stream(source_vf, keys, message, child_pb)
+ for _ in target_vf._insert_record_stream(stream,
+ random_id=True,
+ reuse_blocks=False):
+ pass
+ finally:
+ child_pb.finished()
+
+ def _copy_revision_texts(self):
+ source_vf, target_vf = self._build_vfs('revision', True, False)
+ if not self.revision_keys:
+ # We are doing a full fetch, aka 'pack'
+ self.revision_keys = source_vf.keys()
+ self._copy_stream(source_vf, target_vf, self.revision_keys,
+ 'revisions', self._get_progress_stream, 1)
+
+ def _copy_inventory_texts(self):
+ source_vf, target_vf = self._build_vfs('inventory', True, True)
+ # It is not sufficient to just use self.revision_keys, as stacked
+ # repositories can have more inventories than they have revisions.
+ # One alternative would be to do something with
+ # get_parent_map(self.revision_keys), but that shouldn't be any faster
+ # than this.
+ inventory_keys = source_vf.keys()
+ missing_inventories = set(self.revision_keys).difference(inventory_keys)
+ if missing_inventories:
+ # Go back to the original repo, to see if these are really missing
+ # https://bugs.launchpad.net/bzr/+bug/437003
+ # If we are packing a subset of the repo, it is fine to just have
+ # the data in another Pack file, which is not included in this pack
+ # operation.
+ inv_index = self._pack_collection.repo.inventories._index
+ pmap = inv_index.get_parent_map(missing_inventories)
+ really_missing = missing_inventories.difference(pmap)
+ if really_missing:
+ missing_inventories = sorted(really_missing)
+ raise ValueError('We are missing inventories for revisions: %s'
+ % (missing_inventories,))
+ self._copy_stream(source_vf, target_vf, inventory_keys,
+ 'inventories', self._get_filtered_inv_stream, 2)
+
+ def _get_chk_vfs_for_copy(self):
+ return self._build_vfs('chk', False, False)
+
+ def _copy_chk_texts(self):
+ source_vf, target_vf = self._get_chk_vfs_for_copy()
+ # TODO: This is technically spurious... if it is a performance issue,
+ # remove it
+ total_keys = source_vf.keys()
+ trace.mutter('repacking chk: %d id_to_entry roots,'
+ ' %d p_id_map roots, %d total keys',
+ len(self._chk_id_roots), len(self._chk_p_id_roots),
+ len(total_keys))
+ self.pb.update('repacking chk', 3)
+ child_pb = ui.ui_factory.nested_progress_bar()
+ try:
+ for stream in self._get_chk_streams(source_vf, total_keys,
+ pb=child_pb):
+ for _ in target_vf._insert_record_stream(stream,
+ random_id=True,
+ reuse_blocks=False):
+ pass
+ finally:
+ child_pb.finished()
+
+ def _copy_text_texts(self):
+ source_vf, target_vf = self._build_vfs('text', True, True)
+ # XXX: We don't walk the chk map to determine referenced (file_id,
+ # revision_id) keys. We don't do it yet because you really need
+ # to filter out the ones that are present in the parents of the
+ # rev just before the ones you are copying, otherwise the filter
+ # is grabbing too many keys...
+ text_keys = source_vf.keys()
+ self._copy_stream(source_vf, target_vf, text_keys,
+ 'texts', self._get_progress_stream, 4)
+
+ def _copy_signature_texts(self):
+ source_vf, target_vf = self._build_vfs('signature', False, False)
+ signature_keys = source_vf.keys()
+ signature_keys = signature_keys.intersection(self.revision_keys)
+ self._copy_stream(source_vf, target_vf, signature_keys,
+ 'signatures', self._get_progress_stream, 5)
+
+ def _create_pack_from_packs(self):
+ self.pb.update('repacking', 0, 7)
+ self.new_pack = self.open_pack()
+ # Is this necessary for GC?
+ self.new_pack.set_write_cache_size(1024*1024)
+ self._copy_revision_texts()
+ self._copy_inventory_texts()
+ self._copy_chk_texts()
+ self._copy_text_texts()
+ self._copy_signature_texts()
+ self.new_pack._check_references()
+ if not self._use_pack(self.new_pack):
+ self.new_pack.abort()
+ return None
+ self.new_pack.finish_content()
+ if len(self.packs) == 1:
+ old_pack = self.packs[0]
+ if old_pack.name == self.new_pack._hash.hexdigest():
+ # The single old pack was already optimally packed.
+ trace.mutter('single pack %s was already optimally packed',
+ old_pack.name)
+ self.new_pack.abort()
+ return None
+ self.pb.update('finishing repack', 6, 7)
+ self.new_pack.finish()
+ self._pack_collection.allocate(self.new_pack)
+ return self.new_pack
+
+
+class GCCHKReconcilePacker(GCCHKPacker):
+ """A packer which regenerates indices etc as it copies.
+
+ This is used by ``bzr reconcile`` to cause parent text pointers to be
+ regenerated.
+ """
+
+ def __init__(self, *args, **kwargs):
+ super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
+ self._data_changed = False
+ self._gather_text_refs = True
+
+ def _copy_inventory_texts(self):
+ source_vf, target_vf = self._build_vfs('inventory', True, True)
+ self._copy_stream(source_vf, target_vf, self.revision_keys,
+ 'inventories', self._get_filtered_inv_stream, 2)
+ if source_vf.keys() != self.revision_keys:
+ self._data_changed = True
+
+ def _copy_text_texts(self):
+ """generate what texts we should have and then copy."""
+ source_vf, target_vf = self._build_vfs('text', True, True)
+ trace.mutter('repacking %d texts', len(self._text_refs))
+ self.pb.update("repacking texts", 4)
+ # we have three major tasks here:
+ # 1) generate the ideal index
+ repo = self._pack_collection.repo
+ # We want the one we just wrote, so base it on self.new_pack
+ revision_vf = self._build_vf('revision', True, False, for_write=True)
+ ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
+ # Strip keys back into revision_ids.
+ ancestors = dict((k[0], tuple([p[0] for p in parents]))
+ for k, parents in ancestor_keys.iteritems())
+ del ancestor_keys
+ # TODO: _generate_text_key_index should be much cheaper to generate from
+ # a chk repository, rather than the current implementation
+ ideal_index = repo._generate_text_key_index(None, ancestors)
+ file_id_parent_map = source_vf.get_parent_map(self._text_refs)
+ # 2) generate a keys list that contains all the entries that can
+ # be used as-is, with corrected parents.
+ ok_keys = []
+ new_parent_keys = {} # (key, parent_keys)
+ discarded_keys = []
+ NULL_REVISION = _mod_revision.NULL_REVISION
+ for key in self._text_refs:
+ # 0 - index
+ # 1 - key
+ # 2 - value
+ # 3 - refs
+ try:
+ ideal_parents = tuple(ideal_index[key])
+ except KeyError:
+ discarded_keys.append(key)
+ self._data_changed = True
+ else:
+ if ideal_parents == (NULL_REVISION,):
+ ideal_parents = ()
+ source_parents = file_id_parent_map[key]
+ if ideal_parents == source_parents:
+ # no change needed.
+ ok_keys.append(key)
+ else:
+ # We need to change the parent graph, but we don't need to
+ # re-insert the text (since we don't pun the compression
+ # parent with the parents list)
+ self._data_changed = True
+ new_parent_keys[key] = ideal_parents
+ # we're finished with some data.
+ del ideal_index
+ del file_id_parent_map
+ # 3) bulk copy the data, updating records that need it
+ def _update_parents_for_texts():
+ stream = source_vf.get_record_stream(self._text_refs,
+ 'groupcompress', False)
+ for record in stream:
+ if record.key in new_parent_keys:
+ record.parents = new_parent_keys[record.key]
+ yield record
+ target_vf.insert_record_stream(_update_parents_for_texts())
+
+ def _use_pack(self, new_pack):
+ """Override _use_pack to check for reconcile having changed content."""
+ return new_pack.data_inserted() and self._data_changed
+
+
+class GCCHKCanonicalizingPacker(GCCHKPacker):
+ """A packer that ensures inventories have canonical-form CHK maps.
+
+ Ideally this would be part of reconcile, but it's very slow and rarely
+ needed. (It repairs repositories affected by
+ https://bugs.launchpad.net/bzr/+bug/522637).
+ """
+
+ def __init__(self, *args, **kwargs):
+ super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
+ self._data_changed = False
+
+ def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset):
+ """Create and exhaust a stream, but don't insert it.
+
+ This is useful to get the side-effects of generating a stream.
+ """
+ self.pb.update('scanning %s' % (message,), pb_offset)
+ child_pb = ui.ui_factory.nested_progress_bar()
+ try:
+ list(vf_to_stream(source_vf, keys, message, child_pb))
+ finally:
+ child_pb.finished()
+
+ def _copy_inventory_texts(self):
+ source_vf, target_vf = self._build_vfs('inventory', True, True)
+ source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy()
+ inventory_keys = source_vf.keys()
+ # First, copy the existing CHKs on the assumption that most of them
+ # will be correct. This will save us from having to reinsert (and
+ # recompress) these records later at the cost of perhaps preserving a
+ # few unused CHKs.
+ # (Iterate but don't insert _get_filtered_inv_stream to populate the
+ # variables needed by GCCHKPacker._copy_chk_texts.)
+ self._exhaust_stream(source_vf, inventory_keys, 'inventories',
+ self._get_filtered_inv_stream, 2)
+ GCCHKPacker._copy_chk_texts(self)
+ # Now copy and fix the inventories, and any regenerated CHKs.
+ def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
+ return self._get_filtered_canonicalizing_inv_stream(
+ source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
+ self._copy_stream(source_vf, target_vf, inventory_keys,
+ 'inventories', chk_canonicalizing_inv_stream, 4)
+
+ def _copy_chk_texts(self):
+ # No-op; in this class this happens during _copy_inventory_texts.
+ pass
+
+ def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
+ pb=None, source_chk_vf=None, target_chk_vf=None):
+ """Filter the texts of inventories, regenerating CHKs to make sure they
+ are canonical.
+ """
+ total_keys = len(keys)
+ target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)
+ def _filtered_inv_stream():
+ stream = source_vf.get_record_stream(keys, 'groupcompress', True)
+ search_key_name = None
+ for idx, record in enumerate(stream):
+ # Inventories should always be with revisions; assume success.
+ bytes = record.get_bytes_as('fulltext')
+ chk_inv = inventory.CHKInventory.deserialise(
+ source_chk_vf, bytes, record.key)
+ if pb is not None:
+ pb.update('inv', idx, total_keys)
+ chk_inv.id_to_entry._ensure_root()
+ if search_key_name is None:
+ # Find the name corresponding to the search_key_func
+ search_key_reg = chk_map.search_key_registry
+ for search_key_name, func in search_key_reg.iteritems():
+ if func == chk_inv.id_to_entry._search_key_func:
+ break
+ canonical_inv = inventory.CHKInventory.from_inventory(
+ target_chk_vf, chk_inv,
+ maximum_size=chk_inv.id_to_entry._root_node._maximum_size,
+ search_key_name=search_key_name)
+ if chk_inv.id_to_entry.key() != canonical_inv.id_to_entry.key():
+ trace.mutter(
+ 'Non-canonical CHK map for id_to_entry of inv: %s '
+ '(root is %s, should be %s)' % (chk_inv.revision_id,
+ chk_inv.id_to_entry.key()[0],
+ canonical_inv.id_to_entry.key()[0]))
+ self._data_changed = True
+ p_id_map = chk_inv.parent_id_basename_to_file_id
+ p_id_map._ensure_root()
+ canon_p_id_map = canonical_inv.parent_id_basename_to_file_id
+ if p_id_map.key() != canon_p_id_map.key():
+ trace.mutter(
+ 'Non-canonical CHK map for parent_id_to_basename of '
+ 'inv: %s (root is %s, should be %s)'
+ % (chk_inv.revision_id, p_id_map.key()[0],
+ canon_p_id_map.key()[0]))
+ self._data_changed = True
+ yield versionedfile.ChunkedContentFactory(record.key,
+ record.parents, record.sha1,
+ canonical_inv.to_lines())
+ # We have finished processing all of the inventory records, we
+ # don't need these sets anymore
+ return _filtered_inv_stream()
+
+ def _use_pack(self, new_pack):
+ """Override _use_pack to check for reconcile having changed content."""
+ return new_pack.data_inserted() and self._data_changed
+
+
+class GCRepositoryPackCollection(RepositoryPackCollection):
+
+ pack_factory = GCPack
+ resumed_pack_factory = ResumedGCPack
+ normal_packer_class = GCCHKPacker
+ optimising_packer_class = GCCHKPacker
+
+ def _check_new_inventories(self):
+ """Detect missing inventories or chk root entries for the new revisions
+ in this write group.
+
+ :returns: list of strs, summarising any problems found. If the list is
+ empty no problems were found.
+ """
+ # Ensure that all revisions added in this write group have:
+ # - corresponding inventories,
+ # - chk root entries for those inventories,
+ # - and any present parent inventories have their chk root
+ # entries too.
+ # And all this should be independent of any fallback repository.
+ problems = []
+ key_deps = self.repo.revisions._index._key_dependencies
+ new_revisions_keys = key_deps.get_new_keys()
+ no_fallback_inv_index = self.repo.inventories._index
+ no_fallback_chk_bytes_index = self.repo.chk_bytes._index
+ no_fallback_texts_index = self.repo.texts._index
+ inv_parent_map = no_fallback_inv_index.get_parent_map(
+ new_revisions_keys)
+ # Are any inventories corresponding to the new revisions missing?
+ corresponding_invs = set(inv_parent_map)
+ missing_corresponding = set(new_revisions_keys)
+ missing_corresponding.difference_update(corresponding_invs)
+ if missing_corresponding:
+ problems.append("inventories missing for revisions %s" %
+ (sorted(missing_corresponding),))
+ return problems
+ # Are any chk root entries missing for any inventories? This includes
+ # any present parent inventories, which may be used when calculating
+ # deltas for streaming.
+ all_inv_keys = set(corresponding_invs)
+ for parent_inv_keys in inv_parent_map.itervalues():
+ all_inv_keys.update(parent_inv_keys)
+ # Filter out ghost parents.
+ all_inv_keys.intersection_update(
+ no_fallback_inv_index.get_parent_map(all_inv_keys))
+ parent_invs_only_keys = all_inv_keys.symmetric_difference(
+ corresponding_invs)
+ all_missing = set()
+ inv_ids = [key[-1] for key in all_inv_keys]
+ parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys]
+ root_key_info = _build_interesting_key_sets(
+ self.repo, inv_ids, parent_invs_only_ids)
+ expected_chk_roots = root_key_info.all_keys()
+ present_chk_roots = no_fallback_chk_bytes_index.get_parent_map(
+ expected_chk_roots)
+ missing_chk_roots = expected_chk_roots.difference(present_chk_roots)
+ if missing_chk_roots:
+ problems.append("missing referenced chk root keys: %s"
+ % (sorted(missing_chk_roots),))
+ # Don't bother checking any further.
+ return problems
+ # Find all interesting chk_bytes records, and make sure they are
+ # present, as well as the text keys they reference.
+ chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
+ chk_bytes_no_fallbacks._search_key_func = \
+ self.repo.chk_bytes._search_key_func
+ chk_diff = chk_map.iter_interesting_nodes(
+ chk_bytes_no_fallbacks, root_key_info.interesting_root_keys,
+ root_key_info.uninteresting_root_keys)
+ text_keys = set()
+ try:
+ for record in _filter_text_keys(chk_diff, text_keys,
+ chk_map._bytes_to_text_key):
+ pass
+ except errors.NoSuchRevision, e:
+ # XXX: It would be nice if we could give a more precise error here.
+ problems.append("missing chk node(s) for id_to_entry maps")
+ chk_diff = chk_map.iter_interesting_nodes(
+ chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys,
+ root_key_info.uninteresting_pid_root_keys)
+ try:
+ for interesting_rec, interesting_map in chk_diff:
+ pass
+ except errors.NoSuchRevision, e:
+ problems.append(
+ "missing chk node(s) for parent_id_basename_to_file_id maps")
+ present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
+ missing_text_keys = text_keys.difference(present_text_keys)
+ if missing_text_keys:
+ problems.append("missing text keys: %r"
+ % (sorted(missing_text_keys),))
+ return problems
+
+
+class CHKInventoryRepository(PackRepository):
+ """Subclass of PackRepository that uses CHK-based inventories."""
+
+ def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
+ _serializer):
+ """Overridden to change pack collection class."""
+ super(CHKInventoryRepository, self).__init__(_format, a_bzrdir,
+ control_files, _commit_builder_class, _serializer)
+ index_transport = self._transport.clone('indices')
+ self._pack_collection = GCRepositoryPackCollection(self,
+ self._transport, index_transport,
+ self._transport.clone('upload'),
+ self._transport.clone('packs'),
+ _format.index_builder_class,
+ _format.index_class,
+ use_chk_index=self._format.supports_chks,
+ )
+ self.inventories = GroupCompressVersionedFiles(
+ _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
+ add_callback=self._pack_collection.inventory_index.add_callback,
+ parents=True, is_locked=self.is_locked,
+ inconsistency_fatal=False),
+ access=self._pack_collection.inventory_index.data_access)
+ self.revisions = GroupCompressVersionedFiles(
+ _GCGraphIndex(self._pack_collection.revision_index.combined_index,
+ add_callback=self._pack_collection.revision_index.add_callback,
+ parents=True, is_locked=self.is_locked,
+ track_external_parent_refs=True, track_new_keys=True),
+ access=self._pack_collection.revision_index.data_access,
+ delta=False)
+ self.signatures = GroupCompressVersionedFiles(
+ _GCGraphIndex(self._pack_collection.signature_index.combined_index,
+ add_callback=self._pack_collection.signature_index.add_callback,
+ parents=False, is_locked=self.is_locked,
+ inconsistency_fatal=False),
+ access=self._pack_collection.signature_index.data_access,
+ delta=False)
+ self.texts = GroupCompressVersionedFiles(
+ _GCGraphIndex(self._pack_collection.text_index.combined_index,
+ add_callback=self._pack_collection.text_index.add_callback,
+ parents=True, is_locked=self.is_locked,
+ inconsistency_fatal=False),
+ access=self._pack_collection.text_index.data_access)
+ # No parents, individual CHK pages don't have specific ancestry
+ self.chk_bytes = GroupCompressVersionedFiles(
+ _GCGraphIndex(self._pack_collection.chk_index.combined_index,
+ add_callback=self._pack_collection.chk_index.add_callback,
+ parents=False, is_locked=self.is_locked,
+ inconsistency_fatal=False),
+ access=self._pack_collection.chk_index.data_access)
+ search_key_name = self._format._serializer.search_key_name
+ search_key_func = chk_map.search_key_registry.get(search_key_name)
+ self.chk_bytes._search_key_func = search_key_func
+ # True when the repository object is 'write locked' (as opposed to the
+ # physical lock only taken out around changes to the pack-names list.)
+ # Another way to represent this would be a decorator around the control
+ # files object that presents logical locks as physical ones - if this
+ # gets ugly consider that alternative design. RBC 20071011
+ self._write_lock_count = 0
+ self._transaction = None
+ # for tests
+ self._reconcile_does_inventory_gc = True
+ self._reconcile_fixes_text_parents = True
+ self._reconcile_backsup_inventory = False
+
+ def _add_inventory_checked(self, revision_id, inv, parents):
+ """Add inv to the repository after checking the inputs.
+
+ This function can be overridden to allow different inventory styles.
+
+ :seealso: add_inventory, for the contract.
+ """
+ # make inventory
+ serializer = self._format._serializer
+ result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
+ maximum_size=serializer.maximum_size,
+ search_key_name=serializer.search_key_name)
+ inv_lines = result.to_lines()
+ return self._inventory_add_lines(revision_id, parents,
+ inv_lines, check_content=False)
+
+ def _create_inv_from_null(self, delta, revision_id):
+ """This will mutate new_inv directly.
+
+ This is a simplified form of create_by_apply_delta which knows that all
+ the old values must be None, so everything is a create.
+ """
+ serializer = self._format._serializer
+ new_inv = inventory.CHKInventory(serializer.search_key_name)
+ new_inv.revision_id = revision_id
+ entry_to_bytes = new_inv._entry_to_bytes
+ id_to_entry_dict = {}
+ parent_id_basename_dict = {}
+ for old_path, new_path, file_id, entry in delta:
+ if old_path is not None:
+ raise ValueError('Invalid delta, somebody tried to delete %r'
+ ' from the NULL_REVISION'
+ % ((old_path, file_id),))
+ if new_path is None:
+ raise ValueError('Invalid delta, delta from NULL_REVISION has'
+ ' no new_path %r' % (file_id,))
+ if new_path == '':
+ new_inv.root_id = file_id
+ parent_id_basename_key = StaticTuple('', '').intern()
+ else:
+ utf8_entry_name = entry.name.encode('utf-8')
+ parent_id_basename_key = StaticTuple(entry.parent_id,
+ utf8_entry_name).intern()
+ new_value = entry_to_bytes(entry)
+ # Populate Caches?
+ # new_inv._path_to_fileid_cache[new_path] = file_id
+ key = StaticTuple(file_id).intern()
+ id_to_entry_dict[key] = new_value
+ parent_id_basename_dict[parent_id_basename_key] = file_id
+
+ new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
+ parent_id_basename_dict, maximum_size=serializer.maximum_size)
+ return new_inv
+
+ def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
+ parents, basis_inv=None, propagate_caches=False):
+ """Add a new inventory expressed as a delta against another revision.
+
+ :param basis_revision_id: The inventory id the delta was created
+ against.
+ :param delta: The inventory delta (see Inventory.apply_delta for
+ details).
+ :param new_revision_id: The revision id that the inventory is being
+ added for.
+ :param parents: The revision ids of the parents that revision_id is
+ known to have and are in the repository already. These are supplied
+ for repositories that depend on the inventory graph for revision
+ graph access, as well as for those that pun ancestry with delta
+ compression.
+ :param basis_inv: The basis inventory if it is already known,
+ otherwise None.
+ :param propagate_caches: If True, the caches for this inventory are
+ copied to and updated for the result if possible.
+
+ :returns: (validator, new_inv)
+ The validator (which is a sha1 digest, though what is sha'd is
+ repository format specific) of the serialized inventory, and the
+ resulting inventory.
+ """
+ if not self.is_in_write_group():
+ raise AssertionError("%r not in write group" % (self,))
+ _mod_revision.check_not_reserved_id(new_revision_id)
+ basis_tree = None
+ if basis_inv is None:
+ if basis_revision_id == _mod_revision.NULL_REVISION:
+ new_inv = self._create_inv_from_null(delta, new_revision_id)
+ if new_inv.root_id is None:
+ raise errors.RootMissing()
+ inv_lines = new_inv.to_lines()
+ return self._inventory_add_lines(new_revision_id, parents,
+ inv_lines, check_content=False), new_inv
+ else:
+ basis_tree = self.revision_tree(basis_revision_id)
+ basis_tree.lock_read()
+ basis_inv = basis_tree.root_inventory
+ try:
+ result = basis_inv.create_by_apply_delta(delta, new_revision_id,
+ propagate_caches=propagate_caches)
+ inv_lines = result.to_lines()
+ return self._inventory_add_lines(new_revision_id, parents,
+ inv_lines, check_content=False), result
+ finally:
+ if basis_tree is not None:
+ basis_tree.unlock()
+
+ def _deserialise_inventory(self, revision_id, bytes):
+ return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,
+ (revision_id,))
+
+ def _iter_inventories(self, revision_ids, ordering):
+ """Iterate over many inventory objects."""
+ if ordering is None:
+ ordering = 'unordered'
+ keys = [(revision_id,) for revision_id in revision_ids]
+ stream = self.inventories.get_record_stream(keys, ordering, True)
+ texts = {}
+ for record in stream:
+ if record.storage_kind != 'absent':
+ texts[record.key] = record.get_bytes_as('fulltext')
+ else:
+ texts[record.key] = None
+ for key in keys:
+ bytes = texts[key]
+ if bytes is None:
+ yield (None, key[-1])
+ else:
+ yield (inventory.CHKInventory.deserialise(
+ self.chk_bytes, bytes, key), key[-1])
+
+ def _get_inventory_xml(self, revision_id):
+ """Get serialized inventory as a string."""
+ # Without a native 'xml' inventory, this method doesn't make sense.
+ # However, older working trees and older bundles want it - so we supply
+ # it, allowing _get_inventory_xml to work. Bundles currently use the
+ # serializer directly; this also isn't ideal, but there isn't an xml
+ # iteration interface offered at all for repositories.
+ return self._serializer.write_inventory_to_string(
+ self.get_inventory(revision_id))
+
+ def _find_present_inventory_keys(self, revision_keys):
+ parent_map = self.inventories.get_parent_map(revision_keys)
+ present_inventory_keys = set(k for k in parent_map)
+ return present_inventory_keys
+
+ def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
+ """Find the file ids and versions affected by revisions.
+
+ :param revision_ids: an iterable containing revision ids.
+ :param _inv_weave: The inventory weave from this repository or None.
+ If None, the inventory weave will be opened automatically.
+ :return: a dictionary mapping altered file-ids to an iterable of
+ revision_ids. Each altered file-id has the exact revision_ids that
+ altered it listed explicitly.
+ """
+ rich_root = self.supports_rich_root()
+ bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
+ file_id_revisions = {}
+ pb = ui.ui_factory.nested_progress_bar()
+ try:
+ revision_keys = [(r,) for r in revision_ids]
+ parent_keys = self._find_parent_keys_of_revisions(revision_keys)
+ # TODO: instead of using _find_present_inventory_keys, change the
+ # code paths to allow missing inventories to be tolerated.
+ # However, we only want to tolerate missing parent
+ # inventories, not missing inventories for revision_ids
+ present_parent_inv_keys = self._find_present_inventory_keys(
+ parent_keys)
+ present_parent_inv_ids = set(
+ [k[-1] for k in present_parent_inv_keys])
+ inventories_to_read = set(revision_ids)
+ inventories_to_read.update(present_parent_inv_ids)
+ root_key_info = _build_interesting_key_sets(
+ self, inventories_to_read, present_parent_inv_ids)
+ interesting_root_keys = root_key_info.interesting_root_keys
+ uninteresting_root_keys = root_key_info.uninteresting_root_keys
+ chk_bytes = self.chk_bytes
+ for record, items in chk_map.iter_interesting_nodes(chk_bytes,
+ interesting_root_keys, uninteresting_root_keys,
+ pb=pb):
+ for name, bytes in items:
+ (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
+ # TODO: consider interning file_id, revision_id here, or
+ # pushing that intern() into bytes_to_info()
+ # TODO: rich_root should always be True here, for all
+ # repositories that support chk_bytes
+ if not rich_root and name_utf8 == '':
+ continue
+ try:
+ file_id_revisions[file_id].add(revision_id)
+ except KeyError:
+ file_id_revisions[file_id] = set([revision_id])
+ finally:
+ pb.finished()
+ return file_id_revisions
+
+ def find_text_key_references(self):
+ """Find the text key references within the repository.
+
+ :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
+ to whether they were referred to by the inventory of the
+ revision_id that they contain. The inventory texts from all present
+ revision ids are assessed to generate this report.
+ """
+ # XXX: Slow version but correct: rewrite as a series of delta
+ # examinations/direct tree traversal. Note that that will require care
+ # as a common node is reachable both from the inventory that added it,
+ # and others afterwards.
+ revision_keys = self.revisions.keys()
+ result = {}
+ rich_roots = self.supports_rich_root()
+ pb = ui.ui_factory.nested_progress_bar()
+ try:
+ all_revs = self.all_revision_ids()
+ total = len(all_revs)
+ for pos, inv in enumerate(self.iter_inventories(all_revs)):
+ pb.update("Finding text references", pos, total)
+ for _, entry in inv.iter_entries():
+ if not rich_roots and entry.file_id == inv.root_id:
+ continue
+ key = (entry.file_id, entry.revision)
+ result.setdefault(key, False)
+ if entry.revision == inv.revision_id:
+ result[key] = True
+ return result
+ finally:
+ pb.finished()
+
+ @needs_write_lock
+ def reconcile_canonicalize_chks(self):
+ """Reconcile this repository to make sure all CHKs are in canonical
+ form.
+ """
+ from bzrlib.reconcile import PackReconciler
+ reconciler = PackReconciler(self, thorough=True, canonicalize_chks=True)
+ reconciler.reconcile()
+ return reconciler
+
+ def _reconcile_pack(self, collection, packs, extension, revs, pb):
+ packer = GCCHKReconcilePacker(collection, packs, extension)
+ return packer.pack(pb)
+
+ def _canonicalize_chks_pack(self, collection, packs, extension, revs, pb):
+ packer = GCCHKCanonicalizingPacker(collection, packs, extension, revs)
+ return packer.pack(pb)
+
+ def _get_source(self, to_format):
+ """Return a source for streaming from this repository."""
+ if self._format._serializer == to_format._serializer:
+ # We must be exactly the same format, otherwise stuff like the chk
+ # page layout might be different.
+ # Actually, this test is just slightly looser than exact so that
+ # CHK2 <-> 2a transfers will work.
+ return GroupCHKStreamSource(self, to_format)
+ return super(CHKInventoryRepository, self)._get_source(to_format)
+
+ def _find_inconsistent_revision_parents(self, revisions_iterator=None):
+ """Find revisions with different parent lists in the revision object
+ and in the index graph.
+
+ :param revisions_iterator: None, or an iterator of (revid,
+ Revision-or-None). This iterator controls the revisions checked.
+ :returns: an iterator yielding tuples of (revision-id, parents-in-index,
+ parents-in-revision).
+ """
+ if not self.is_locked():
+ raise AssertionError()
+ vf = self.revisions
+ if revisions_iterator is None:
+ revisions_iterator = self._iter_revisions(None)
+ for revid, revision in revisions_iterator:
+ if revision is None:
+ continue
+ parent_map = vf.get_parent_map([(revid,)])
+ parents_according_to_index = tuple(parent[-1] for parent in
+ parent_map[(revid,)])
+ parents_according_to_revision = tuple(revision.parent_ids)
+ if parents_according_to_index != parents_according_to_revision:
+ yield (revid, parents_according_to_index,
+ parents_according_to_revision)
+
+ def _check_for_inconsistent_revision_parents(self):
+ inconsistencies = list(self._find_inconsistent_revision_parents())
+ if inconsistencies:
+ raise errors.BzrCheckError(
+ "Revision index has inconsistent parents.")
+
+
+class GroupCHKStreamSource(StreamSource):
+ """Used when both the source and target repo are GroupCHK repos."""
+
+ def __init__(self, from_repository, to_format):
+ """Create a StreamSource streaming from from_repository."""
+ super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
+ self._revision_keys = None
+ self._text_keys = None
+ self._text_fetch_order = 'groupcompress'
+ self._chk_id_roots = None
+ self._chk_p_id_roots = None
+
+ def _get_inventory_stream(self, inventory_keys, allow_absent=False):
+ """Get a stream of inventory texts.
+
+ When this function returns, self._chk_id_roots and self._chk_p_id_roots
+ should be populated.
+ """
+ self._chk_id_roots = []
+ self._chk_p_id_roots = []
+ def _filtered_inv_stream():
+ id_roots_set = set()
+ p_id_roots_set = set()
+ source_vf = self.from_repository.inventories
+ stream = source_vf.get_record_stream(inventory_keys,
+ 'groupcompress', True)
+ for record in stream:
+ if record.storage_kind == 'absent':
+ if allow_absent:
+ continue
+ else:
+ raise errors.NoSuchRevision(self, record.key)
+ bytes = record.get_bytes_as('fulltext')
+ chk_inv = inventory.CHKInventory.deserialise(None, bytes,
+ record.key)
+ key = chk_inv.id_to_entry.key()
+ if key not in id_roots_set:
+ self._chk_id_roots.append(key)
+ id_roots_set.add(key)
+ p_id_map = chk_inv.parent_id_basename_to_file_id
+ if p_id_map is None:
+ raise AssertionError('Parent id -> file_id map not set')
+ key = p_id_map.key()
+ if key not in p_id_roots_set:
+ p_id_roots_set.add(key)
+ self._chk_p_id_roots.append(key)
+ yield record
+ # We have finished processing all of the inventory records, we
+ # don't need these sets anymore
+ id_roots_set.clear()
+ p_id_roots_set.clear()
+ return ('inventories', _filtered_inv_stream())
+
+ def _get_filtered_chk_streams(self, excluded_revision_keys):
+ self._text_keys = set()
+ excluded_revision_keys.discard(_mod_revision.NULL_REVISION)
+ if not excluded_revision_keys:
+ uninteresting_root_keys = set()
+ uninteresting_pid_root_keys = set()
+ else:
+ # filter out any excluded revisions whose inventories are not
+ # actually present
+ # TODO: Update Repository.iter_inventories() to add
+ # ignore_missing=True
+ present_keys = self.from_repository._find_present_inventory_keys(
+ excluded_revision_keys)
+ present_ids = [k[-1] for k in present_keys]
+ uninteresting_root_keys = set()
+ uninteresting_pid_root_keys = set()
+ for inv in self.from_repository.iter_inventories(present_ids):
+ uninteresting_root_keys.add(inv.id_to_entry.key())
+ uninteresting_pid_root_keys.add(
+ inv.parent_id_basename_to_file_id.key())
+ chk_bytes = self.from_repository.chk_bytes
+ def _filter_id_to_entry():
+ interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
+ self._chk_id_roots, uninteresting_root_keys)
+ for record in _filter_text_keys(interesting_nodes, self._text_keys,
+ chk_map._bytes_to_text_key):
+ if record is not None:
+ yield record
+ # Consumed
+ self._chk_id_roots = None
+ yield 'chk_bytes', _filter_id_to_entry()
+ def _get_parent_id_basename_to_file_id_pages():
+ for record, items in chk_map.iter_interesting_nodes(chk_bytes,
+ self._chk_p_id_roots, uninteresting_pid_root_keys):
+ if record is not None:
+ yield record
+ # Consumed
+ self._chk_p_id_roots = None
+ yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
+
+ def _get_text_stream(self):
+ # Note: We know we don't have to handle adding root keys, because both
+ # the source and target have the identical network name.
+ text_stream = self.from_repository.texts.get_record_stream(
+ self._text_keys, self._text_fetch_order, False)
+ return ('texts', text_stream)
+
+ def get_stream(self, search):
+ def wrap_and_count(pb, rc, stream):
+ """Yield records from stream while showing progress."""
+ count = 0
+ for record in stream:
+ if count == rc.STEP:
+ rc.increment(count)
+ pb.update('Estimate', rc.current, rc.max)
+ count = 0
+ count += 1
+ yield record
+
+ revision_ids = search.get_keys()
+ pb = ui.ui_factory.nested_progress_bar()
+ rc = self._record_counter
+ self._record_counter.setup(len(revision_ids))
+ for stream_info in self._fetch_revision_texts(revision_ids):
+ yield (stream_info[0],
+ wrap_and_count(pb, rc, stream_info[1]))
+ self._revision_keys = [(rev_id,) for rev_id in revision_ids]
+ # TODO: The keys to exclude might be part of the search recipe
+ # For now, exclude all parents that are at the edge of ancestry, for
+ # which we have inventories
+ from_repo = self.from_repository
+ parent_keys = from_repo._find_parent_keys_of_revisions(
+ self._revision_keys)
+ self.from_repository.revisions.clear_cache()
+ self.from_repository.signatures.clear_cache()
+ # Clear the repo's get_parent_map cache too.
+ self.from_repository._unstacked_provider.disable_cache()
+ self.from_repository._unstacked_provider.enable_cache()
+ s = self._get_inventory_stream(self._revision_keys)
+ yield (s[0], wrap_and_count(pb, rc, s[1]))
+ self.from_repository.inventories.clear_cache()
+ for stream_info in self._get_filtered_chk_streams(parent_keys):
+ yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
+ self.from_repository.chk_bytes.clear_cache()
+ s = self._get_text_stream()
+ yield (s[0], wrap_and_count(pb, rc, s[1]))
+ self.from_repository.texts.clear_cache()
+ pb.update('Done', rc.max, rc.max)
+ pb.finished()
+
+ def get_stream_for_missing_keys(self, missing_keys):
+ # missing keys can only occur when we are byte copying and not
+ # translating (because translation means we don't send
+ # unreconstructable deltas ever).
+ missing_inventory_keys = set()
+ for key in missing_keys:
+ if key[0] != 'inventories':
+ raise AssertionError('The only missing keys we should'
+ ' be filling in are inventory keys, not %s'
+ % (key[0],))
+ missing_inventory_keys.add(key[1:])
+ if self._chk_id_roots or self._chk_p_id_roots:
+ raise AssertionError('Cannot call get_stream_for_missing_keys'
+ ' until all of get_stream() has been consumed.')
+ # Yield the inventory stream, so we can find the chk stream
+ # Some of the missing_keys will be missing because they are ghosts.
+ # As such, we can ignore them. The Sink is required to verify there are
+ # no unavailable texts when the ghost inventories are not filled in.
+ yield self._get_inventory_stream(missing_inventory_keys,
+ allow_absent=True)
+ # We use the empty set for excluded_revision_keys, to make it clear
+ # that we want to transmit all referenced chk pages.
+ for stream_info in self._get_filtered_chk_streams(set()):
+ yield stream_info
+
+
+class _InterestingKeyInfo(object):
+ def __init__(self):
+ self.interesting_root_keys = set()
+ self.interesting_pid_root_keys = set()
+ self.uninteresting_root_keys = set()
+ self.uninteresting_pid_root_keys = set()
+
+ def all_interesting(self):
+ return self.interesting_root_keys.union(self.interesting_pid_root_keys)
+
+ def all_uninteresting(self):
+ return self.uninteresting_root_keys.union(
+ self.uninteresting_pid_root_keys)
+
+ def all_keys(self):
+ return self.all_interesting().union(self.all_uninteresting())
+
+
+def _build_interesting_key_sets(repo, inventory_ids, parent_only_inv_ids):
+ result = _InterestingKeyInfo()
+ for inv in repo.iter_inventories(inventory_ids, 'unordered'):
+ root_key = inv.id_to_entry.key()
+ pid_root_key = inv.parent_id_basename_to_file_id.key()
+ if inv.revision_id in parent_only_inv_ids:
+ result.uninteresting_root_keys.add(root_key)
+ result.uninteresting_pid_root_keys.add(pid_root_key)
+ else:
+ result.interesting_root_keys.add(root_key)
+ result.interesting_pid_root_keys.add(pid_root_key)
+ return result
+
+
+def _filter_text_keys(interesting_nodes_iterable, text_keys, bytes_to_text_key):
+ """Iterate the result of iter_interesting_nodes, yielding the records
+ and adding to text_keys.
+ """
+ text_keys_update = text_keys.update
+ for record, items in interesting_nodes_iterable:
+ text_keys_update([bytes_to_text_key(b) for n,b in items])
+ yield record
+
+
+class RepositoryFormat2a(RepositoryFormatPack):
+ """A CHK repository that uses the bencode revision serializer."""
+
+ repository_class = CHKInventoryRepository
+ supports_external_lookups = True
+ supports_chks = True
+ _commit_builder_class = PackRootCommitBuilder
+ rich_root_data = True
+ _serializer = chk_serializer.chk_bencode_serializer
+ _commit_inv_deltas = True
+ # What index classes to use
+ index_builder_class = BTreeBuilder
+ index_class = BTreeGraphIndex
+ # Note: We cannot unpack a delta that references a text we haven't
+ # seen yet. There are 2 options: work in fulltexts, or require
+ # topological sorting. Using fulltexts is more optimal for local
+ # operations, because the source can be smart about extracting
+ # multiple in-a-row (and sharing strings). Topological is better
+ # for remote, because we access less data.
+ _fetch_order = 'unordered'
+ _fetch_uses_deltas = False # essentially ignored by the groupcompress code.
+ fast_deltas = True
+ pack_compresses = True
+
+ def _get_matching_bzrdir(self):
+ return controldir.format_registry.make_bzrdir('2a')
+
+ def _ignore_setting_bzrdir(self, format):
+ pass
+
+ _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+
+ @classmethod
+ def get_format_string(cls):
+ return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n')
+
+ def get_format_description(self):
+ """See RepositoryFormat.get_format_description()."""
+ return ("Repository format 2a - rich roots, group compression"
+ " and chk inventories")
+
+
+class RepositoryFormat2aSubtree(RepositoryFormat2a):
+ """A 2a repository format that supports nested trees.
+
+ """
+
+ def _get_matching_bzrdir(self):
+ return controldir.format_registry.make_bzrdir('development-subtree')
+
+ def _ignore_setting_bzrdir(self, format):
+ pass
+
+ _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+
+ @classmethod
+ def get_format_string(cls):
+ return ('Bazaar development format 8\n')
+
+ def get_format_description(self):
+ """See RepositoryFormat.get_format_description()."""
+ return ("Development repository format 8 - nested trees, "
+ "group compression and chk inventories")
+
+ experimental = True
+ supports_tree_reference = True
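
A usage sketch (not from the commit) of the module-level helpers that close
groupcompress_repo.py, mirroring how _check_new_inventories uses them above;
repo, new_inv_ids and parent_only_inv_ids are hypothetical placeholders.

    from bzrlib import chk_map
    from bzrlib.repofmt.groupcompress_repo import (
        _build_interesting_key_sets,
        _filter_text_keys,
        )

    # Partition the CHK root keys of the inventories we care about from those
    # of parent-only inventories.
    root_key_info = _build_interesting_key_sets(
        repo, new_inv_ids, parent_only_inv_ids)
    chk_diff = chk_map.iter_interesting_nodes(
        repo.chk_bytes,
        root_key_info.interesting_root_keys,
        root_key_info.uninteresting_root_keys)
    # Drain the node stream; text_keys fills up with the (file_id, revision)
    # keys referenced by the interesting id_to_entry pages.
    text_keys = set()
    for record in _filter_text_keys(chk_diff, text_keys,
                                    chk_map._bytes_to_text_key):
        pass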
diff --git a/bzrlib/repofmt/knitpack_repo.py b/bzrlib/repofmt/knitpack_repo.py
new file mode 100644
index 0000000..15a819d
--- /dev/null
+++ b/bzrlib/repofmt/knitpack_repo.py
@@ -0,0 +1,1156 @@
+# Copyright (C) 2007-2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""Knit-based pack repository formats."""
+
+from __future__ import absolute_import
+
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
+from itertools import izip
+import time
+
+from bzrlib import (
+ controldir,
+ debug,
+ errors,
+ knit,
+ osutils,
+ pack,
+ revision as _mod_revision,
+ trace,
+ tsort,
+ ui,
+ xml5,
+ xml6,
+ xml7,
+ )
+from bzrlib.knit import (
+ _KnitGraphIndex,
+ KnitPlainFactory,
+ KnitVersionedFiles,
+ )
+""")
+
+from bzrlib import (
+ btree_index,
+ )
+from bzrlib.index import (
+ CombinedGraphIndex,
+ GraphIndex,
+ GraphIndexPrefixAdapter,
+ InMemoryGraphIndex,
+ )
+from bzrlib.repofmt.knitrepo import (
+ KnitRepository,
+ )
+from bzrlib.repofmt.pack_repo import (
+ _DirectPackAccess,
+ NewPack,
+ RepositoryFormatPack,
+ ResumedPack,
+ Packer,
+ PackCommitBuilder,
+ PackRepository,
+ PackRootCommitBuilder,
+ RepositoryPackCollection,
+ )
+from bzrlib.vf_repository import (
+ StreamSource,
+ )
+
+
+class KnitPackRepository(PackRepository, KnitRepository):
+
+ def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
+ _serializer):
+ PackRepository.__init__(self, _format, a_bzrdir, control_files,
+ _commit_builder_class, _serializer)
+ if self._format.supports_chks:
+ raise AssertionError("chk not supported")
+ index_transport = self._transport.clone('indices')
+ self._pack_collection = KnitRepositoryPackCollection(self,
+ self._transport,
+ index_transport,
+ self._transport.clone('upload'),
+ self._transport.clone('packs'),
+ _format.index_builder_class,
+ _format.index_class,
+ use_chk_index=False,
+ )
+ self.inventories = KnitVersionedFiles(
+ _KnitGraphIndex(self._pack_collection.inventory_index.combined_index,
+ add_callback=self._pack_collection.inventory_index.add_callback,
+ deltas=True, parents=True, is_locked=self.is_locked),
+ data_access=self._pack_collection.inventory_index.data_access,
+ max_delta_chain=200)
+ self.revisions = KnitVersionedFiles(
+ _KnitGraphIndex(self._pack_collection.revision_index.combined_index,
+ add_callback=self._pack_collection.revision_index.add_callback,
+ deltas=False, parents=True, is_locked=self.is_locked,
+ track_external_parent_refs=True),
+ data_access=self._pack_collection.revision_index.data_access,
+ max_delta_chain=0)
+ self.signatures = KnitVersionedFiles(
+ _KnitGraphIndex(self._pack_collection.signature_index.combined_index,
+ add_callback=self._pack_collection.signature_index.add_callback,
+ deltas=False, parents=False, is_locked=self.is_locked),
+ data_access=self._pack_collection.signature_index.data_access,
+ max_delta_chain=0)
+ self.texts = KnitVersionedFiles(
+ _KnitGraphIndex(self._pack_collection.text_index.combined_index,
+ add_callback=self._pack_collection.text_index.add_callback,
+ deltas=True, parents=True, is_locked=self.is_locked),
+ data_access=self._pack_collection.text_index.data_access,
+ max_delta_chain=200)
+ self.chk_bytes = None
+ # True when the repository object is 'write locked' (as opposed to the
+ # physical lock only taken out around changes to the pack-names list.)
+ # Another way to represent this would be a decorator around the control
+ # files object that presents logical locks as physical ones - if this
+ # gets ugly consider that alternative design. RBC 20071011
+ self._write_lock_count = 0
+ self._transaction = None
+ # for tests
+ self._reconcile_does_inventory_gc = True
+ self._reconcile_fixes_text_parents = True
+ self._reconcile_backsup_inventory = False
+
+ def _get_source(self, to_format):
+ if to_format.network_name() == self._format.network_name():
+ return KnitPackStreamSource(self, to_format)
+ return PackRepository._get_source(self, to_format)
+
+ def _reconcile_pack(self, collection, packs, extension, revs, pb):
+ packer = KnitReconcilePacker(collection, packs, extension, revs)
+ return packer.pack(pb)
+
+
+class RepositoryFormatKnitPack1(RepositoryFormatPack):
+ """A no-subtrees parameterized Pack repository.
+
+ This format was introduced in 0.92.
+ """
+
+ repository_class = KnitPackRepository
+ _commit_builder_class = PackCommitBuilder
+ @property
+ def _serializer(self):
+ return xml5.serializer_v5
+ # What index classes to use
+ index_builder_class = InMemoryGraphIndex
+ index_class = GraphIndex
+
+ def _get_matching_bzrdir(self):
+ return controldir.format_registry.make_bzrdir('pack-0.92')
+
+ def _ignore_setting_bzrdir(self, format):
+ pass
+
+ _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+
+ @classmethod
+ def get_format_string(cls):
+ """See RepositoryFormat.get_format_string()."""
+ return "Bazaar pack repository format 1 (needs bzr 0.92)\n"
+
+ def get_format_description(self):
+ """See RepositoryFormat.get_format_description()."""
+ return "Packs containing knits without subtree support"
+
+
+class RepositoryFormatKnitPack3(RepositoryFormatPack):
+ """A subtrees parameterized Pack repository.
+
+ This repository format uses the xml7 serializer to get:
+ - support for recording full info about the tree root
+ - support for recording tree-references
+
+ This format was introduced in 0.92.
+ """
+
+ repository_class = KnitPackRepository
+ _commit_builder_class = PackRootCommitBuilder
+ rich_root_data = True
+ experimental = True
+ supports_tree_reference = True
+ @property
+ def _serializer(self):
+ return xml7.serializer_v7
+ # What index classes to use
+ index_builder_class = InMemoryGraphIndex
+ index_class = GraphIndex
+
+ def _get_matching_bzrdir(self):
+ return controldir.format_registry.make_bzrdir(
+ 'pack-0.92-subtree')
+
+ def _ignore_setting_bzrdir(self, format):
+ pass
+
+ _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+
+ @classmethod
+ def get_format_string(cls):
+ """See RepositoryFormat.get_format_string()."""
+ return "Bazaar pack repository format 1 with subtree support (needs bzr 0.92)\n"
+
+ def get_format_description(self):
+ """See RepositoryFormat.get_format_description()."""
+ return "Packs containing knits with subtree support\n"
+
+
+class RepositoryFormatKnitPack4(RepositoryFormatPack):
+ """A rich-root, no subtrees parameterized Pack repository.
+
+ This repository format uses the xml6 serializer to get:
+ - support for recording full info about the tree root
+
+ This format was introduced in 1.0.
+ """
+
+ repository_class = KnitPackRepository
+ _commit_builder_class = PackRootCommitBuilder
+ rich_root_data = True
+ supports_tree_reference = False
+ @property
+ def _serializer(self):
+ return xml6.serializer_v6
+ # What index classes to use
+ index_builder_class = InMemoryGraphIndex
+ index_class = GraphIndex
+
+ def _get_matching_bzrdir(self):
+ return controldir.format_registry.make_bzrdir(
+ 'rich-root-pack')
+
+ def _ignore_setting_bzrdir(self, format):
+ pass
+
+ _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+
+ @classmethod
+ def get_format_string(cls):
+ """See RepositoryFormat.get_format_string()."""
+ return ("Bazaar pack repository format 1 with rich root"
+ " (needs bzr 1.0)\n")
+
+ def get_format_description(self):
+ """See RepositoryFormat.get_format_description()."""
+ return "Packs containing knits with rich root support\n"
+
+
+class RepositoryFormatKnitPack5(RepositoryFormatPack):
+ """Repository that supports external references to allow stacking.
+
+ New in release 1.6.
+
+ Supports external lookups, which results in non-truncated ghosts after
+ reconcile compared to pack-0.92 formats.
+ """
+
+ repository_class = KnitPackRepository
+ _commit_builder_class = PackCommitBuilder
+ supports_external_lookups = True
+ # What index classes to use
+ index_builder_class = InMemoryGraphIndex
+ index_class = GraphIndex
+
+ @property
+ def _serializer(self):
+ return xml5.serializer_v5
+
+ def _get_matching_bzrdir(self):
+ return controldir.format_registry.make_bzrdir('1.6')
+
+ def _ignore_setting_bzrdir(self, format):
+ pass
+
+ _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+
+ @classmethod
+ def get_format_string(cls):
+ """See RepositoryFormat.get_format_string()."""
+ return "Bazaar RepositoryFormatKnitPack5 (bzr 1.6)\n"
+
+ def get_format_description(self):
+ """See RepositoryFormat.get_format_description()."""
+ return "Packs 5 (adds stacking support, requires bzr 1.6)"
+
+
+class RepositoryFormatKnitPack5RichRoot(RepositoryFormatPack):
+ """A repository with rich roots and stacking.
+
+ New in release 1.6.1.
+
+ Supports stacking on other repositories, allowing data to be accessed
+ without being stored locally.
+ """
+
+ repository_class = KnitPackRepository
+ _commit_builder_class = PackRootCommitBuilder
+ rich_root_data = True
+ supports_tree_reference = False # no subtrees
+ supports_external_lookups = True
+ # What index classes to use
+ index_builder_class = InMemoryGraphIndex
+ index_class = GraphIndex
+
+ @property
+ def _serializer(self):
+ return xml6.serializer_v6
+
+ def _get_matching_bzrdir(self):
+ return controldir.format_registry.make_bzrdir(
+ '1.6.1-rich-root')
+
+ def _ignore_setting_bzrdir(self, format):
+ pass
+
+ _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+
+ @classmethod
+ def get_format_string(cls):
+ """See RepositoryFormat.get_format_string()."""
+ return "Bazaar RepositoryFormatKnitPack5RichRoot (bzr 1.6.1)\n"
+
+ def get_format_description(self):
+ return "Packs 5 rich-root (adds stacking support, requires bzr 1.6.1)"
+
+
+class RepositoryFormatKnitPack5RichRootBroken(RepositoryFormatPack):
+ """A repository with rich roots and external references.
+
+ New in release 1.6.
+
+ Supports external lookups, which results in non-truncated ghosts after
+ reconcile compared to pack-0.92 formats.
+
+ This format was deprecated because the serializer it uses accidentally
+ supported subtrees, when the format was not intended to. This meant that
+ someone could accidentally fetch from an incorrect repository.
+ """
+
+ repository_class = KnitPackRepository
+ _commit_builder_class = PackRootCommitBuilder
+ rich_root_data = True
+ supports_tree_reference = False # no subtrees
+
+ supports_external_lookups = True
+ # What index classes to use
+ index_builder_class = InMemoryGraphIndex
+ index_class = GraphIndex
+
+ @property
+ def _serializer(self):
+ return xml7.serializer_v7
+
+ def _get_matching_bzrdir(self):
+ matching = controldir.format_registry.make_bzrdir(
+ '1.6.1-rich-root')
+ matching.repository_format = self
+ return matching
+
+ def _ignore_setting_bzrdir(self, format):
+ pass
+
+ _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+
+ @classmethod
+ def get_format_string(cls):
+ """See RepositoryFormat.get_format_string()."""
+ return "Bazaar RepositoryFormatKnitPack5RichRoot (bzr 1.6)\n"
+
+ def get_format_description(self):
+ return ("Packs 5 rich-root (adds stacking support, requires bzr 1.6)"
+ " (deprecated)")
+
+ def is_deprecated(self):
+ return True
+
+
+class RepositoryFormatKnitPack6(RepositoryFormatPack):
+ """A repository with stacking and btree indexes,
+ without rich roots or subtrees.
+
+ This is equivalent to pack-1.6 with B+Tree indices.
+ """
+
+ repository_class = KnitPackRepository
+ _commit_builder_class = PackCommitBuilder
+ supports_external_lookups = True
+ # What index classes to use
+ index_builder_class = btree_index.BTreeBuilder
+ index_class = btree_index.BTreeGraphIndex
+
+ @property
+ def _serializer(self):
+ return xml5.serializer_v5
+
+ def _get_matching_bzrdir(self):
+ return controldir.format_registry.make_bzrdir('1.9')
+
+ def _ignore_setting_bzrdir(self, format):
+ pass
+
+ _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+
+ @classmethod
+ def get_format_string(cls):
+ """See RepositoryFormat.get_format_string()."""
+ return "Bazaar RepositoryFormatKnitPack6 (bzr 1.9)\n"
+
+ def get_format_description(self):
+ """See RepositoryFormat.get_format_description()."""
+ return "Packs 6 (uses btree indexes, requires bzr 1.9)"
+
+
+class RepositoryFormatKnitPack6RichRoot(RepositoryFormatPack):
+ """A repository with rich roots, no subtrees, stacking and btree indexes.
+
+ 1.6-rich-root with B+Tree indices.
+ """
+
+ repository_class = KnitPackRepository
+ _commit_builder_class = PackRootCommitBuilder
+ rich_root_data = True
+ supports_tree_reference = False # no subtrees
+ supports_external_lookups = True
+ # What index classes to use
+ index_builder_class = btree_index.BTreeBuilder
+ index_class = btree_index.BTreeGraphIndex
+
+ @property
+ def _serializer(self):
+ return xml6.serializer_v6
+
+ def _get_matching_bzrdir(self):
+ return controldir.format_registry.make_bzrdir(
+ '1.9-rich-root')
+
+ def _ignore_setting_bzrdir(self, format):
+ pass
+
+ _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+
+ @classmethod
+ def get_format_string(cls):
+ """See RepositoryFormat.get_format_string()."""
+ return "Bazaar RepositoryFormatKnitPack6RichRoot (bzr 1.9)\n"
+
+ def get_format_description(self):
+ return "Packs 6 rich-root (uses btree indexes, requires bzr 1.9)"
+
+
+class RepositoryFormatPackDevelopment2Subtree(RepositoryFormatPack):
+ """A subtrees development repository.
+
+ This format should be retained in 2.3, to provide an upgrade path from this
+ to RepositoryFormat2aSubtree. It can be removed in later releases.
+
+    1.6.1-subtree [as it might have been] with B+Tree indices.
+ """
+
+ repository_class = KnitPackRepository
+ _commit_builder_class = PackRootCommitBuilder
+ rich_root_data = True
+ experimental = True
+ supports_tree_reference = True
+ supports_external_lookups = True
+ # What index classes to use
+ index_builder_class = btree_index.BTreeBuilder
+ index_class = btree_index.BTreeGraphIndex
+
+ @property
+ def _serializer(self):
+ return xml7.serializer_v7
+
+ def _get_matching_bzrdir(self):
+ return controldir.format_registry.make_bzrdir(
+ 'development5-subtree')
+
+ def _ignore_setting_bzrdir(self, format):
+ pass
+
+ _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+
+ @classmethod
+ def get_format_string(cls):
+ """See RepositoryFormat.get_format_string()."""
+ return ("Bazaar development format 2 with subtree support "
+ "(needs bzr.dev from before 1.8)\n")
+
+ def get_format_description(self):
+ """See RepositoryFormat.get_format_description()."""
+ return ("Development repository format, currently the same as "
+ "1.6.1-subtree with B+Tree indices.\n")
+
+
+class KnitPackStreamSource(StreamSource):
+ """A StreamSource used to transfer data between same-format KnitPack repos.
+
+ This source assumes:
+ 1) Same serialization format for all objects
+ 2) Same root information
+ 3) XML format inventories
+ 4) Atomic inserts (so we can stream inventory texts before text
+ content)
+ 5) No chk_bytes
+ """
+
+ def __init__(self, from_repository, to_format):
+ super(KnitPackStreamSource, self).__init__(from_repository, to_format)
+ self._text_keys = None
+ self._text_fetch_order = 'unordered'
+
+ def _get_filtered_inv_stream(self, revision_ids):
+ from_repo = self.from_repository
+ parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids)
+ parent_keys = [(p,) for p in parent_ids]
+ find_text_keys = from_repo._serializer._find_text_key_references
+ parent_text_keys = set(find_text_keys(
+ from_repo._inventory_xml_lines_for_keys(parent_keys)))
+ content_text_keys = set()
+ knit = KnitVersionedFiles(None, None)
+ factory = KnitPlainFactory()
+ def find_text_keys_from_content(record):
+ if record.storage_kind not in ('knit-delta-gz', 'knit-ft-gz'):
+ raise ValueError("Unknown content storage kind for"
+ " inventory text: %s" % (record.storage_kind,))
+ # It's a knit record, it has a _raw_record field (even if it was
+ # reconstituted from a network stream).
+ raw_data = record._raw_record
+ # read the entire thing
+ revision_id = record.key[-1]
+ content, _ = knit._parse_record(revision_id, raw_data)
+ if record.storage_kind == 'knit-delta-gz':
+ line_iterator = factory.get_linedelta_content(content)
+ elif record.storage_kind == 'knit-ft-gz':
+ line_iterator = factory.get_fulltext_content(content)
+ content_text_keys.update(find_text_keys(
+ [(line, revision_id) for line in line_iterator]))
+ revision_keys = [(r,) for r in revision_ids]
+ def _filtered_inv_stream():
+ source_vf = from_repo.inventories
+ stream = source_vf.get_record_stream(revision_keys,
+ 'unordered', False)
+ for record in stream:
+ if record.storage_kind == 'absent':
+ raise errors.NoSuchRevision(from_repo, record.key)
+ find_text_keys_from_content(record)
+ yield record
+ self._text_keys = content_text_keys - parent_text_keys
+ return ('inventories', _filtered_inv_stream())
+
+ def _get_text_stream(self):
+ # Note: We know we don't have to handle adding root keys, because both
+        # the source and target have the same network name.
+ text_stream = self.from_repository.texts.get_record_stream(
+ self._text_keys, self._text_fetch_order, False)
+ return ('texts', text_stream)
+
+ def get_stream(self, search):
+ revision_ids = search.get_keys()
+ for stream_info in self._fetch_revision_texts(revision_ids):
+ yield stream_info
+ self._revision_keys = [(rev_id,) for rev_id in revision_ids]
+ yield self._get_filtered_inv_stream(revision_ids)
+ yield self._get_text_stream()
+
+
+class KnitPacker(Packer):
+ """Packer that works with knit packs."""
+
+ def __init__(self, pack_collection, packs, suffix, revision_ids=None,
+ reload_func=None):
+ super(KnitPacker, self).__init__(pack_collection, packs, suffix,
+ revision_ids=revision_ids,
+ reload_func=reload_func)
+
+ def _pack_map_and_index_list(self, index_attribute):
+ """Convert a list of packs to an index pack map and index list.
+
+ :param index_attribute: The attribute that the desired index is found
+ on.
+ :return: A tuple (map, list) where map contains the dict from
+ index:pack_tuple, and list contains the indices in the preferred
+ access order.
+ """
+ indices = []
+ pack_map = {}
+ for pack_obj in self.packs:
+ index = getattr(pack_obj, index_attribute)
+ indices.append(index)
+ pack_map[index] = pack_obj
+ return pack_map, indices
+
+ def _index_contents(self, indices, key_filter=None):
+ """Get an iterable of the index contents from a pack_map.
+
+ :param indices: The list of indices to query
+ :param key_filter: An optional filter to limit the keys returned.
+ """
+ all_index = CombinedGraphIndex(indices)
+ if key_filter is None:
+ return all_index.iter_all_entries()
+ else:
+ return all_index.iter_entries(key_filter)
+
+ def _copy_nodes(self, nodes, index_map, writer, write_index,
+ output_lines=None):
+ """Copy knit nodes between packs with no graph references.
+
+ :param output_lines: Output full texts of copied items.
+ """
+ pb = ui.ui_factory.nested_progress_bar()
+ try:
+ return self._do_copy_nodes(nodes, index_map, writer,
+ write_index, pb, output_lines=output_lines)
+ finally:
+ pb.finished()
+
+ def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb,
+ output_lines=None):
+ # for record verification
+ knit = KnitVersionedFiles(None, None)
+ # plan a readv on each source pack:
+ # group by pack
+ nodes = sorted(nodes)
+ # how to map this into knit.py - or knit.py into this?
+ # we don't want the typical knit logic, we want grouping by pack
+ # at this point - perhaps a helper library for the following code
+ # duplication points?
+ request_groups = {}
+ for index, key, value in nodes:
+ if index not in request_groups:
+ request_groups[index] = []
+ request_groups[index].append((key, value))
+ record_index = 0
+ pb.update("Copied record", record_index, len(nodes))
+ for index, items in request_groups.iteritems():
+ pack_readv_requests = []
+ for key, value in items:
+ # ---- KnitGraphIndex.get_position
+ bits = value[1:].split(' ')
+ offset, length = int(bits[0]), int(bits[1])
+ pack_readv_requests.append((offset, length, (key, value[0])))
+ # linear scan up the pack
+ pack_readv_requests.sort()
+ # copy the data
+ pack_obj = index_map[index]
+ transport, path = pack_obj.access_tuple()
+ try:
+ reader = pack.make_readv_reader(transport, path,
+ [offset[0:2] for offset in pack_readv_requests])
+ except errors.NoSuchFile:
+ if self._reload_func is not None:
+ self._reload_func()
+ raise
+ for (names, read_func), (_1, _2, (key, eol_flag)) in \
+ izip(reader.iter_records(), pack_readv_requests):
+ raw_data = read_func(None)
+ # check the header only
+ if output_lines is not None:
+ output_lines(knit._parse_record(key[-1], raw_data)[0])
+ else:
+ df, _ = knit._parse_record_header(key, raw_data)
+ df.close()
+ pos, size = writer.add_bytes_record(raw_data, names)
+ write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
+ pb.update("Copied record", record_index)
+ record_index += 1
+
+ def _copy_nodes_graph(self, index_map, writer, write_index,
+ readv_group_iter, total_items, output_lines=False):
+ """Copy knit nodes between packs.
+
+ :param output_lines: Return lines present in the copied data as
+ an iterator of line,version_id.
+ """
+ pb = ui.ui_factory.nested_progress_bar()
+ try:
+ for result in self._do_copy_nodes_graph(index_map, writer,
+ write_index, output_lines, pb, readv_group_iter, total_items):
+ yield result
+ except Exception:
+ # Python 2.4 does not permit try:finally: in a generator.
+ pb.finished()
+ raise
+ else:
+ pb.finished()
+
+ def _do_copy_nodes_graph(self, index_map, writer, write_index,
+ output_lines, pb, readv_group_iter, total_items):
+ # for record verification
+ knit = KnitVersionedFiles(None, None)
+ # for line extraction when requested (inventories only)
+ if output_lines:
+ factory = KnitPlainFactory()
+ record_index = 0
+ pb.update("Copied record", record_index, total_items)
+ for index, readv_vector, node_vector in readv_group_iter:
+ # copy the data
+ pack_obj = index_map[index]
+ transport, path = pack_obj.access_tuple()
+ try:
+ reader = pack.make_readv_reader(transport, path, readv_vector)
+ except errors.NoSuchFile:
+ if self._reload_func is not None:
+ self._reload_func()
+ raise
+ for (names, read_func), (key, eol_flag, references) in \
+ izip(reader.iter_records(), node_vector):
+ raw_data = read_func(None)
+ if output_lines:
+ # read the entire thing
+ content, _ = knit._parse_record(key[-1], raw_data)
+ if len(references[-1]) == 0:
+ line_iterator = factory.get_fulltext_content(content)
+ else:
+ line_iterator = factory.get_linedelta_content(content)
+ for line in line_iterator:
+ yield line, key
+ else:
+ # check the header only
+ df, _ = knit._parse_record_header(key, raw_data)
+ df.close()
+ pos, size = writer.add_bytes_record(raw_data, names)
+ write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
+ pb.update("Copied record", record_index)
+ record_index += 1
+
+ def _process_inventory_lines(self, inv_lines):
+ """Use up the inv_lines generator and setup a text key filter."""
+ repo = self._pack_collection.repo
+ fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
+ inv_lines, self.revision_keys)
+ text_filter = []
+ for fileid, file_revids in fileid_revisions.iteritems():
+ text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
+ self._text_filter = text_filter
+
+ def _copy_inventory_texts(self):
+ # select inventory keys
+        # Inventory keys are currently the same keyspace as revision keys.
+        # Note that querying for keys here could introduce a bug where an
+        # inventory item is missed, so do not change this to query separately
+        # without cross checking like the text key check below.
+        inv_keys = self._revision_keys
+ inventory_index_map, inventory_indices = self._pack_map_and_index_list(
+ 'inventory_index')
+ inv_nodes = self._index_contents(inventory_indices, inv_keys)
+ # copy inventory keys and adjust values
+ # XXX: Should be a helper function to allow different inv representation
+ # at this point.
+ self.pb.update("Copying inventory texts", 2)
+ total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
+ # Only grab the output lines if we will be processing them
+ output_lines = bool(self.revision_ids)
+ inv_lines = self._copy_nodes_graph(inventory_index_map,
+ self.new_pack._writer, self.new_pack.inventory_index,
+ readv_group_iter, total_items, output_lines=output_lines)
+ if self.revision_ids:
+ self._process_inventory_lines(inv_lines)
+ else:
+ # eat the iterator to cause it to execute.
+ list(inv_lines)
+ self._text_filter = None
+ if 'pack' in debug.debug_flags:
+ trace.mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
+ time.ctime(), self._pack_collection._upload_transport.base,
+ self.new_pack.random_name,
+ self.new_pack.inventory_index.key_count(),
+ time.time() - self.new_pack.start_time)
+
+ def _update_pack_order(self, entries, index_to_pack_map):
+ """Determine how we want our packs to be ordered.
+
+ This changes the sort order of the self.packs list so that packs unused
+ by 'entries' will be at the end of the list, so that future requests
+ can avoid probing them. Used packs will be at the front of the
+ self.packs list, in the order of their first use in 'entries'.
+
+ :param entries: A list of (index, ...) tuples
+ :param index_to_pack_map: A mapping from index objects to pack objects.
+ """
+ packs = []
+ seen_indexes = set()
+ for entry in entries:
+ index = entry[0]
+ if index not in seen_indexes:
+ packs.append(index_to_pack_map[index])
+ seen_indexes.add(index)
+ if len(packs) == len(self.packs):
+ if 'pack' in debug.debug_flags:
+ trace.mutter('Not changing pack list, all packs used.')
+ return
+ seen_packs = set(packs)
+ for pack in self.packs:
+ if pack not in seen_packs:
+ packs.append(pack)
+ seen_packs.add(pack)
+ if 'pack' in debug.debug_flags:
+ old_names = [p.access_tuple()[1] for p in self.packs]
+ new_names = [p.access_tuple()[1] for p in packs]
+ trace.mutter('Reordering packs\nfrom: %s\n to: %s',
+ old_names, new_names)
+ self.packs = packs
+
+ def _copy_revision_texts(self):
+ # select revisions
+ if self.revision_ids:
+ revision_keys = [(revision_id,) for revision_id in self.revision_ids]
+ else:
+ revision_keys = None
+ # select revision keys
+ revision_index_map, revision_indices = self._pack_map_and_index_list(
+ 'revision_index')
+ revision_nodes = self._index_contents(revision_indices, revision_keys)
+ revision_nodes = list(revision_nodes)
+ self._update_pack_order(revision_nodes, revision_index_map)
+ # copy revision keys and adjust values
+ self.pb.update("Copying revision texts", 1)
+ total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
+ list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
+ self.new_pack.revision_index, readv_group_iter, total_items))
+ if 'pack' in debug.debug_flags:
+ trace.mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
+ time.ctime(), self._pack_collection._upload_transport.base,
+ self.new_pack.random_name,
+ self.new_pack.revision_index.key_count(),
+ time.time() - self.new_pack.start_time)
+ self._revision_keys = revision_keys
+
+ def _get_text_nodes(self):
+ text_index_map, text_indices = self._pack_map_and_index_list(
+ 'text_index')
+ return text_index_map, self._index_contents(text_indices,
+ self._text_filter)
+
+ def _copy_text_texts(self):
+ # select text keys
+ text_index_map, text_nodes = self._get_text_nodes()
+ if self._text_filter is not None:
+ # We could return the keys copied as part of the return value from
+ # _copy_nodes_graph but this doesn't work all that well with the
+ # need to get line output too, so we check separately, and as we're
+ # going to buffer everything anyway, we check beforehand, which
+ # saves reading knit data over the wire when we know there are
+            # missing records.
+ text_nodes = set(text_nodes)
+ present_text_keys = set(_node[1] for _node in text_nodes)
+ missing_text_keys = set(self._text_filter) - present_text_keys
+ if missing_text_keys:
+ # TODO: raise a specific error that can handle many missing
+ # keys.
+ trace.mutter("missing keys during fetch: %r", missing_text_keys)
+ a_missing_key = missing_text_keys.pop()
+ raise errors.RevisionNotPresent(a_missing_key[1],
+ a_missing_key[0])
+ # copy text keys and adjust values
+ self.pb.update("Copying content texts", 3)
+ total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
+ list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+ self.new_pack.text_index, readv_group_iter, total_items))
+ self._log_copied_texts()
+
+ def _create_pack_from_packs(self):
+ self.pb.update("Opening pack", 0, 5)
+ self.new_pack = self.open_pack()
+ new_pack = self.new_pack
+ # buffer data - we won't be reading-back during the pack creation and
+ # this makes a significant difference on sftp pushes.
+ new_pack.set_write_cache_size(1024*1024)
+ if 'pack' in debug.debug_flags:
+ plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
+ for a_pack in self.packs]
+ if self.revision_ids is not None:
+ rev_count = len(self.revision_ids)
+ else:
+ rev_count = 'all'
+ trace.mutter('%s: create_pack: creating pack from source packs: '
+ '%s%s %s revisions wanted %s t=0',
+ time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
+ plain_pack_list, rev_count)
+ self._copy_revision_texts()
+ self._copy_inventory_texts()
+ self._copy_text_texts()
+ # select signature keys
+ signature_filter = self._revision_keys # same keyspace
+ signature_index_map, signature_indices = self._pack_map_and_index_list(
+ 'signature_index')
+ signature_nodes = self._index_contents(signature_indices,
+ signature_filter)
+ # copy signature keys and adjust values
+ self.pb.update("Copying signature texts", 4)
+ self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
+ new_pack.signature_index)
+ if 'pack' in debug.debug_flags:
+ trace.mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
+ time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
+ new_pack.signature_index.key_count(),
+ time.time() - new_pack.start_time)
+ new_pack._check_references()
+ if not self._use_pack(new_pack):
+ new_pack.abort()
+ return None
+ self.pb.update("Finishing pack", 5)
+ new_pack.finish()
+ self._pack_collection.allocate(new_pack)
+ return new_pack
+
+ def _least_readv_node_readv(self, nodes):
+ """Generate request groups for nodes using the least readv's.
+
+ :param nodes: An iterable of graph index nodes.
+ :return: Total node count and an iterator of the data needed to perform
+ readvs to obtain the data for nodes. Each item yielded by the
+ iterator is a tuple with:
+ index, readv_vector, node_vector. readv_vector is a list ready to
+ hand to the transport readv method, and node_vector is a list of
+ (key, eol_flag, references) for the node retrieved by the
+ matching readv_vector.
+ """
+ # group by pack so we do one readv per pack
+ nodes = sorted(nodes)
+ total = len(nodes)
+ request_groups = {}
+ for index, key, value, references in nodes:
+ if index not in request_groups:
+ request_groups[index] = []
+ request_groups[index].append((key, value, references))
+ result = []
+ for index, items in request_groups.iteritems():
+ pack_readv_requests = []
+ for key, value, references in items:
+ # ---- KnitGraphIndex.get_position
+ bits = value[1:].split(' ')
+ offset, length = int(bits[0]), int(bits[1])
+ pack_readv_requests.append(
+ ((offset, length), (key, value[0], references)))
+ # linear scan up the pack to maximum range combining.
+ pack_readv_requests.sort()
+ # split out the readv and the node data.
+ pack_readv = [readv for readv, node in pack_readv_requests]
+ node_vector = [node for readv, node in pack_readv_requests]
+ result.append((index, pack_readv, node_vector))
+ return total, result
+
+ def _revision_node_readv(self, revision_nodes):
+ """Return the total revisions and the readv's to issue.
+
+ :param revision_nodes: The revision index contents for the packs being
+ incorporated into the new pack.
+ :return: As per _least_readv_node_readv.
+ """
+ return self._least_readv_node_readv(revision_nodes)
+
+
+class KnitReconcilePacker(KnitPacker):
+ """A packer which regenerates indices etc as it copies.
+
+ This is used by ``bzr reconcile`` to cause parent text pointers to be
+ regenerated.
+ """
+
+ def __init__(self, *args, **kwargs):
+ super(KnitReconcilePacker, self).__init__(*args, **kwargs)
+ self._data_changed = False
+
+ def _process_inventory_lines(self, inv_lines):
+ """Generate a text key reference map rather for reconciling with."""
+ repo = self._pack_collection.repo
+ refs = repo._serializer._find_text_key_references(inv_lines)
+ self._text_refs = refs
+ # during reconcile we:
+ # - convert unreferenced texts to full texts
+ # - correct texts which reference a text not copied to be full texts
+ # - copy all others as-is but with corrected parents.
+ # - so at this point we don't know enough to decide what becomes a full
+ # text.
+ self._text_filter = None
+
+ def _copy_text_texts(self):
+ """generate what texts we should have and then copy."""
+ self.pb.update("Copying content texts", 3)
+ # we have three major tasks here:
+ # 1) generate the ideal index
+ repo = self._pack_collection.repo
+ ancestors = dict([(key[0], tuple(ref[0] for ref in refs[0])) for
+ _1, key, _2, refs in
+ self.new_pack.revision_index.iter_all_entries()])
+ ideal_index = repo._generate_text_key_index(self._text_refs, ancestors)
+ # 2) generate a text_nodes list that contains all the deltas that can
+ # be used as-is, with corrected parents.
+ ok_nodes = []
+ bad_texts = []
+ discarded_nodes = []
+ NULL_REVISION = _mod_revision.NULL_REVISION
+ text_index_map, text_nodes = self._get_text_nodes()
+ for node in text_nodes:
+ # 0 - index
+ # 1 - key
+ # 2 - value
+ # 3 - refs
+ try:
+ ideal_parents = tuple(ideal_index[node[1]])
+ except KeyError:
+ discarded_nodes.append(node)
+ self._data_changed = True
+ else:
+ if ideal_parents == (NULL_REVISION,):
+ ideal_parents = ()
+ if ideal_parents == node[3][0]:
+ # no change needed.
+ ok_nodes.append(node)
+ elif ideal_parents[0:1] == node[3][0][0:1]:
+ # the left most parent is the same, or there are no parents
+ # today. Either way, we can preserve the representation as
+ # long as we change the refs to be inserted.
+ self._data_changed = True
+ ok_nodes.append((node[0], node[1], node[2],
+ (ideal_parents, node[3][1])))
+ else:
+ # Reinsert this text completely
+ bad_texts.append((node[1], ideal_parents))
+ self._data_changed = True
+ # we're finished with some data.
+ del ideal_index
+ del text_nodes
+ # 3) bulk copy the ok data
+ total_items, readv_group_iter = self._least_readv_node_readv(ok_nodes)
+ list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+ self.new_pack.text_index, readv_group_iter, total_items))
+ # 4) adhoc copy all the other texts.
+ # We have to topologically insert all texts otherwise we can fail to
+ # reconcile when parts of a single delta chain are preserved intact,
+ # and other parts are not. E.g. Discarded->d1->d2->d3. d1 will be
+ # reinserted, and if d3 has incorrect parents it will also be
+ # reinserted. If we insert d3 first, d2 is present (as it was bulk
+ # copied), so we will try to delta, but d2 is not currently able to be
+ # extracted because its basis d1 is not present. Topologically sorting
+ # addresses this. The following generates a sort for all the texts that
+ # are being inserted without having to reference the entire text key
+ # space (we only topo sort the revisions, which is smaller).
+ topo_order = tsort.topo_sort(ancestors)
+ rev_order = dict(zip(topo_order, range(len(topo_order))))
+ bad_texts.sort(key=lambda key:rev_order.get(key[0][1], 0))
+ transaction = repo.get_transaction()
+ file_id_index = GraphIndexPrefixAdapter(
+ self.new_pack.text_index,
+ ('blank', ), 1,
+ add_nodes_callback=self.new_pack.text_index.add_nodes)
+ data_access = _DirectPackAccess(
+ {self.new_pack.text_index:self.new_pack.access_tuple()})
+ data_access.set_writer(self.new_pack._writer, self.new_pack.text_index,
+ self.new_pack.access_tuple())
+ output_texts = KnitVersionedFiles(
+ _KnitGraphIndex(self.new_pack.text_index,
+ add_callback=self.new_pack.text_index.add_nodes,
+ deltas=True, parents=True, is_locked=repo.is_locked),
+ data_access=data_access, max_delta_chain=200)
+ for key, parent_keys in bad_texts:
+            # Creating the delta for this text refers back to data already
+            # written to the new pack, so flush it to make that data readable.
+            # A possible improvement would be to catch errors on short reads
+            # and only flush then.
+ self.new_pack.flush()
+ parents = []
+ for parent_key in parent_keys:
+ if parent_key[0] != key[0]:
+ # Graph parents must match the fileid
+ raise errors.BzrError('Mismatched key parent %r:%r' %
+ (key, parent_keys))
+ parents.append(parent_key[1])
+ text_lines = osutils.split_lines(repo.texts.get_record_stream(
+ [key], 'unordered', True).next().get_bytes_as('fulltext'))
+ output_texts.add_lines(key, parent_keys, text_lines,
+ random_id=True, check_content=False)
+ # 5) check that nothing inserted has a reference outside the keyspace.
+ missing_text_keys = self.new_pack.text_index._external_references()
+ if missing_text_keys:
+ raise errors.BzrCheckError('Reference to missing compression parents %r'
+ % (missing_text_keys,))
+ self._log_copied_texts()
+
+ def _use_pack(self, new_pack):
+ """Override _use_pack to check for reconcile having changed content."""
+ # XXX: we might be better checking this at the copy time.
+ original_inventory_keys = set()
+ inv_index = self._pack_collection.inventory_index.combined_index
+ for entry in inv_index.iter_all_entries():
+ original_inventory_keys.add(entry[1])
+ new_inventory_keys = set()
+ for entry in new_pack.inventory_index.iter_all_entries():
+ new_inventory_keys.add(entry[1])
+ if new_inventory_keys != original_inventory_keys:
+ self._data_changed = True
+ return new_pack.data_inserted() and self._data_changed
+
+
+class OptimisingKnitPacker(KnitPacker):
+ """A packer which spends more time to create better disk layouts."""
+
+ def _revision_node_readv(self, revision_nodes):
+ """Return the total revisions and the readv's to issue.
+
+ This sort places revisions in topological order with the ancestors
+ after the children.
+
+ :param revision_nodes: The revision index contents for the packs being
+ incorporated into the new pack.
+ :return: As per _least_readv_node_readv.
+ """
+ # build an ancestors dict
+ ancestors = {}
+ by_key = {}
+ for index, key, value, references in revision_nodes:
+ ancestors[key] = references[0]
+ by_key[key] = (index, value, references)
+ order = tsort.topo_sort(ancestors)
+ total = len(order)
+ # Single IO is pathological, but it will work as a starting point.
+ requests = []
+ for key in reversed(order):
+ index, value, references = by_key[key]
+ # ---- KnitGraphIndex.get_position
+ bits = value[1:].split(' ')
+ offset, length = int(bits[0]), int(bits[1])
+ requests.append(
+ (index, [(offset, length)], [(key, value[0], references)]))
+ # TODO: combine requests in the same index that are in ascending order.
+ return total, requests
+
+ def open_pack(self):
+ """Open a pack for the pack we are creating."""
+ new_pack = super(OptimisingKnitPacker, self).open_pack()
+ # Turn on the optimization flags for all the index builders.
+ new_pack.revision_index.set_optimize(for_size=True)
+ new_pack.inventory_index.set_optimize(for_size=True)
+ new_pack.text_index.set_optimize(for_size=True)
+ new_pack.signature_index.set_optimize(for_size=True)
+ return new_pack
+
+
+class KnitRepositoryPackCollection(RepositoryPackCollection):
+ """A knit pack collection."""
+
+ pack_factory = NewPack
+ resumed_pack_factory = ResumedPack
+ normal_packer_class = KnitPacker
+ optimising_packer_class = OptimisingKnitPacker
+
+
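The knit copy helpers above (_do_copy_nodes and _least_readv_node_readv) share one grouping idea: collect the index records by source pack, then sort each group's (offset, length) requests so every pack is read in a single forward readv pass. The sketch below shows that grouping with plain tuples standing in for index and pack objects; the function and field names are illustrative, not bzrlib API.

def group_readvs(nodes):
    """Group (pack_name, key, offset, length) records into one readv per pack.

    Returns a list of (pack_name, readv_vector, key_vector) tuples, with
    each readv_vector sorted by offset so the pack file can be read in a
    single forward scan.
    """
    request_groups = {}
    for pack_name, key, offset, length in nodes:
        request_groups.setdefault(pack_name, []).append(((offset, length), key))
    result = []
    for pack_name in sorted(request_groups):
        items = sorted(request_groups[pack_name])   # linear scan up the pack
        readv_vector = [readv for readv, key in items]
        key_vector = [key for readv, key in items]
        result.append((pack_name, readv_vector, key_vector))
    return result


nodes = [('pack-a', ('rev-2',), 400, 120),
         ('pack-b', ('rev-3',), 0, 80),
         ('pack-a', ('rev-1',), 0, 400)]
assert group_readvs(nodes) == [
    ('pack-a', [(0, 400), (400, 120)], [('rev-1',), ('rev-2',)]),
    ('pack-b', [(0, 80)], [('rev-3',)])]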
+
diff --git a/bzrlib/repofmt/knitrepo.py b/bzrlib/repofmt/knitrepo.py
new file mode 100644
index 0000000..624c4ea
--- /dev/null
+++ b/bzrlib/repofmt/knitrepo.py
@@ -0,0 +1,522 @@
+# Copyright (C) 2007-2010 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+from __future__ import absolute_import
+
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
+import itertools
+
+from bzrlib import (
+ controldir,
+ errors,
+ knit as _mod_knit,
+ lockable_files,
+ lockdir,
+ osutils,
+ revision as _mod_revision,
+ trace,
+ transactions,
+ versionedfile,
+ xml5,
+ xml6,
+ xml7,
+ )
+""")
+from bzrlib.decorators import needs_read_lock, needs_write_lock
+from bzrlib.repository import (
+ InterRepository,
+ IsInWriteGroupError,
+ RepositoryFormatMetaDir,
+ )
+from bzrlib.vf_repository import (
+ InterSameDataRepository,
+ MetaDirVersionedFileRepository,
+ MetaDirVersionedFileRepositoryFormat,
+ VersionedFileCommitBuilder,
+ VersionedFileRootCommitBuilder,
+ )
+from bzrlib import symbol_versioning
+
+
+class _KnitParentsProvider(object):
+
+ def __init__(self, knit):
+ self._knit = knit
+
+ def __repr__(self):
+ return 'KnitParentsProvider(%r)' % self._knit
+
+ def get_parent_map(self, keys):
+ """See graph.StackedParentsProvider.get_parent_map"""
+ parent_map = {}
+ for revision_id in keys:
+ if revision_id is None:
+ raise ValueError('get_parent_map(None) is not valid')
+ if revision_id == _mod_revision.NULL_REVISION:
+ parent_map[revision_id] = ()
+ else:
+ try:
+ parents = tuple(
+ self._knit.get_parents_with_ghosts(revision_id))
+ except errors.RevisionNotPresent:
+ continue
+ else:
+ if len(parents) == 0:
+ parents = (_mod_revision.NULL_REVISION,)
+ parent_map[revision_id] = parents
+ return parent_map
+
+
+class _KnitsParentsProvider(object):
+
+ def __init__(self, knit, prefix=()):
+ """Create a parent provider for string keys mapped to tuple keys."""
+ self._knit = knit
+ self._prefix = prefix
+
+ def __repr__(self):
+ return 'KnitsParentsProvider(%r)' % self._knit
+
+ def get_parent_map(self, keys):
+ """See graph.StackedParentsProvider.get_parent_map"""
+ parent_map = self._knit.get_parent_map(
+ [self._prefix + (key,) for key in keys])
+ result = {}
+ for key, parents in parent_map.items():
+ revid = key[-1]
+ if len(parents) == 0:
+ parents = (_mod_revision.NULL_REVISION,)
+ else:
+ parents = tuple(parent[-1] for parent in parents)
+ result[revid] = parents
+ for revision_id in keys:
+ if revision_id == _mod_revision.NULL_REVISION:
+ result[revision_id] = ()
+ return result
+
+
+class KnitRepository(MetaDirVersionedFileRepository):
+ """Knit format repository."""
+
+ # These attributes are inherited from the Repository base class. Setting
+ # them to None ensures that if the constructor is changed to not initialize
+    # them, or a subclass fails to call the constructor, an error will
+ # occur rather than the system working but generating incorrect data.
+ _commit_builder_class = None
+ _serializer = None
+
+ def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
+ _serializer):
+ super(KnitRepository, self).__init__(_format, a_bzrdir, control_files)
+ self._commit_builder_class = _commit_builder_class
+ self._serializer = _serializer
+ self._reconcile_fixes_text_parents = True
+
+ @needs_read_lock
+ def _all_revision_ids(self):
+ """See Repository.all_revision_ids()."""
+ return [key[0] for key in self.revisions.keys()]
+
+ def _activate_new_inventory(self):
+ """Put a replacement inventory.new into use as inventories."""
+ # Copy the content across
+ t = self._transport
+ t.copy('inventory.new.kndx', 'inventory.kndx')
+ try:
+ t.copy('inventory.new.knit', 'inventory.knit')
+ except errors.NoSuchFile:
+ # empty inventories knit
+ t.delete('inventory.knit')
+ # delete the temp inventory
+ t.delete('inventory.new.kndx')
+ try:
+ t.delete('inventory.new.knit')
+ except errors.NoSuchFile:
+ # empty inventories knit
+ pass
+ # Force index reload (sanity check)
+ self.inventories._index._reset_cache()
+ self.inventories.keys()
+
+ def _backup_inventory(self):
+ t = self._transport
+ t.copy('inventory.kndx', 'inventory.backup.kndx')
+ t.copy('inventory.knit', 'inventory.backup.knit')
+
+ def _move_file_id(self, from_id, to_id):
+ t = self._transport.clone('knits')
+ from_rel_url = self.texts._index._mapper.map((from_id, None))
+ to_rel_url = self.texts._index._mapper.map((to_id, None))
+ # We expect both files to always exist in this case.
+ for suffix in ('.knit', '.kndx'):
+ t.rename(from_rel_url + suffix, to_rel_url + suffix)
+
+ def _remove_file_id(self, file_id):
+ t = self._transport.clone('knits')
+ rel_url = self.texts._index._mapper.map((file_id, None))
+ for suffix in ('.kndx', '.knit'):
+ try:
+ t.delete(rel_url + suffix)
+ except errors.NoSuchFile:
+ pass
+
+ def _temp_inventories(self):
+ result = self._format._get_inventories(self._transport, self,
+ 'inventory.new')
+ # Reconciling when the output has no revisions would result in no
+ # writes - but we want to ensure there is an inventory for
+ # compatibility with older clients that don't lazy-load.
+ result.get_parent_map([('A',)])
+ return result
+
+ @needs_read_lock
+ def get_revision(self, revision_id):
+ """Return the Revision object for a named revision"""
+ revision_id = osutils.safe_revision_id(revision_id)
+ return self.get_revision_reconcile(revision_id)
+
+ def _refresh_data(self):
+ if not self.is_locked():
+ return
+ if self.is_in_write_group():
+ raise IsInWriteGroupError(self)
+ # Create a new transaction to force all knits to see the scope change.
+ # This is safe because we're outside a write group.
+ self.control_files._finish_transaction()
+ if self.is_write_locked():
+ self.control_files._set_write_transaction()
+ else:
+ self.control_files._set_read_transaction()
+
+ @needs_write_lock
+ def reconcile(self, other=None, thorough=False):
+ """Reconcile this repository."""
+ from bzrlib.reconcile import KnitReconciler
+ reconciler = KnitReconciler(self, thorough=thorough)
+ reconciler.reconcile()
+ return reconciler
+
+ def _make_parents_provider(self):
+ return _KnitsParentsProvider(self.revisions)
+
+
+class RepositoryFormatKnit(MetaDirVersionedFileRepositoryFormat):
+ """Bzr repository knit format (generalized).
+
+ This repository format has:
+ - knits for file texts and inventory
+ - hash subdirectory based stores.
+ - knits for revisions and signatures
+ - TextStores for revisions and signatures.
+ - a format marker of its own
+ - an optional 'shared-storage' flag
+ - an optional 'no-working-trees' flag
+ - a LockDir lock
+ """
+
+ # Set this attribute in derived classes to control the repository class
+ # created by open and initialize.
+ repository_class = None
+ # Set this attribute in derived classes to control the
+ # _commit_builder_class that the repository objects will have passed to
+ # their constructor.
+ _commit_builder_class = None
+    # Set this attribute in derived classes to control the _serializer that the
+ # repository objects will have passed to their constructor.
+ @property
+ def _serializer(self):
+ return xml5.serializer_v5
+ # Knit based repositories handle ghosts reasonably well.
+ supports_ghosts = True
+ # External lookups are not supported in this format.
+ supports_external_lookups = False
+ # No CHK support.
+ supports_chks = False
+ _fetch_order = 'topological'
+ _fetch_uses_deltas = True
+ fast_deltas = False
+ supports_funky_characters = True
+ # The revision.kndx could potentially claim a revision has a different
+ # parent to the revision text.
+ revision_graph_can_have_wrong_parents = True
+
+ def _get_inventories(self, repo_transport, repo, name='inventory'):
+ mapper = versionedfile.ConstantMapper(name)
+ index = _mod_knit._KndxIndex(repo_transport, mapper,
+ repo.get_transaction, repo.is_write_locked, repo.is_locked)
+ access = _mod_knit._KnitKeyAccess(repo_transport, mapper)
+ return _mod_knit.KnitVersionedFiles(index, access, annotated=False)
+
+ def _get_revisions(self, repo_transport, repo):
+ mapper = versionedfile.ConstantMapper('revisions')
+ index = _mod_knit._KndxIndex(repo_transport, mapper,
+ repo.get_transaction, repo.is_write_locked, repo.is_locked)
+ access = _mod_knit._KnitKeyAccess(repo_transport, mapper)
+ return _mod_knit.KnitVersionedFiles(index, access, max_delta_chain=0,
+ annotated=False)
+
+ def _get_signatures(self, repo_transport, repo):
+ mapper = versionedfile.ConstantMapper('signatures')
+ index = _mod_knit._KndxIndex(repo_transport, mapper,
+ repo.get_transaction, repo.is_write_locked, repo.is_locked)
+ access = _mod_knit._KnitKeyAccess(repo_transport, mapper)
+ return _mod_knit.KnitVersionedFiles(index, access, max_delta_chain=0,
+ annotated=False)
+
+ def _get_texts(self, repo_transport, repo):
+ mapper = versionedfile.HashEscapedPrefixMapper()
+ base_transport = repo_transport.clone('knits')
+ index = _mod_knit._KndxIndex(base_transport, mapper,
+ repo.get_transaction, repo.is_write_locked, repo.is_locked)
+ access = _mod_knit._KnitKeyAccess(base_transport, mapper)
+ return _mod_knit.KnitVersionedFiles(index, access, max_delta_chain=200,
+ annotated=True)
+
+ def initialize(self, a_bzrdir, shared=False):
+ """Create a knit format 1 repository.
+
+ :param a_bzrdir: bzrdir to contain the new repository; must already
+ be initialized.
+ :param shared: If true the repository will be initialized as a shared
+ repository.
+ """
+ trace.mutter('creating repository in %s.', a_bzrdir.transport.base)
+ dirs = ['knits']
+ files = []
+ utf8_files = [('format', self.get_format_string())]
+
+ self._upload_blank_content(a_bzrdir, dirs, files, utf8_files, shared)
+ repo_transport = a_bzrdir.get_repository_transport(None)
+ control_files = lockable_files.LockableFiles(repo_transport,
+ 'lock', lockdir.LockDir)
+ transaction = transactions.WriteTransaction()
+ result = self.open(a_bzrdir=a_bzrdir, _found=True)
+ result.lock_write()
+ # the revision id here is irrelevant: it will not be stored, and cannot
+        # already exist; we do this to create files on disk for older clients.
+ result.inventories.get_parent_map([('A',)])
+ result.revisions.get_parent_map([('A',)])
+ result.signatures.get_parent_map([('A',)])
+ result.unlock()
+ self._run_post_repo_init_hooks(result, a_bzrdir, shared)
+ return result
+
+ def open(self, a_bzrdir, _found=False, _override_transport=None):
+ """See RepositoryFormat.open().
+
+ :param _override_transport: INTERNAL USE ONLY. Allows opening the
+ repository at a slightly different url
+ than normal. I.e. during 'upgrade'.
+ """
+ if not _found:
+ format = RepositoryFormatMetaDir.find_format(a_bzrdir)
+ if _override_transport is not None:
+ repo_transport = _override_transport
+ else:
+ repo_transport = a_bzrdir.get_repository_transport(None)
+ control_files = lockable_files.LockableFiles(repo_transport,
+ 'lock', lockdir.LockDir)
+ repo = self.repository_class(_format=self,
+ a_bzrdir=a_bzrdir,
+ control_files=control_files,
+ _commit_builder_class=self._commit_builder_class,
+ _serializer=self._serializer)
+ repo.revisions = self._get_revisions(repo_transport, repo)
+ repo.signatures = self._get_signatures(repo_transport, repo)
+ repo.inventories = self._get_inventories(repo_transport, repo)
+ repo.texts = self._get_texts(repo_transport, repo)
+ repo.chk_bytes = None
+ repo._transport = repo_transport
+ return repo
+
+
+class RepositoryFormatKnit1(RepositoryFormatKnit):
+ """Bzr repository knit format 1.
+
+ This repository format has:
+ - knits for file texts and inventory
+ - hash subdirectory based stores.
+ - knits for revisions and signatures
+ - TextStores for revisions and signatures.
+ - a format marker of its own
+ - an optional 'shared-storage' flag
+ - an optional 'no-working-trees' flag
+ - a LockDir lock
+
+ This format was introduced in bzr 0.8.
+ """
+
+ repository_class = KnitRepository
+ _commit_builder_class = VersionedFileCommitBuilder
+ @property
+ def _serializer(self):
+ return xml5.serializer_v5
+
+ def __ne__(self, other):
+ return self.__class__ is not other.__class__
+
+ @classmethod
+ def get_format_string(cls):
+ """See RepositoryFormat.get_format_string()."""
+ return "Bazaar-NG Knit Repository Format 1"
+
+ def get_format_description(self):
+ """See RepositoryFormat.get_format_description()."""
+ return "Knit repository format 1"
+
+
+class RepositoryFormatKnit3(RepositoryFormatKnit):
+ """Bzr repository knit format 3.
+
+ This repository format has:
+ - knits for file texts and inventory
+ - hash subdirectory based stores.
+ - knits for revisions and signatures
+ - TextStores for revisions and signatures.
+ - a format marker of its own
+ - an optional 'shared-storage' flag
+ - an optional 'no-working-trees' flag
+ - a LockDir lock
+ - support for recording full info about the tree root
+ - support for recording tree-references
+ """
+
+ repository_class = KnitRepository
+ _commit_builder_class = VersionedFileRootCommitBuilder
+ rich_root_data = True
+ experimental = True
+ supports_tree_reference = True
+ @property
+ def _serializer(self):
+ return xml7.serializer_v7
+
+ def _get_matching_bzrdir(self):
+ return controldir.format_registry.make_bzrdir('dirstate-with-subtree')
+
+ def _ignore_setting_bzrdir(self, format):
+ pass
+
+ _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+
+ @classmethod
+ def get_format_string(cls):
+ """See RepositoryFormat.get_format_string()."""
+ return "Bazaar Knit Repository Format 3 (bzr 0.15)\n"
+
+ def get_format_description(self):
+ """See RepositoryFormat.get_format_description()."""
+ return "Knit repository format 3"
+
+
+class RepositoryFormatKnit4(RepositoryFormatKnit):
+ """Bzr repository knit format 4.
+
+ This repository format has everything in format 3, except for
+ tree-references:
+ - knits for file texts and inventory
+ - hash subdirectory based stores.
+ - knits for revisions and signatures
+ - TextStores for revisions and signatures.
+ - a format marker of its own
+ - an optional 'shared-storage' flag
+ - an optional 'no-working-trees' flag
+ - a LockDir lock
+ - support for recording full info about the tree root
+ """
+
+ repository_class = KnitRepository
+ _commit_builder_class = VersionedFileRootCommitBuilder
+ rich_root_data = True
+ supports_tree_reference = False
+ @property
+ def _serializer(self):
+ return xml6.serializer_v6
+
+ def _get_matching_bzrdir(self):
+ return controldir.format_registry.make_bzrdir('rich-root')
+
+ def _ignore_setting_bzrdir(self, format):
+ pass
+
+ _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
+
+ @classmethod
+ def get_format_string(cls):
+ """See RepositoryFormat.get_format_string()."""
+ return 'Bazaar Knit Repository Format 4 (bzr 1.0)\n'
+
+ def get_format_description(self):
+ """See RepositoryFormat.get_format_description()."""
+ return "Knit repository format 4"
+
+
+class InterKnitRepo(InterSameDataRepository):
+ """Optimised code paths between Knit based repositories."""
+
+ @classmethod
+ def _get_repo_format_to_test(self):
+ return RepositoryFormatKnit1()
+
+ @staticmethod
+ def is_compatible(source, target):
+ """Be compatible with known Knit formats.
+
+ We don't test for the stores being of specific types because that
+ could lead to confusing results, and there is no need to be
+ overly general.
+ """
+ try:
+ are_knits = (isinstance(source._format, RepositoryFormatKnit) and
+ isinstance(target._format, RepositoryFormatKnit))
+ except AttributeError:
+ return False
+ return are_knits and InterRepository._same_model(source, target)
+
+ @needs_read_lock
+ def search_missing_revision_ids(self,
+ find_ghosts=True, revision_ids=None, if_present_ids=None,
+ limit=None):
+ """See InterRepository.search_missing_revision_ids()."""
+ source_ids_set = self._present_source_revisions_for(
+ revision_ids, if_present_ids)
+ # source_ids is the worst possible case we may need to pull.
+ # now we want to filter source_ids against what we actually
+ # have in target, but don't try to check for existence where we know
+ # we do not have a revision as that would be pointless.
+ target_ids = set(self.target.all_revision_ids())
+ possibly_present_revisions = target_ids.intersection(source_ids_set)
+ actually_present_revisions = set(
+ self.target._eliminate_revisions_not_present(possibly_present_revisions))
+ required_revisions = source_ids_set.difference(actually_present_revisions)
+ if revision_ids is not None:
+            # If we used get_ancestry to determine source_ids then we are
+            # assured all revisions referenced are present as they are
+            # installed in topological order, and the tip revision was
+            # validated by get_ancestry.
+ result_set = required_revisions
+ else:
+ # if we just grabbed the possibly available ids, then
+            # we only have an estimate of what's available and need to validate
+ # that against the revision records.
+ result_set = set(
+ self.source._eliminate_revisions_not_present(required_revisions))
+ if limit is not None:
+ topo_ordered = self.source.get_graph().iter_topo_order(result_set)
+ result_set = set(itertools.islice(topo_ordered, limit))
+ return self.source.revision_ids_to_search_result(result_set)
+
+
+InterRepository.register_optimiser(InterKnitRepo)
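+
+# Illustrative sketch (hypothetical revision ids, not part of the original
+# source): the set arithmetic in search_missing_revision_ids above works
+# roughly like this:
+#
+#   source_ids_set = set(['r1', 'r2', 'r3'])        # everything we might pull
+#   target_ids = set(['r1', 'r4'])                  # already in the target
+#   possibly_present = target_ids & source_ids_set  # {'r1'}
+#   required = source_ids_set - possibly_present    # {'r2', 'r3'}
+#
+# The real method additionally validates the possibly-present set (and, when
+# no explicit revision_ids were given, the required set) against the revision
+# records before building the search result.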
diff --git a/bzrlib/repofmt/pack_repo.py b/bzrlib/repofmt/pack_repo.py
new file mode 100644
index 0000000..d513d95
--- /dev/null
+++ b/bzrlib/repofmt/pack_repo.py
@@ -0,0 +1,2091 @@
+# Copyright (C) 2007-2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+from __future__ import absolute_import
+
+import re
+import sys
+
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
+from itertools import izip
+import time
+
+from bzrlib import (
+ chk_map,
+ cleanup,
+ config,
+ debug,
+ graph,
+ osutils,
+ pack,
+ transactions,
+ tsort,
+ ui,
+ )
+from bzrlib.index import (
+ CombinedGraphIndex,
+ GraphIndexPrefixAdapter,
+ )
+""")
+from bzrlib import (
+ btree_index,
+ errors,
+ lockable_files,
+ lockdir,
+ )
+
+from bzrlib.decorators import (
+ needs_read_lock,
+ needs_write_lock,
+ only_raises,
+ )
+from bzrlib.lock import LogicalLockResult
+from bzrlib.repository import (
+ _LazyListJoin,
+ MetaDirRepository,
+ RepositoryFormatMetaDir,
+ RepositoryWriteLockResult,
+ )
+from bzrlib.vf_repository import (
+ MetaDirVersionedFileRepository,
+ MetaDirVersionedFileRepositoryFormat,
+ VersionedFileCommitBuilder,
+ VersionedFileRootCommitBuilder,
+ )
+from bzrlib.trace import (
+ mutter,
+ note,
+ warning,
+ )
+
+
+class PackCommitBuilder(VersionedFileCommitBuilder):
+ """Subclass of VersionedFileCommitBuilder to add texts with pack semantics.
+
+ Specifically this uses one knit object rather than one knit object per
+ added text, reducing memory and object pressure.
+ """
+
+ def __init__(self, repository, parents, config, timestamp=None,
+ timezone=None, committer=None, revprops=None,
+ revision_id=None, lossy=False):
+ VersionedFileCommitBuilder.__init__(self, repository, parents, config,
+ timestamp=timestamp, timezone=timezone, committer=committer,
+ revprops=revprops, revision_id=revision_id, lossy=lossy)
+ self._file_graph = graph.Graph(
+ repository._pack_collection.text_index.combined_index)
+
+ def _heads(self, file_id, revision_ids):
+ keys = [(file_id, revision_id) for revision_id in revision_ids]
+ return set([key[1] for key in self._file_graph.heads(keys)])
+
+
+class PackRootCommitBuilder(VersionedFileRootCommitBuilder):
+ """A subclass of RootCommitBuilder to add texts with pack semantics.
+
+ Specifically this uses one knit object rather than one knit object per
+ added text, reducing memory and object pressure.
+ """
+
+ def __init__(self, repository, parents, config, timestamp=None,
+ timezone=None, committer=None, revprops=None,
+ revision_id=None, lossy=False):
+ super(PackRootCommitBuilder, self).__init__(repository, parents,
+ config, timestamp=timestamp, timezone=timezone,
+ committer=committer, revprops=revprops, revision_id=revision_id,
+ lossy=lossy)
+ self._file_graph = graph.Graph(
+ repository._pack_collection.text_index.combined_index)
+
+ def _heads(self, file_id, revision_ids):
+ keys = [(file_id, revision_id) for revision_id in revision_ids]
+ return set([key[1] for key in self._file_graph.heads(keys)])
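+
+    # Illustrative note (hypothetical ids, not part of the original source):
+    # for file 'file-1' and revision_ids ['rev-a', 'rev-b'], _heads() asks the
+    # per-file graph for the heads of {('file-1', 'rev-a'), ('file-1', 'rev-b')}
+    # and returns only the revision parts, e.g. set(['rev-b']) when 'rev-a' is
+    # an ancestor of 'rev-b' in that file's graph.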
+
+
+class Pack(object):
+ """An in memory proxy for a pack and its indices.
+
+ This is a base class that is not directly used, instead the classes
+ ExistingPack and NewPack are used.
+ """
+
+ # A map of index 'type' to the file extension and position in the
+ # index_sizes array.
+ index_definitions = {
+ 'chk': ('.cix', 4),
+ 'revision': ('.rix', 0),
+ 'inventory': ('.iix', 1),
+ 'text': ('.tix', 2),
+ 'signature': ('.six', 3),
+ }
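+    # For example (hypothetical pack name, not part of the original source):
+    # a pack named '0123abcd' keeps its text index in '0123abcd.tix', and that
+    # index's size lives at position 2 of the pack's index_sizes list; see
+    # index_name() and index_offset() below.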
+
+ def __init__(self, revision_index, inventory_index, text_index,
+ signature_index, chk_index=None):
+ """Create a pack instance.
+
+ :param revision_index: A GraphIndex for determining what revisions are
+ present in the Pack and accessing the locations of their texts.
+ :param inventory_index: A GraphIndex for determining what inventories are
+ present in the Pack and accessing the locations of their
+ texts/deltas.
+ :param text_index: A GraphIndex for determining what file texts
+ are present in the pack and accessing the locations of their
+ texts/deltas (via (fileid, revisionid) tuples).
+ :param signature_index: A GraphIndex for determining what signatures are
+ present in the Pack and accessing the locations of their texts.
+ :param chk_index: A GraphIndex for accessing content by CHK, if the
+ pack has one.
+ """
+ self.revision_index = revision_index
+ self.inventory_index = inventory_index
+ self.text_index = text_index
+ self.signature_index = signature_index
+ self.chk_index = chk_index
+
+ def access_tuple(self):
+ """Return a tuple (transport, name) for the pack content."""
+ return self.pack_transport, self.file_name()
+
+ def _check_references(self):
+ """Make sure our external references are present.
+
+ Packs are allowed to have deltas whose base is not in the pack, but it
+ must be present somewhere in this collection. It is not allowed to
+ have deltas based on a fallback repository.
+ (See <https://bugs.launchpad.net/bzr/+bug/288751>)
+ """
+ missing_items = {}
+ for (index_name, external_refs, index) in [
+ ('texts',
+ self._get_external_refs(self.text_index),
+ self._pack_collection.text_index.combined_index),
+ ('inventories',
+ self._get_external_refs(self.inventory_index),
+ self._pack_collection.inventory_index.combined_index),
+ ]:
+ missing = external_refs.difference(
+ k for (idx, k, v, r) in
+ index.iter_entries(external_refs))
+ if missing:
+ missing_items[index_name] = sorted(list(missing))
+ if missing_items:
+ from pprint import pformat
+ raise errors.BzrCheckError(
+ "Newly created pack file %r has delta references to "
+ "items not in its repository:\n%s"
+ % (self, pformat(missing_items)))
+
+ def file_name(self):
+ """Get the file name for the pack on disk."""
+ return self.name + '.pack'
+
+ def get_revision_count(self):
+ return self.revision_index.key_count()
+
+ def index_name(self, index_type, name):
+ """Get the disk name of an index type for pack name 'name'."""
+ return name + Pack.index_definitions[index_type][0]
+
+ def index_offset(self, index_type):
+ """Get the position in a index_size array for a given index type."""
+ return Pack.index_definitions[index_type][1]
+
+ def inventory_index_name(self, name):
+ """The inv index is the name + .iix."""
+ return self.index_name('inventory', name)
+
+ def revision_index_name(self, name):
+ """The revision index is the name + .rix."""
+ return self.index_name('revision', name)
+
+ def signature_index_name(self, name):
+ """The signature index is the name + .six."""
+ return self.index_name('signature', name)
+
+ def text_index_name(self, name):
+ """The text index is the name + .tix."""
+ return self.index_name('text', name)
+
+ def _replace_index_with_readonly(self, index_type):
+ unlimited_cache = False
+ if index_type == 'chk':
+ unlimited_cache = True
+ index = self.index_class(self.index_transport,
+ self.index_name(index_type, self.name),
+ self.index_sizes[self.index_offset(index_type)],
+ unlimited_cache=unlimited_cache)
+ if index_type == 'chk':
+ index._leaf_factory = btree_index._gcchk_factory
+ setattr(self, index_type + '_index', index)
+
+
+class ExistingPack(Pack):
+ """An in memory proxy for an existing .pack and its disk indices."""
+
+ def __init__(self, pack_transport, name, revision_index, inventory_index,
+ text_index, signature_index, chk_index=None):
+ """Create an ExistingPack object.
+
+ :param pack_transport: The transport where the pack file resides.
+ :param name: The name of the pack on disk in the pack_transport.
+ """
+ Pack.__init__(self, revision_index, inventory_index, text_index,
+ signature_index, chk_index)
+ self.name = name
+ self.pack_transport = pack_transport
+ if None in (revision_index, inventory_index, text_index,
+ signature_index, name, pack_transport):
+ raise AssertionError()
+
+ def __eq__(self, other):
+ return self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __repr__(self):
+ return "<%s.%s object at 0x%x, %s, %s" % (
+ self.__class__.__module__, self.__class__.__name__, id(self),
+ self.pack_transport, self.name)
+
+
+class ResumedPack(ExistingPack):
+
+ def __init__(self, name, revision_index, inventory_index, text_index,
+ signature_index, upload_transport, pack_transport, index_transport,
+ pack_collection, chk_index=None):
+ """Create a ResumedPack object."""
+ ExistingPack.__init__(self, pack_transport, name, revision_index,
+ inventory_index, text_index, signature_index,
+ chk_index=chk_index)
+ self.upload_transport = upload_transport
+ self.index_transport = index_transport
+ self.index_sizes = [None, None, None, None]
+ indices = [
+ ('revision', revision_index),
+ ('inventory', inventory_index),
+ ('text', text_index),
+ ('signature', signature_index),
+ ]
+ if chk_index is not None:
+ indices.append(('chk', chk_index))
+ self.index_sizes.append(None)
+ for index_type, index in indices:
+ offset = self.index_offset(index_type)
+ self.index_sizes[offset] = index._size
+ self.index_class = pack_collection._index_class
+ self._pack_collection = pack_collection
+ self._state = 'resumed'
+ # XXX: perhaps check that the .pack file exists?
+
+ def access_tuple(self):
+ if self._state == 'finished':
+ return Pack.access_tuple(self)
+ elif self._state == 'resumed':
+ return self.upload_transport, self.file_name()
+ else:
+ raise AssertionError(self._state)
+
+ def abort(self):
+ self.upload_transport.delete(self.file_name())
+ indices = [self.revision_index, self.inventory_index, self.text_index,
+ self.signature_index]
+ if self.chk_index is not None:
+ indices.append(self.chk_index)
+ for index in indices:
+ index._transport.delete(index._name)
+
+ def finish(self):
+ self._check_references()
+ index_types = ['revision', 'inventory', 'text', 'signature']
+ if self.chk_index is not None:
+ index_types.append('chk')
+ for index_type in index_types:
+ old_name = self.index_name(index_type, self.name)
+ new_name = '../indices/' + old_name
+ self.upload_transport.move(old_name, new_name)
+ self._replace_index_with_readonly(index_type)
+ new_name = '../packs/' + self.file_name()
+ self.upload_transport.move(self.file_name(), new_name)
+ self._state = 'finished'
+
+ def _get_external_refs(self, index):
+ """Return compression parents for this index that are not present.
+
+ This returns any compression parents that are referenced by this index,
+ which are not contained *in* this index. They may be present elsewhere.
+ """
+ return index.external_references(1)
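+
+    # Note (descriptive, not in the original source): external_references(1)
+    # scans reference list 1 (the compression parents) and returns the keys
+    # that are used as delta bases but are not stored in this index; finish()
+    # then relies on _check_references() to reject packs whose bases are
+    # missing from the whole collection.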
+
+
+class NewPack(Pack):
+ """An in memory proxy for a pack which is being created."""
+
+ def __init__(self, pack_collection, upload_suffix='', file_mode=None):
+ """Create a NewPack instance.
+
+ :param pack_collection: A PackCollection into which this is being inserted.
+ :param upload_suffix: An optional suffix to be given to any temporary
+            files created during the pack creation. e.g. '.autopack'
+ :param file_mode: Unix permissions for newly created file.
+ """
+ # The relative locations of the packs are constrained, but all are
+ # passed in because the caller has them, so as to avoid object churn.
+ index_builder_class = pack_collection._index_builder_class
+ if pack_collection.chk_index is not None:
+ chk_index = index_builder_class(reference_lists=0)
+ else:
+ chk_index = None
+ Pack.__init__(self,
+ # Revisions: parents list, no text compression.
+ index_builder_class(reference_lists=1),
+ # Inventory: We want to map compression only, but currently the
+ # knit code hasn't been updated enough to understand that, so we
+ # have a regular 2-list index giving parents and compression
+ # source.
+ index_builder_class(reference_lists=2),
+ # Texts: compression and per file graph, for all fileids - so two
+ # reference lists and two elements in the key tuple.
+ index_builder_class(reference_lists=2, key_elements=2),
+ # Signatures: Just blobs to store, no compression, no parents
+ # listing.
+ index_builder_class(reference_lists=0),
+ # CHK based storage - just blobs, no compression or parents.
+ chk_index=chk_index
+ )
+ self._pack_collection = pack_collection
+ # When we make readonly indices, we need this.
+ self.index_class = pack_collection._index_class
+ # where should the new pack be opened
+ self.upload_transport = pack_collection._upload_transport
+ # where are indices written out to
+ self.index_transport = pack_collection._index_transport
+ # where is the pack renamed to when it is finished?
+ self.pack_transport = pack_collection._pack_transport
+ # What file mode to upload the pack and indices with.
+ self._file_mode = file_mode
+ # tracks the content written to the .pack file.
+ self._hash = osutils.md5()
+        # a list with the length in bytes of each index, once the pack
+        # is finalised. (rev, inv, text, sigs, chk_if_in_use)
+ self.index_sizes = None
+ # How much data to cache when writing packs. Note that this is not
+ # synchronised with reads, because it's not in the transport layer, so
+ # is not safe unless the client knows it won't be reading from the pack
+ # under creation.
+ self._cache_limit = 0
+ # the temporary pack file name.
+ self.random_name = osutils.rand_chars(20) + upload_suffix
+ # when was this pack started ?
+ self.start_time = time.time()
+ # open an output stream for the data added to the pack.
+ self.write_stream = self.upload_transport.open_write_stream(
+ self.random_name, mode=self._file_mode)
+ if 'pack' in debug.debug_flags:
+ mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
+ time.ctime(), self.upload_transport.base, self.random_name,
+ time.time() - self.start_time)
+ # A list of byte sequences to be written to the new pack, and the
+ # aggregate size of them. Stored as a list rather than separate
+ # variables so that the _write_data closure below can update them.
+ self._buffer = [[], 0]
+ # create a callable for adding data
+ #
+ # robertc says- this is a closure rather than a method on the object
+ # so that the variables are locals, and faster than accessing object
+ # members.
+ def _write_data(bytes, flush=False, _buffer=self._buffer,
+ _write=self.write_stream.write, _update=self._hash.update):
+ _buffer[0].append(bytes)
+ _buffer[1] += len(bytes)
+ # buffer cap
+ if _buffer[1] > self._cache_limit or flush:
+ bytes = ''.join(_buffer[0])
+ _write(bytes)
+ _update(bytes)
+ _buffer[:] = [[], 0]
+ # expose this on self, for the occasion when clients want to add data.
+ self._write_data = _write_data
+ # a pack writer object to serialise pack records.
+ self._writer = pack.ContainerWriter(self._write_data)
+ self._writer.begin()
+ # what state is the pack in? (open, finished, aborted)
+ self._state = 'open'
+ # no name until we finish writing the content
+ self.name = None
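+
+    # Illustrative sketch (hypothetical numbers, not part of the original
+    # source): after set_write_cache_size(4096), calls to self._write_data(...)
+    # only append to self._buffer until more than 4096 bytes are pending (or
+    # flush=True is passed); the joined bytes are then written to write_stream,
+    # fed into the running md5 in self._hash, and the buffer is emptied.
+    # finish_content() later uses self._hash.hexdigest() as the pack's name.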
+
+ def abort(self):
+ """Cancel creating this pack."""
+ self._state = 'aborted'
+ self.write_stream.close()
+ # Remove the temporary pack file.
+ self.upload_transport.delete(self.random_name)
+ # The indices have no state on disk.
+
+ def access_tuple(self):
+ """Return a tuple (transport, name) for the pack content."""
+ if self._state == 'finished':
+ return Pack.access_tuple(self)
+ elif self._state == 'open':
+ return self.upload_transport, self.random_name
+ else:
+ raise AssertionError(self._state)
+
+ def data_inserted(self):
+ """True if data has been added to this pack."""
+ return bool(self.get_revision_count() or
+ self.inventory_index.key_count() or
+ self.text_index.key_count() or
+ self.signature_index.key_count() or
+ (self.chk_index is not None and self.chk_index.key_count()))
+
+ def finish_content(self):
+ if self.name is not None:
+ return
+ self._writer.end()
+ if self._buffer[1]:
+ self._write_data('', flush=True)
+ self.name = self._hash.hexdigest()
+
+ def finish(self, suspend=False):
+ """Finish the new pack.
+
+ This:
+ - finalises the content
+ - assigns a name (the md5 of the content, currently)
+ - writes out the associated indices
+ - renames the pack into place.
+ - stores the index size tuple for the pack in the index_sizes
+ attribute.
+ """
+ self.finish_content()
+ if not suspend:
+ self._check_references()
+ # write indices
+ # XXX: It'd be better to write them all to temporary names, then
+ # rename them all into place, so that the window when only some are
+ # visible is smaller. On the other hand none will be seen until
+ # they're in the names list.
+ self.index_sizes = [None, None, None, None]
+ self._write_index('revision', self.revision_index, 'revision',
+ suspend)
+ self._write_index('inventory', self.inventory_index, 'inventory',
+ suspend)
+ self._write_index('text', self.text_index, 'file texts', suspend)
+ self._write_index('signature', self.signature_index,
+ 'revision signatures', suspend)
+ if self.chk_index is not None:
+ self.index_sizes.append(None)
+ self._write_index('chk', self.chk_index,
+ 'content hash bytes', suspend)
+ self.write_stream.close(
+ want_fdatasync=self._pack_collection.config_stack.get('repository.fdatasync'))
+ # Note that this will clobber an existing pack with the same name,
+ # without checking for hash collisions. While this is undesirable this
+ # is something that can be rectified in a subsequent release. One way
+ # to rectify it may be to leave the pack at the original name, writing
+ # its pack-names entry as something like 'HASH: index-sizes
+ # temporary-name'. Allocate that and check for collisions, if it is
+ # collision free then rename it into place. If clients know this scheme
+ # they can handle missing-file errors by:
+ # - try for HASH.pack
+ # - try for temporary-name
+ # - refresh the pack-list to see if the pack is now absent
+ new_name = self.name + '.pack'
+ if not suspend:
+ new_name = '../packs/' + new_name
+ self.upload_transport.move(self.random_name, new_name)
+ self._state = 'finished'
+ if 'pack' in debug.debug_flags:
+ # XXX: size might be interesting?
+ mutter('%s: create_pack: pack finished: %s%s->%s t+%6.3fs',
+ time.ctime(), self.upload_transport.base, self.random_name,
+ new_name, time.time() - self.start_time)
+
+ def flush(self):
+ """Flush any current data."""
+ if self._buffer[1]:
+ bytes = ''.join(self._buffer[0])
+ self.write_stream.write(bytes)
+ self._hash.update(bytes)
+ self._buffer[:] = [[], 0]
+
+ def _get_external_refs(self, index):
+ return index._external_references()
+
+ def set_write_cache_size(self, size):
+ self._cache_limit = size
+
+ def _write_index(self, index_type, index, label, suspend=False):
+ """Write out an index.
+
+ :param index_type: The type of index to write - e.g. 'revision'.
+ :param index: The index object to serialise.
+ :param label: What label to give the index e.g. 'revision'.
+ """
+ index_name = self.index_name(index_type, self.name)
+ if suspend:
+ transport = self.upload_transport
+ else:
+ transport = self.index_transport
+ index_tempfile = index.finish()
+ index_bytes = index_tempfile.read()
+ write_stream = transport.open_write_stream(index_name,
+ mode=self._file_mode)
+ write_stream.write(index_bytes)
+ write_stream.close(
+ want_fdatasync=self._pack_collection.config_stack.get('repository.fdatasync'))
+ self.index_sizes[self.index_offset(index_type)] = len(index_bytes)
+ if 'pack' in debug.debug_flags:
+ # XXX: size might be interesting?
+ mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
+ time.ctime(), label, self.upload_transport.base,
+ self.random_name, time.time() - self.start_time)
+ # Replace the writable index on this object with a readonly,
+ # presently unloaded index. We should alter
+ # the index layer to make its finish() error if add_node is
+ # subsequently used. RBC
+ self._replace_index_with_readonly(index_type)
+
+
+class AggregateIndex(object):
+ """An aggregated index for the RepositoryPackCollection.
+
+    AggregateIndex is responsible for managing the PackAccess object,
+ Index-To-Pack mapping, and all indices list for a specific type of index
+ such as 'revision index'.
+
+ A CombinedIndex provides an index on a single key space built up
+ from several on-disk indices. The AggregateIndex builds on this
+ to provide a knit access layer, and allows having up to one writable
+ index within the collection.
+ """
+ # XXX: Probably 'can be written to' could/should be separated from 'acts
+ # like a knit index' -- mbp 20071024
+
+ def __init__(self, reload_func=None, flush_func=None):
+ """Create an AggregateIndex.
+
+ :param reload_func: A function to call if we find we are missing an
+ index. Should have the form reload_func() => True if the list of
+            active pack files has changed.
+        :param flush_func: A function to call to flush any pending writes to
+            the writable pack before its data is read.
+        """
+ self._reload_func = reload_func
+ self.index_to_pack = {}
+ self.combined_index = CombinedGraphIndex([], reload_func=reload_func)
+ self.data_access = _DirectPackAccess(self.index_to_pack,
+ reload_func=reload_func,
+ flush_func=flush_func)
+ self.add_callback = None
+
+ def add_index(self, index, pack):
+ """Add index to the aggregate, which is an index for Pack pack.
+
+        Future searches on the aggregate index will search this new index
+ before all previously inserted indices.
+
+ :param index: An Index for the pack.
+ :param pack: A Pack instance.
+ """
+ # expose it to the index map
+ self.index_to_pack[index] = pack.access_tuple()
+ # put it at the front of the linear index list
+ self.combined_index.insert_index(0, index, pack.name)
+
+ def add_writable_index(self, index, pack):
+ """Add an index which is able to have data added to it.
+
+ There can be at most one writable index at any time. Any
+ modifications made to the knit are put into this index.
+
+ :param index: An index from the pack parameter.
+ :param pack: A Pack instance.
+ """
+ if self.add_callback is not None:
+ raise AssertionError(
+ "%s already has a writable index through %s" % \
+ (self, self.add_callback))
+ # allow writing: queue writes to a new index
+ self.add_index(index, pack)
+ # Updates the index to packs mapping as a side effect,
+ self.data_access.set_writer(pack._writer, index, pack.access_tuple())
+ self.add_callback = index.add_nodes
+
+ def clear(self):
+ """Reset all the aggregate data to nothing."""
+ self.data_access.set_writer(None, None, (None, None))
+ self.index_to_pack.clear()
+ del self.combined_index._indices[:]
+ del self.combined_index._index_names[:]
+ self.add_callback = None
+
+ def remove_index(self, index):
+ """Remove index from the indices used to answer queries.
+
+ :param index: An index from the pack parameter.
+ """
+ del self.index_to_pack[index]
+ pos = self.combined_index._indices.index(index)
+ del self.combined_index._indices[pos]
+ del self.combined_index._index_names[pos]
+ if (self.add_callback is not None and
+ getattr(index, 'add_nodes', None) == self.add_callback):
+ self.add_callback = None
+ self.data_access.set_writer(None, None, (None, None))
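+
+    # Illustrative usage (hypothetical pack objects, not part of the original
+    # source):
+    #
+    #   agg = AggregateIndex(reload_func=None, flush_func=None)
+    #   agg.add_index(old_pack.revision_index, old_pack)
+    #   agg.add_writable_index(new_pack.revision_index, new_pack)
+    #
+    # Each added index is inserted at the front of combined_index, so queries
+    # consult the newest data first, and only one writable index (and hence
+    # one add_callback) can be registered at a time.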
+
+
+class Packer(object):
+ """Create a pack from packs."""
+
+ def __init__(self, pack_collection, packs, suffix, revision_ids=None,
+ reload_func=None):
+ """Create a Packer.
+
+ :param pack_collection: A RepositoryPackCollection object where the
+ new pack is being written to.
+ :param packs: The packs to combine.
+ :param suffix: The suffix to use on the temporary files for the pack.
+ :param revision_ids: Revision ids to limit the pack to.
+ :param reload_func: A function to call if a pack file/index goes
+ missing. The side effect of calling this function should be to
+ update self.packs. See also AggregateIndex
+ """
+ self.packs = packs
+ self.suffix = suffix
+ self.revision_ids = revision_ids
+ # The pack object we are creating.
+ self.new_pack = None
+ self._pack_collection = pack_collection
+ self._reload_func = reload_func
+ # The index layer keys for the revisions being copied. None for 'all
+ # objects'.
+ self._revision_keys = None
+ # What text keys to copy. None for 'all texts'. This is set by
+ # _copy_inventory_texts
+ self._text_filter = None
+
+ def pack(self, pb=None):
+ """Create a new pack by reading data from other packs.
+
+ This does little more than a bulk copy of data. One key difference
+ is that data with the same item key across multiple packs is elided
+ from the output. The new pack is written into the current pack store
+ along with its indices, and the name added to the pack names. The
+ source packs are not altered and are not required to be in the current
+ pack collection.
+
+ :param pb: An optional progress bar to use. A nested bar is created if
+ this is None.
+ :return: A Pack object, or None if nothing was copied.
+ """
+ # open a pack - using the same name as the last temporary file
+ # - which has already been flushed, so it's safe.
+ # XXX: - duplicate code warning with start_write_group; fix before
+ # considering 'done'.
+ if self._pack_collection._new_pack is not None:
+ raise errors.BzrError('call to %s.pack() while another pack is'
+ ' being written.'
+ % (self.__class__.__name__,))
+ if self.revision_ids is not None:
+ if len(self.revision_ids) == 0:
+ # silly fetch request.
+ return None
+ else:
+ self.revision_ids = frozenset(self.revision_ids)
+ self.revision_keys = frozenset((revid,) for revid in
+ self.revision_ids)
+ if pb is None:
+ self.pb = ui.ui_factory.nested_progress_bar()
+ else:
+ self.pb = pb
+ try:
+ return self._create_pack_from_packs()
+ finally:
+ if pb is None:
+ self.pb.finished()
+
+ def open_pack(self):
+ """Open a pack for the pack we are creating."""
+ new_pack = self._pack_collection.pack_factory(self._pack_collection,
+ upload_suffix=self.suffix,
+ file_mode=self._pack_collection.repo.bzrdir._get_file_mode())
+ # We know that we will process all nodes in order, and don't need to
+ # query, so don't combine any indices spilled to disk until we are done
+ new_pack.revision_index.set_optimize(combine_backing_indices=False)
+ new_pack.inventory_index.set_optimize(combine_backing_indices=False)
+ new_pack.text_index.set_optimize(combine_backing_indices=False)
+ new_pack.signature_index.set_optimize(combine_backing_indices=False)
+ return new_pack
+
+ def _copy_revision_texts(self):
+ """Copy revision data to the new pack."""
+ raise NotImplementedError(self._copy_revision_texts)
+
+ def _copy_inventory_texts(self):
+ """Copy the inventory texts to the new pack.
+
+ self._revision_keys is used to determine what inventories to copy.
+
+ Sets self._text_filter appropriately.
+ """
+ raise NotImplementedError(self._copy_inventory_texts)
+
+ def _copy_text_texts(self):
+ raise NotImplementedError(self._copy_text_texts)
+
+ def _create_pack_from_packs(self):
+ raise NotImplementedError(self._create_pack_from_packs)
+
+ def _log_copied_texts(self):
+ if 'pack' in debug.debug_flags:
+ mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
+ time.ctime(), self._pack_collection._upload_transport.base,
+ self.new_pack.random_name,
+ self.new_pack.text_index.key_count(),
+ time.time() - self.new_pack.start_time)
+
+ def _use_pack(self, new_pack):
+ """Return True if new_pack should be used.
+
+ :param new_pack: The pack that has just been created.
+ :return: True if the pack should be used.
+ """
+ return new_pack.data_inserted()
+
+
+class RepositoryPackCollection(object):
+ """Management of packs within a repository.
+
+ :ivar _names: map of {pack_name: (index_size,)}
+ """
+
+ pack_factory = None
+ resumed_pack_factory = None
+ normal_packer_class = None
+ optimising_packer_class = None
+
+ def __init__(self, repo, transport, index_transport, upload_transport,
+ pack_transport, index_builder_class, index_class,
+ use_chk_index):
+ """Create a new RepositoryPackCollection.
+
+ :param transport: Addresses the repository base directory
+ (typically .bzr/repository/).
+ :param index_transport: Addresses the directory containing indices.
+ :param upload_transport: Addresses the directory into which packs are written
+ while they're being created.
+ :param pack_transport: Addresses the directory of existing complete packs.
+ :param index_builder_class: The index builder class to use.
+ :param index_class: The index class to use.
+ :param use_chk_index: Whether to setup and manage a CHK index.
+ """
+ # XXX: This should call self.reset()
+ self.repo = repo
+ self.transport = transport
+ self._index_transport = index_transport
+ self._upload_transport = upload_transport
+ self._pack_transport = pack_transport
+ self._index_builder_class = index_builder_class
+ self._index_class = index_class
+ self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3,
+ '.cix': 4}
+ self.packs = []
+ # name:Pack mapping
+ self._names = None
+ self._packs_by_name = {}
+ # the previous pack-names content
+ self._packs_at_load = None
+ # when a pack is being created by this object, the state of that pack.
+ self._new_pack = None
+ # aggregated revision index data
+ flush = self._flush_new_pack
+ self.revision_index = AggregateIndex(self.reload_pack_names, flush)
+ self.inventory_index = AggregateIndex(self.reload_pack_names, flush)
+ self.text_index = AggregateIndex(self.reload_pack_names, flush)
+ self.signature_index = AggregateIndex(self.reload_pack_names, flush)
+ all_indices = [self.revision_index, self.inventory_index,
+ self.text_index, self.signature_index]
+ if use_chk_index:
+ self.chk_index = AggregateIndex(self.reload_pack_names, flush)
+ all_indices.append(self.chk_index)
+ else:
+ # used to determine if we're using a chk_index elsewhere.
+ self.chk_index = None
+ # Tell all the CombinedGraphIndex objects about each other, so they can
+ # share hints about which pack names to search first.
+ all_combined = [agg_idx.combined_index for agg_idx in all_indices]
+ for combined_idx in all_combined:
+ combined_idx.set_sibling_indices(
+ set(all_combined).difference([combined_idx]))
+ # resumed packs
+ self._resumed_packs = []
+ self.config_stack = config.LocationStack(self.transport.base)
+
+ def __repr__(self):
+ return '%s(%r)' % (self.__class__.__name__, self.repo)
+
+ def add_pack_to_memory(self, pack):
+ """Make a Pack object available to the repository to satisfy queries.
+
+ :param pack: A Pack object.
+ """
+ if pack.name in self._packs_by_name:
+ raise AssertionError(
+ 'pack %s already in _packs_by_name' % (pack.name,))
+ self.packs.append(pack)
+ self._packs_by_name[pack.name] = pack
+ self.revision_index.add_index(pack.revision_index, pack)
+ self.inventory_index.add_index(pack.inventory_index, pack)
+ self.text_index.add_index(pack.text_index, pack)
+ self.signature_index.add_index(pack.signature_index, pack)
+ if self.chk_index is not None:
+ self.chk_index.add_index(pack.chk_index, pack)
+
+ def all_packs(self):
+ """Return a list of all the Pack objects this repository has.
+
+ Note that an in-progress pack being created is not returned.
+
+ :return: A list of Pack objects for all the packs in the repository.
+ """
+ result = []
+ for name in self.names():
+ result.append(self.get_pack_by_name(name))
+ return result
+
+ def autopack(self):
+ """Pack the pack collection incrementally.
+
+ This will not attempt global reorganisation or recompression,
+ rather it will just ensure that the total number of packs does
+ not grow without bound. It uses the _max_pack_count method to
+ determine if autopacking is needed, and the pack_distribution
+ method to determine the number of revisions in each pack.
+
+        If autopacking takes place then the pack names collection will have
+ been flushed to disk - packing requires updating the name collection
+ in synchronisation with certain steps. Otherwise the names collection
+ is not flushed.
+
+ :return: Something evaluating true if packing took place.
+ """
+ while True:
+ try:
+ return self._do_autopack()
+ except errors.RetryAutopack:
+ # If we get a RetryAutopack exception, we should abort the
+ # current action, and retry.
+ pass
+
+ def _do_autopack(self):
+ # XXX: Should not be needed when the management of indices is sane.
+ total_revisions = self.revision_index.combined_index.key_count()
+ total_packs = len(self._names)
+ if self._max_pack_count(total_revisions) >= total_packs:
+ return None
+ # determine which packs need changing
+ pack_distribution = self.pack_distribution(total_revisions)
+ existing_packs = []
+ for pack in self.all_packs():
+ revision_count = pack.get_revision_count()
+ if revision_count == 0:
+                # revision-less packs are not generated by normal operation,
+                # only by operations like sign-my-commits, and thus will not
+                # tend to grow rapidly or without bound like commit-containing
+ # packs do - leave them alone as packing them really should
+ # group their data with the relevant commit, and that may
+ # involve rewriting ancient history - which autopack tries to
+ # avoid. Alternatively we could not group the data but treat
+ # each of these as having a single revision, and thus add
+ # one revision for each to the total revision count, to get
+ # a matching distribution.
+ continue
+ existing_packs.append((revision_count, pack))
+ pack_operations = self.plan_autopack_combinations(
+ existing_packs, pack_distribution)
+ num_new_packs = len(pack_operations)
+ num_old_packs = sum([len(po[1]) for po in pack_operations])
+ num_revs_affected = sum([po[0] for po in pack_operations])
+ mutter('Auto-packing repository %s, which has %d pack files, '
+ 'containing %d revisions. Packing %d files into %d affecting %d'
+ ' revisions', self, total_packs, total_revisions, num_old_packs,
+ num_new_packs, num_revs_affected)
+ result = self._execute_pack_operations(pack_operations, packer_class=self.normal_packer_class,
+ reload_func=self._restart_autopack)
+ mutter('Auto-packing repository %s completed', self)
+ return result
+
+ def _execute_pack_operations(self, pack_operations, packer_class,
+ reload_func=None):
+ """Execute a series of pack operations.
+
+ :param pack_operations: A list of [revision_count, packs_to_combine].
+ :param packer_class: The class of packer to use
+ :return: The new pack names.
+ """
+ for revision_count, packs in pack_operations:
+ # we may have no-ops from the setup logic
+ if len(packs) == 0:
+ continue
+ packer = packer_class(self, packs, '.autopack',
+ reload_func=reload_func)
+ try:
+ result = packer.pack()
+ except errors.RetryWithNewPacks:
+ # An exception is propagating out of this context, make sure
+ # this packer has cleaned up. Packer() doesn't set its new_pack
+ # state into the RepositoryPackCollection object, so we only
+ # have access to it directly here.
+ if packer.new_pack is not None:
+ packer.new_pack.abort()
+ raise
+ if result is None:
+ return
+ for pack in packs:
+ self._remove_pack_from_memory(pack)
+ # record the newly available packs and stop advertising the old
+ # packs
+ to_be_obsoleted = []
+ for _, packs in pack_operations:
+ to_be_obsoleted.extend(packs)
+ result = self._save_pack_names(clear_obsolete_packs=True,
+ obsolete_packs=to_be_obsoleted)
+ return result
+
+ def _flush_new_pack(self):
+ if self._new_pack is not None:
+ self._new_pack.flush()
+
+ def lock_names(self):
+ """Acquire the mutex around the pack-names index.
+
+ This cannot be used in the middle of a read-only transaction on the
+ repository.
+ """
+ self.repo.control_files.lock_write()
+
+ def _already_packed(self):
+ """Is the collection already packed?"""
+ return not (self.repo._format.pack_compresses or (len(self._names) > 1))
+
+ def pack(self, hint=None, clean_obsolete_packs=False):
+ """Pack the pack collection totally."""
+ self.ensure_loaded()
+ total_packs = len(self._names)
+ if self._already_packed():
+ return
+ total_revisions = self.revision_index.combined_index.key_count()
+ # XXX: the following may want to be a class, to pack with a given
+ # policy.
+ mutter('Packing repository %s, which has %d pack files, '
+ 'containing %d revisions with hint %r.', self, total_packs,
+ total_revisions, hint)
+ while True:
+ try:
+ self._try_pack_operations(hint)
+ except RetryPackOperations:
+ continue
+ break
+
+ if clean_obsolete_packs:
+ self._clear_obsolete_packs()
+
+ def _try_pack_operations(self, hint):
+ """Calculate the pack operations based on the hint (if any), and
+ execute them.
+ """
+ # determine which packs need changing
+ pack_operations = [[0, []]]
+ for pack in self.all_packs():
+ if hint is None or pack.name in hint:
+ # Either no hint was provided (so we are packing everything),
+ # or this pack was included in the hint.
+ pack_operations[-1][0] += pack.get_revision_count()
+ pack_operations[-1][1].append(pack)
+ self._execute_pack_operations(pack_operations,
+ packer_class=self.optimising_packer_class,
+ reload_func=self._restart_pack_operations)
+
+ def plan_autopack_combinations(self, existing_packs, pack_distribution):
+ """Plan a pack operation.
+
+ :param existing_packs: The packs to pack. (A list of (revcount, Pack)
+ tuples).
+ :param pack_distribution: A list with the number of revisions desired
+ in each pack.
+ """
+ if len(existing_packs) <= len(pack_distribution):
+ return []
+ existing_packs.sort(reverse=True)
+ pack_operations = [[0, []]]
+ # plan out what packs to keep, and what to reorganise
+ while len(existing_packs):
+ # take the largest pack, and if it's less than the head of the
+ # distribution chart we will include its contents in the new pack
+ # for that position. If it's larger, we remove its size from the
+ # distribution chart
+ next_pack_rev_count, next_pack = existing_packs.pop(0)
+ if next_pack_rev_count >= pack_distribution[0]:
+                # this pack is already packed 'better' than the target
+                # distribution, so don't waste time repacking it.
+ while next_pack_rev_count > 0:
+ next_pack_rev_count -= pack_distribution[0]
+ if next_pack_rev_count >= 0:
+ # more to go
+ del pack_distribution[0]
+ else:
+ # didn't use that entire bucket up
+ pack_distribution[0] = -next_pack_rev_count
+ else:
+ # add the revisions we're going to add to the next output pack
+ pack_operations[-1][0] += next_pack_rev_count
+ # allocate this pack to the next pack sub operation
+ pack_operations[-1][1].append(next_pack)
+ if pack_operations[-1][0] >= pack_distribution[0]:
+ # this pack is used up, shift left.
+ del pack_distribution[0]
+ pack_operations.append([0, []])
+ # Now that we know which pack files we want to move, shove them all
+ # into a single pack file.
+ final_rev_count = 0
+ final_pack_list = []
+ for num_revs, pack_files in pack_operations:
+ final_rev_count += num_revs
+ final_pack_list.extend(pack_files)
+ if len(final_pack_list) == 1:
+ raise AssertionError('We somehow generated an autopack with a'
+ ' single pack file being moved.')
+ return []
+ return [[final_rev_count, final_pack_list]]
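+
+    # Worked example (illustrative, not part of the original source): with
+    # eleven single-revision packs, pack_distribution(11) is [10, 1]. The loop
+    # above gathers ten of the packs into one operation (their combined count
+    # fills the bucket of 10) and leaves the remaining pack alone because it
+    # already satisfies the bucket of 1, so the method returns a single
+    # operation combining ten packs into one new pack.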
+
+ def ensure_loaded(self):
+ """Ensure we have read names from disk.
+
+ :return: True if the disk names had not been previously read.
+ """
+ # NB: if you see an assertion error here, it's probably access against
+ # an unlocked repo. Naughty.
+ if not self.repo.is_locked():
+ raise errors.ObjectNotLocked(self.repo)
+ if self._names is None:
+ self._names = {}
+ self._packs_at_load = set()
+ for index, key, value in self._iter_disk_pack_index():
+ name = key[0]
+ self._names[name] = self._parse_index_sizes(value)
+ self._packs_at_load.add((key, value))
+ result = True
+ else:
+ result = False
+ # populate all the metadata.
+ self.all_packs()
+ return result
+
+ def _parse_index_sizes(self, value):
+ """Parse a string of index sizes."""
+ return tuple([int(digits) for digits in value.split(' ')])
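+
+    # For example (illustrative value): _parse_index_sizes('1234 5678 910 0 0')
+    # returns (1234, 5678, 910, 0, 0), one size per index in the order defined
+    # by self._suffix_offsets above.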
+
+ def get_pack_by_name(self, name):
+ """Get a Pack object by name.
+
+ :param name: The name of the pack - e.g. '123456'
+ :return: A Pack object.
+ """
+ try:
+ return self._packs_by_name[name]
+ except KeyError:
+ rev_index = self._make_index(name, '.rix')
+ inv_index = self._make_index(name, '.iix')
+ txt_index = self._make_index(name, '.tix')
+ sig_index = self._make_index(name, '.six')
+ if self.chk_index is not None:
+ chk_index = self._make_index(name, '.cix', is_chk=True)
+ else:
+ chk_index = None
+ result = ExistingPack(self._pack_transport, name, rev_index,
+ inv_index, txt_index, sig_index, chk_index)
+ self.add_pack_to_memory(result)
+ return result
+
+ def _resume_pack(self, name):
+ """Get a suspended Pack object by name.
+
+ :param name: The name of the pack - e.g. '123456'
+ :return: A Pack object.
+ """
+ if not re.match('[a-f0-9]{32}', name):
+ # Tokens should be md5sums of the suspended pack file, i.e. 32 hex
+ # digits.
+ raise errors.UnresumableWriteGroup(
+ self.repo, [name], 'Malformed write group token')
+ try:
+ rev_index = self._make_index(name, '.rix', resume=True)
+ inv_index = self._make_index(name, '.iix', resume=True)
+ txt_index = self._make_index(name, '.tix', resume=True)
+ sig_index = self._make_index(name, '.six', resume=True)
+ if self.chk_index is not None:
+ chk_index = self._make_index(name, '.cix', resume=True,
+ is_chk=True)
+ else:
+ chk_index = None
+ result = self.resumed_pack_factory(name, rev_index, inv_index,
+ txt_index, sig_index, self._upload_transport,
+ self._pack_transport, self._index_transport, self,
+ chk_index=chk_index)
+ except errors.NoSuchFile, e:
+ raise errors.UnresumableWriteGroup(self.repo, [name], str(e))
+ self.add_pack_to_memory(result)
+ self._resumed_packs.append(result)
+ return result
+
+ def allocate(self, a_new_pack):
+ """Allocate name in the list of packs.
+
+ :param a_new_pack: A NewPack instance to be added to the collection of
+ packs for this repository.
+ """
+ self.ensure_loaded()
+ if a_new_pack.name in self._names:
+ raise errors.BzrError(
+ 'Pack %r already exists in %s' % (a_new_pack.name, self))
+ self._names[a_new_pack.name] = tuple(a_new_pack.index_sizes)
+ self.add_pack_to_memory(a_new_pack)
+
+ def _iter_disk_pack_index(self):
+ """Iterate over the contents of the pack-names index.
+
+ This is used when loading the list from disk, and before writing to
+ detect updates from others during our write operation.
+ :return: An iterator of the index contents.
+ """
+ return self._index_class(self.transport, 'pack-names', None
+ ).iter_all_entries()
+
+ def _make_index(self, name, suffix, resume=False, is_chk=False):
+ size_offset = self._suffix_offsets[suffix]
+ index_name = name + suffix
+ if resume:
+ transport = self._upload_transport
+ index_size = transport.stat(index_name).st_size
+ else:
+ transport = self._index_transport
+ index_size = self._names[name][size_offset]
+ index = self._index_class(transport, index_name, index_size,
+ unlimited_cache=is_chk)
+ if is_chk and self._index_class is btree_index.BTreeGraphIndex:
+ index._leaf_factory = btree_index._gcchk_factory
+ return index
+
+ def _max_pack_count(self, total_revisions):
+ """Return the maximum number of packs to use for total revisions.
+
+ :param total_revisions: The total number of revisions in the
+ repository.
+ """
+ if not total_revisions:
+ return 1
+ digits = str(total_revisions)
+ result = 0
+ for digit in digits:
+ result += int(digit)
+ return result
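+
+    # For example: 2431 revisions gives 2 + 4 + 3 + 1 = 10, so autopack leaves
+    # the repository alone until it holds more than ten pack files.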
+
+ def names(self):
+ """Provide an order to the underlying names."""
+ return sorted(self._names.keys())
+
+ def _obsolete_packs(self, packs):
+ """Move a number of packs which have been obsoleted out of the way.
+
+ Each pack and its associated indices are moved out of the way.
+
+ Note: for correctness this function should only be called after a new
+ pack names index has been written without these pack names, and with
+ the names of packs that contain the data previously available via these
+ packs.
+
+ :param packs: The packs to obsolete.
+        :return: None.
+ """
+ for pack in packs:
+ try:
+ try:
+ pack.pack_transport.move(pack.file_name(),
+ '../obsolete_packs/' + pack.file_name())
+ except errors.NoSuchFile:
+ # perhaps obsolete_packs was removed? Let's create it and
+ # try again
+ try:
+ pack.pack_transport.mkdir('../obsolete_packs/')
+ except errors.FileExists:
+ pass
+ pack.pack_transport.move(pack.file_name(),
+ '../obsolete_packs/' + pack.file_name())
+ except (errors.PathError, errors.TransportError), e:
+ # TODO: Should these be warnings or mutters?
+ mutter("couldn't rename obsolete pack, skipping it:\n%s"
+ % (e,))
+ # TODO: Probably needs to know all possible indices for this pack
+ # - or maybe list the directory and move all indices matching this
+ # name whether we recognize it or not?
+ suffixes = ['.iix', '.six', '.tix', '.rix']
+ if self.chk_index is not None:
+ suffixes.append('.cix')
+ for suffix in suffixes:
+ try:
+ self._index_transport.move(pack.name + suffix,
+ '../obsolete_packs/' + pack.name + suffix)
+ except (errors.PathError, errors.TransportError), e:
+ mutter("couldn't rename obsolete index, skipping it:\n%s"
+ % (e,))
+
+ def pack_distribution(self, total_revisions):
+ """Generate a list of the number of revisions to put in each pack.
+
+ :param total_revisions: The total number of revisions in the
+ repository.
+ """
+ if total_revisions == 0:
+ return [0]
+ digits = reversed(str(total_revisions))
+ result = []
+ for exponent, count in enumerate(digits):
+ size = 10 ** exponent
+ for pos in range(int(count)):
+ result.append(size)
+ return list(reversed(result))
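+
+    # For example: pack_distribution(2431) returns
+    # [1000, 1000, 100, 100, 100, 100, 10, 10, 10, 1] - one bucket per digit
+    # at each power of ten, matching the digit-sum limit in _max_pack_count
+    # above.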
+
+ def _pack_tuple(self, name):
+ """Return a tuple with the transport and file name for a pack name."""
+ return self._pack_transport, name + '.pack'
+
+ def _remove_pack_from_memory(self, pack):
+ """Remove pack from the packs accessed by this repository.
+
+ Only affects memory state, until self._save_pack_names() is invoked.
+ """
+ self._names.pop(pack.name)
+ self._packs_by_name.pop(pack.name)
+ self._remove_pack_indices(pack)
+ self.packs.remove(pack)
+
+ def _remove_pack_indices(self, pack, ignore_missing=False):
+ """Remove the indices for pack from the aggregated indices.
+
+ :param ignore_missing: Suppress KeyErrors from calling remove_index.
+ """
+ for index_type in Pack.index_definitions.keys():
+ attr_name = index_type + '_index'
+ aggregate_index = getattr(self, attr_name)
+ if aggregate_index is not None:
+ pack_index = getattr(pack, attr_name)
+ try:
+ aggregate_index.remove_index(pack_index)
+ except KeyError:
+ if ignore_missing:
+ continue
+ raise
+
+ def reset(self):
+ """Clear all cached data."""
+ # cached revision data
+ self.revision_index.clear()
+ # cached signature data
+ self.signature_index.clear()
+ # cached file text data
+ self.text_index.clear()
+ # cached inventory data
+ self.inventory_index.clear()
+ # cached chk data
+ if self.chk_index is not None:
+ self.chk_index.clear()
+ # remove the open pack
+ self._new_pack = None
+ # information about packs.
+ self._names = None
+ self.packs = []
+ self._packs_by_name = {}
+ self._packs_at_load = None
+
+ def _unlock_names(self):
+ """Release the mutex around the pack-names index."""
+ self.repo.control_files.unlock()
+
+ def _diff_pack_names(self):
+ """Read the pack names from disk, and compare it to the one in memory.
+
+        :return: (disk_nodes, deleted_nodes, new_nodes, orig_disk_nodes)
+            disk_nodes The final set of nodes that should be referenced
+            deleted_nodes Nodes which have been removed from when we started
+            new_nodes Nodes that are newly introduced
+            orig_disk_nodes The nodes as they were originally read from disk,
+                before merging in this process's in-memory changes
+ """
+ # load the disk nodes across
+ disk_nodes = set()
+ for index, key, value in self._iter_disk_pack_index():
+ disk_nodes.add((key, value))
+ orig_disk_nodes = set(disk_nodes)
+
+ # do a two-way diff against our original content
+ current_nodes = set()
+ for name, sizes in self._names.iteritems():
+ current_nodes.add(
+ ((name, ), ' '.join(str(size) for size in sizes)))
+
+ # Packs no longer present in the repository, which were present when we
+ # locked the repository
+ deleted_nodes = self._packs_at_load - current_nodes
+ # Packs which this process is adding
+ new_nodes = current_nodes - self._packs_at_load
+
+ # Update the disk_nodes set to include the ones we are adding, and
+ # remove the ones which were removed by someone else
+ disk_nodes.difference_update(deleted_nodes)
+ disk_nodes.update(new_nodes)
+
+ return disk_nodes, deleted_nodes, new_nodes, orig_disk_nodes
+
+ def _syncronize_pack_names_from_disk_nodes(self, disk_nodes):
+ """Given the correct set of pack files, update our saved info.
+
+ :return: (removed, added, modified)
+ removed pack names removed from self._names
+ added pack names added to self._names
+ modified pack names that had changed value
+ """
+ removed = []
+ added = []
+ modified = []
+ ## self._packs_at_load = disk_nodes
+ new_names = dict(disk_nodes)
+ # drop no longer present nodes
+ for pack in self.all_packs():
+ if (pack.name,) not in new_names:
+ removed.append(pack.name)
+ self._remove_pack_from_memory(pack)
+ # add new nodes/refresh existing ones
+ for key, value in disk_nodes:
+ name = key[0]
+ sizes = self._parse_index_sizes(value)
+ if name in self._names:
+ # existing
+ if sizes != self._names[name]:
+ # the pack for name has had its indices replaced - rare but
+ # important to handle. XXX: probably can never happen today
+ # because the three-way merge code above does not handle it
+ # - you may end up adding the same key twice to the new
+ # disk index because the set values are the same, unless
+ # the only index shows up as deleted by the set difference
+ # - which it may. Until there is a specific test for this,
+ # assume it's broken. RBC 20071017.
+ self._remove_pack_from_memory(self.get_pack_by_name(name))
+ self._names[name] = sizes
+ self.get_pack_by_name(name)
+ modified.append(name)
+ else:
+ # new
+ self._names[name] = sizes
+ self.get_pack_by_name(name)
+ added.append(name)
+ return removed, added, modified
+
+ def _save_pack_names(self, clear_obsolete_packs=False, obsolete_packs=None):
+ """Save the list of packs.
+
+ This will take out the mutex around the pack names list for the
+ duration of the method call. If concurrent updates have been made, a
+ three-way merge between the current list and the current in memory list
+ is performed.
+
+ :param clear_obsolete_packs: If True, clear out the contents of the
+ obsolete_packs directory.
+ :param obsolete_packs: Packs that are obsolete once the new pack-names
+ file has been written.
+ :return: A list of the names saved that were not previously on disk.
+ """
+ already_obsolete = []
+ self.lock_names()
+ try:
+ builder = self._index_builder_class()
+ (disk_nodes, deleted_nodes, new_nodes,
+ orig_disk_nodes) = self._diff_pack_names()
+ # TODO: handle same-name, index-size-changes here -
+ # e.g. use the value from disk, not ours, *unless* we're the one
+ # changing it.
+ for key, value in disk_nodes:
+ builder.add_node(key, value)
+ self.transport.put_file('pack-names', builder.finish(),
+ mode=self.repo.bzrdir._get_file_mode())
+ self._packs_at_load = disk_nodes
+ if clear_obsolete_packs:
+ to_preserve = None
+ if obsolete_packs:
+ to_preserve = set([o.name for o in obsolete_packs])
+ already_obsolete = self._clear_obsolete_packs(to_preserve)
+ finally:
+ self._unlock_names()
+ # synchronise the memory packs list with what we just wrote:
+ self._syncronize_pack_names_from_disk_nodes(disk_nodes)
+ if obsolete_packs:
+ # TODO: We could add one more condition here. "if o.name not in
+ # orig_disk_nodes and o != the new_pack we haven't written to
+ # disk yet. However, the new pack object is not easily
+ # accessible here (it would have to be passed through the
+ # autopacking code, etc.)
+ obsolete_packs = [o for o in obsolete_packs
+ if o.name not in already_obsolete]
+ self._obsolete_packs(obsolete_packs)
+ return [new_node[0][0] for new_node in new_nodes]
+
+ def reload_pack_names(self):
+ """Sync our pack listing with what is present in the repository.
+
+ This should be called when we find out that something we thought was
+ present is now missing. This happens when another process re-packs the
+ repository, etc.
+
+ :return: True if the in-memory list of packs has been altered at all.
+ """
+ # The ensure_loaded call is to handle the case where the first call
+ # made involving the collection was to reload_pack_names, where we
+ # don't have a view of disk contents. It's a bit of a bandaid, and
+ # causes two reads of pack-names, but it's a rare corner case not
+ # struck with regular push/pull etc.
+ first_read = self.ensure_loaded()
+ if first_read:
+ return True
+        # Work out the new set of pack names by diffing against disk.
+ (disk_nodes, deleted_nodes, new_nodes,
+ orig_disk_nodes) = self._diff_pack_names()
+ # _packs_at_load is meant to be the explicit list of names in
+        # 'pack-names' at the start. As such, it should not contain any
+ # pending names that haven't been written out yet.
+ self._packs_at_load = orig_disk_nodes
+ (removed, added,
+ modified) = self._syncronize_pack_names_from_disk_nodes(disk_nodes)
+ if removed or added or modified:
+ return True
+ return False
+
+ def _restart_autopack(self):
+ """Reload the pack names list, and restart the autopack code."""
+ if not self.reload_pack_names():
+ # Re-raise the original exception, because something went missing
+ # and a restart didn't find it
+ raise
+ raise errors.RetryAutopack(self.repo, False, sys.exc_info())
+
+ def _restart_pack_operations(self):
+ """Reload the pack names list, and restart the autopack code."""
+ if not self.reload_pack_names():
+ # Re-raise the original exception, because something went missing
+ # and a restart didn't find it
+ raise
+ raise RetryPackOperations(self.repo, False, sys.exc_info())
+
+ def _clear_obsolete_packs(self, preserve=None):
+ """Delete everything from the obsolete-packs directory.
+
+ :return: A list of pack identifiers (the filename without '.pack') that
+ were found in obsolete_packs.
+ """
+ found = []
+ obsolete_pack_transport = self.transport.clone('obsolete_packs')
+ if preserve is None:
+ preserve = set()
+ try:
+ obsolete_pack_files = obsolete_pack_transport.list_dir('.')
+ except errors.NoSuchFile:
+ return found
+ for filename in obsolete_pack_files:
+ name, ext = osutils.splitext(filename)
+ if ext == '.pack':
+ found.append(name)
+ if name in preserve:
+ continue
+ try:
+ obsolete_pack_transport.delete(filename)
+ except (errors.PathError, errors.TransportError), e:
+ warning("couldn't delete obsolete pack, skipping it:\n%s"
+ % (e,))
+ return found
+
+ def _start_write_group(self):
+ # Do not permit preparation for writing if we're not in a 'write lock'.
+ if not self.repo.is_write_locked():
+ raise errors.NotWriteLocked(self)
+ self._new_pack = self.pack_factory(self, upload_suffix='.pack',
+ file_mode=self.repo.bzrdir._get_file_mode())
+ # allow writing: queue writes to a new index
+ self.revision_index.add_writable_index(self._new_pack.revision_index,
+ self._new_pack)
+ self.inventory_index.add_writable_index(self._new_pack.inventory_index,
+ self._new_pack)
+ self.text_index.add_writable_index(self._new_pack.text_index,
+ self._new_pack)
+ self._new_pack.text_index.set_optimize(combine_backing_indices=False)
+ self.signature_index.add_writable_index(self._new_pack.signature_index,
+ self._new_pack)
+ if self.chk_index is not None:
+ self.chk_index.add_writable_index(self._new_pack.chk_index,
+ self._new_pack)
+ self.repo.chk_bytes._index._add_callback = self.chk_index.add_callback
+ self._new_pack.chk_index.set_optimize(combine_backing_indices=False)
+
+ self.repo.inventories._index._add_callback = self.inventory_index.add_callback
+ self.repo.revisions._index._add_callback = self.revision_index.add_callback
+ self.repo.signatures._index._add_callback = self.signature_index.add_callback
+ self.repo.texts._index._add_callback = self.text_index.add_callback
+
+ def _abort_write_group(self):
+ # FIXME: just drop the transient index.
+ # forget what names there are
+ if self._new_pack is not None:
+ operation = cleanup.OperationWithCleanups(self._new_pack.abort)
+ operation.add_cleanup(setattr, self, '_new_pack', None)
+ # If we aborted while in the middle of finishing the write
+ # group, _remove_pack_indices could fail because the indexes are
+ # already gone. But if they're not there, we shouldn't fail in this
+ # case, so we pass ignore_missing=True.
+ operation.add_cleanup(self._remove_pack_indices, self._new_pack,
+ ignore_missing=True)
+ operation.run_simple()
+ for resumed_pack in self._resumed_packs:
+ operation = cleanup.OperationWithCleanups(resumed_pack.abort)
+ # See comment in previous finally block.
+ operation.add_cleanup(self._remove_pack_indices, resumed_pack,
+ ignore_missing=True)
+ operation.run_simple()
+ del self._resumed_packs[:]
+
+ def _remove_resumed_pack_indices(self):
+ for resumed_pack in self._resumed_packs:
+ self._remove_pack_indices(resumed_pack)
+ del self._resumed_packs[:]
+
+ def _check_new_inventories(self):
+ """Detect missing inventories in this write group.
+
+ :returns: list of strs, summarising any problems found. If the list is
+ empty no problems were found.
+ """
+ # The base implementation does no checks. GCRepositoryPackCollection
+ # overrides this.
+ return []
+
+ def _commit_write_group(self):
+ all_missing = set()
+ for prefix, versioned_file in (
+ ('revisions', self.repo.revisions),
+ ('inventories', self.repo.inventories),
+ ('texts', self.repo.texts),
+ ('signatures', self.repo.signatures),
+ ):
+ missing = versioned_file.get_missing_compression_parent_keys()
+ all_missing.update([(prefix,) + key for key in missing])
+ if all_missing:
+ raise errors.BzrCheckError(
+ "Repository %s has missing compression parent(s) %r "
+ % (self.repo, sorted(all_missing)))
+ problems = self._check_new_inventories()
+ if problems:
+ problems_summary = '\n'.join(problems)
+ raise errors.BzrCheckError(
+ "Cannot add revision(s) to repository: " + problems_summary)
+ self._remove_pack_indices(self._new_pack)
+ any_new_content = False
+ if self._new_pack.data_inserted():
+ # get all the data to disk and ready to use
+ self._new_pack.finish()
+ self.allocate(self._new_pack)
+ self._new_pack = None
+ any_new_content = True
+ else:
+ self._new_pack.abort()
+ self._new_pack = None
+ for resumed_pack in self._resumed_packs:
+ # XXX: this is a pretty ugly way to turn the resumed pack into a
+ # properly committed pack.
+ self._names[resumed_pack.name] = None
+ self._remove_pack_from_memory(resumed_pack)
+ resumed_pack.finish()
+ self.allocate(resumed_pack)
+ any_new_content = True
+ del self._resumed_packs[:]
+ if any_new_content:
+ result = self.autopack()
+ if not result:
+ # when autopack takes no steps, the names list is still
+ # unsaved.
+ return self._save_pack_names()
+ return result
+ return []
+
+ def _suspend_write_group(self):
+ tokens = [pack.name for pack in self._resumed_packs]
+ self._remove_pack_indices(self._new_pack)
+ if self._new_pack.data_inserted():
+ # get all the data to disk and ready to use
+ self._new_pack.finish(suspend=True)
+ tokens.append(self._new_pack.name)
+ self._new_pack = None
+ else:
+ self._new_pack.abort()
+ self._new_pack = None
+ self._remove_resumed_pack_indices()
+ return tokens
+
+ def _resume_write_group(self, tokens):
+ for token in tokens:
+ self._resume_pack(token)
+
+
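+# A rough sketch of the write-group lifecycle this collection implements for
+# the repository object below (method names as defined above; the public
+# start/commit/abort entry points live on the Repository base class):
+#
+#     collection._start_write_group()    # create a NewPack, register indices
+#     try:
+#         # ... insert revisions/inventories/texts/signatures ...
+#         hint = collection._commit_write_group()   # finish pack, autopack
+#     except:
+#         collection._abort_write_group()           # discard the partial pack
+#         raise
+
+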
+class PackRepository(MetaDirVersionedFileRepository):
+ """Repository with knit objects stored inside pack containers.
+
+ The layering for a PackRepository is:
+
+ Graph | HPSS | Repository public layer |
+ ===================================================
+ Tuple based apis below, string based, and key based apis above
+ ---------------------------------------------------
+ VersionedFiles
+ Provides .texts, .revisions etc
+ This adapts the N-tuple keys to physical knit records which only have a
+ single string identifier (for historical reasons), which in older formats
+ was always the revision_id, and in the mapped code for packs is always
+ the last element of key tuples.
+ ---------------------------------------------------
+ GraphIndex
+ A separate GraphIndex is used for each of the
+ texts/inventories/revisions/signatures contained within each individual
+ pack file. The GraphIndex layer works in N-tuples and is unaware of any
+ semantic value.
+ ===================================================
+
+ """
+
+ # These attributes are inherited from the Repository base class. Setting
+ # them to None ensures that if the constructor is changed to not initialize
+ # them, or a subclass fails to call the constructor, an error will occur
+ # rather than the system working but generating incorrect data.
+ _commit_builder_class = None
+ _serializer = None
+
+ def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
+ _serializer):
+ MetaDirRepository.__init__(self, _format, a_bzrdir, control_files)
+ self._commit_builder_class = _commit_builder_class
+ self._serializer = _serializer
+ self._reconcile_fixes_text_parents = True
+ if self._format.supports_external_lookups:
+ self._unstacked_provider = graph.CachingParentsProvider(
+ self._make_parents_provider_unstacked())
+ else:
+ self._unstacked_provider = graph.CachingParentsProvider(self)
+ self._unstacked_provider.disable_cache()
+
+ @needs_read_lock
+ def _all_revision_ids(self):
+ """See Repository.all_revision_ids()."""
+ return [key[0] for key in self.revisions.keys()]
+
+ def _abort_write_group(self):
+ self.revisions._index._key_dependencies.clear()
+ self._pack_collection._abort_write_group()
+
+ def _make_parents_provider(self):
+ if not self._format.supports_external_lookups:
+ return self._unstacked_provider
+ return graph.StackedParentsProvider(_LazyListJoin(
+ [self._unstacked_provider], self._fallback_repositories))
+
+ def _refresh_data(self):
+ if not self.is_locked():
+ return
+ self._pack_collection.reload_pack_names()
+ self._unstacked_provider.disable_cache()
+ self._unstacked_provider.enable_cache()
+
+ def _start_write_group(self):
+ self._pack_collection._start_write_group()
+
+ def _commit_write_group(self):
+ hint = self._pack_collection._commit_write_group()
+ self.revisions._index._key_dependencies.clear()
+ # The commit may have added keys that were previously cached as
+ # missing, so reset the cache.
+ self._unstacked_provider.disable_cache()
+ self._unstacked_provider.enable_cache()
+ return hint
+
+ def suspend_write_group(self):
+ # XXX check self._write_group is self.get_transaction()?
+ tokens = self._pack_collection._suspend_write_group()
+ self.revisions._index._key_dependencies.clear()
+ self._write_group = None
+ return tokens
+
+ def _resume_write_group(self, tokens):
+ self._start_write_group()
+ try:
+ self._pack_collection._resume_write_group(tokens)
+ except errors.UnresumableWriteGroup:
+ self._abort_write_group()
+ raise
+ for pack in self._pack_collection._resumed_packs:
+ self.revisions._index.scan_unvalidated_index(pack.revision_index)
+
+ def get_transaction(self):
+ if self._write_lock_count:
+ return self._transaction
+ else:
+ return self.control_files.get_transaction()
+
+ def is_locked(self):
+ return self._write_lock_count or self.control_files.is_locked()
+
+ def is_write_locked(self):
+ return self._write_lock_count
+
+ def lock_write(self, token=None):
+ """Lock the repository for writes.
+
+ :return: A bzrlib.repository.RepositoryWriteLockResult.
+ """
+ locked = self.is_locked()
+ if not self._write_lock_count and locked:
+ raise errors.ReadOnlyError(self)
+ self._write_lock_count += 1
+ if self._write_lock_count == 1:
+ self._transaction = transactions.WriteTransaction()
+ if not locked:
+ if 'relock' in debug.debug_flags and self._prev_lock == 'w':
+ note('%r was write locked again', self)
+ self._prev_lock = 'w'
+ self._unstacked_provider.enable_cache()
+ for repo in self._fallback_repositories:
+ # Writes don't affect fallback repos
+ repo.lock_read()
+ self._refresh_data()
+ return RepositoryWriteLockResult(self.unlock, None)
+
+ def lock_read(self):
+ """Lock the repository for reads.
+
+ :return: A bzrlib.lock.LogicalLockResult.
+ """
+ locked = self.is_locked()
+ if self._write_lock_count:
+ self._write_lock_count += 1
+ else:
+ self.control_files.lock_read()
+ if not locked:
+ if 'relock' in debug.debug_flags and self._prev_lock == 'r':
+ note('%r was read locked again', self)
+ self._prev_lock = 'r'
+ self._unstacked_provider.enable_cache()
+ for repo in self._fallback_repositories:
+ repo.lock_read()
+ self._refresh_data()
+ return LogicalLockResult(self.unlock)
+
+ def leave_lock_in_place(self):
+ # not supported - raise an error
+ raise NotImplementedError(self.leave_lock_in_place)
+
+ def dont_leave_lock_in_place(self):
+ # not supported - raise an error
+ raise NotImplementedError(self.dont_leave_lock_in_place)
+
+ @needs_write_lock
+ def pack(self, hint=None, clean_obsolete_packs=False):
+ """Compress the data within the repository.
+
+ This will pack all the data to a single pack. In future it may
+ recompress deltas or do other such expensive operations.
+ """
+ self._pack_collection.pack(hint=hint, clean_obsolete_packs=clean_obsolete_packs)
+
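+ # Note on pack() above: the needs_write_lock decorator acquires and
+ # releases the repository write lock around the call, so a caller can
+ # simply run, e.g.:
+ #     repo.pack(clean_obsolete_packs=True)
+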
+ @needs_write_lock
+ def reconcile(self, other=None, thorough=False):
+ """Reconcile this repository."""
+ from bzrlib.reconcile import PackReconciler
+ reconciler = PackReconciler(self, thorough=thorough)
+ reconciler.reconcile()
+ return reconciler
+
+ def _reconcile_pack(self, collection, packs, extension, revs, pb):
+ raise NotImplementedError(self._reconcile_pack)
+
+ @only_raises(errors.LockNotHeld, errors.LockBroken)
+ def unlock(self):
+ if self._write_lock_count == 1 and self._write_group is not None:
+ self.abort_write_group()
+ self._unstacked_provider.disable_cache()
+ self._transaction = None
+ self._write_lock_count = 0
+ raise errors.BzrError(
+ 'Must end write group before releasing write lock on %s'
+ % self)
+ if self._write_lock_count:
+ self._write_lock_count -= 1
+ if not self._write_lock_count:
+ transaction = self._transaction
+ self._transaction = None
+ transaction.finish()
+ else:
+ self.control_files.unlock()
+
+ if not self.is_locked():
+ self._unstacked_provider.disable_cache()
+ for repo in self._fallback_repositories:
+ repo.unlock()
+
+
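+# A minimal sketch (hypothetical helper, not a bzrlib API) of the
+# reference-counted write lock implemented by PackRepository above: nested
+# lock_write()/unlock() pairs share a single WriteTransaction, which is only
+# finished when the count drops back to zero.
+def _example_nested_write_lock(repo):
+    repo.lock_write()
+    try:
+        repo.lock_write()       # nested call: just bumps _write_lock_count
+        try:
+            pass                # ... modify the repository ...
+        finally:
+            repo.unlock()       # count drops to 1, transaction kept alive
+    finally:
+        repo.unlock()           # count drops to 0, transaction finished
+
+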
+class RepositoryFormatPack(MetaDirVersionedFileRepositoryFormat):
+ """Format logic for pack structured repositories.
+
+ This repository format has:
+ - a list of packs in pack-names
+ - packs in packs/NAME.pack
+ - indices in indices/NAME.{iix,six,tix,rix}
+ - knit deltas in the packs, knit indices mapped to the indices.
+ - thunk objects to support the knits programming API.
+ - a format marker of its own
+ - an optional 'shared-storage' flag
+ - an optional 'no-working-trees' flag
+ - a LockDir lock
+ """
+
+ # Set this attribute in derived classes to control the repository class
+ # created by open and initialize.
+ repository_class = None
+ # Set this attribute in derived classes to control the
+ # _commit_builder_class that the repository objects will have passed to
+ # their constructor.
+ _commit_builder_class = None
+ # Set this attribute in derived classes to control the _serializer that the
+ # repository objects will have passed to their constructor.
+ _serializer = None
+ # Packs are not confused by ghosts.
+ supports_ghosts = True
+ # External references are not supported in pack repositories yet.
+ supports_external_lookups = False
+ # Most pack formats do not use chk lookups.
+ supports_chks = False
+ # What index classes to use
+ index_builder_class = None
+ index_class = None
+ _fetch_uses_deltas = True
+ fast_deltas = False
+ supports_funky_characters = True
+ revision_graph_can_have_wrong_parents = True
+
+ def initialize(self, a_bzrdir, shared=False):
+ """Create a pack based repository.
+
+ :param a_bzrdir: bzrdir to contain the new repository; must already
+ be initialized.
+ :param shared: If true the repository will be initialized as a shared
+ repository.
+ """
+ mutter('creating repository in %s.', a_bzrdir.transport.base)
+ dirs = ['indices', 'obsolete_packs', 'packs', 'upload']
+ builder = self.index_builder_class()
+ files = [('pack-names', builder.finish())]
+ utf8_files = [('format', self.get_format_string())]
+
+ self._upload_blank_content(a_bzrdir, dirs, files, utf8_files, shared)
+ repository = self.open(a_bzrdir=a_bzrdir, _found=True)
+ self._run_post_repo_init_hooks(repository, a_bzrdir, shared)
+ return repository
+
+ def open(self, a_bzrdir, _found=False, _override_transport=None):
+ """See RepositoryFormat.open().
+
+ :param _override_transport: INTERNAL USE ONLY. Allows opening the
+ repository at a slightly different url
+ than normal. I.e. during 'upgrade'.
+ """
+ if not _found:
+ format = RepositoryFormatMetaDir.find_format(a_bzrdir)
+ if _override_transport is not None:
+ repo_transport = _override_transport
+ else:
+ repo_transport = a_bzrdir.get_repository_transport(None)
+ control_files = lockable_files.LockableFiles(repo_transport,
+ 'lock', lockdir.LockDir)
+ return self.repository_class(_format=self,
+ a_bzrdir=a_bzrdir,
+ control_files=control_files,
+ _commit_builder_class=self._commit_builder_class,
+ _serializer=self._serializer)
+
+
+class RetryPackOperations(errors.RetryWithNewPacks):
+ """Raised when we are packing and we find a missing file.
+
+ Meant as a signaling exception, to tell the RepositoryPackCollection.pack
+ code it should try again.
+ """
+
+ internal_error = True
+
+ _fmt = ("Pack files have changed, reload and try pack again."
+ " context: %(context)s %(orig_error)s")
+
+
+class _DirectPackAccess(object):
+ """Access to data in one or more packs with less translation."""
+
+ def __init__(self, index_to_packs, reload_func=None, flush_func=None):
+ """Create a _DirectPackAccess object.
+
+ :param index_to_packs: A dict mapping index objects to the transport
+ and file names for obtaining data.
+ :param reload_func: A function to call if we determine that the pack
+ files have moved and we need to reload our caches. See
+ bzrlib.repofmt.pack_repo.AggregateIndex for more details.
+ :param flush_func: An optional function, called by flush(), that writes
+ any buffered data out to the underlying NewPack.
+ """
+ self._container_writer = None
+ self._write_index = None
+ self._indices = index_to_packs
+ self._reload_func = reload_func
+ self._flush_func = flush_func
+
+ def add_raw_records(self, key_sizes, raw_data):
+ """Add raw knit bytes to a storage area.
+
+ The data is spooled to the container writer in one bytes-record per
+ raw data item.
+
+ :param key_sizes: An iterable of tuples containing the key and size of
+ each raw data segment.
+ :param raw_data: A bytestring containing the data.
+ :return: A list of memos to retrieve the record later. Each memo is an
+ opaque index memo. For _DirectPackAccess the memo is (index, pos,
+ length), where the index field is the write_index object supplied
+ to the PackAccess object.
+ """
+ if type(raw_data) is not str:
+ raise AssertionError(
+ 'data must be plain bytes was %s' % type(raw_data))
+ result = []
+ offset = 0
+ for key, size in key_sizes:
+ p_offset, p_length = self._container_writer.add_bytes_record(
+ raw_data[offset:offset+size], [])
+ offset += size
+ result.append((self._write_index, p_offset, p_length))
+ return result
+
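+ # The (index, offset, length) memos returned by add_raw_records() are the
+ # same memos that get_raw_records() below accepts, so they can be handed
+ # back later to retrieve the corresponding bytes.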
+ def flush(self):
+ """Flush pending writes on this access object.
+
+ This will flush any buffered writes to a NewPack.
+ """
+ if self._flush_func is not None:
+ self._flush_func()
+
+ def get_raw_records(self, memos_for_retrieval):
+ """Get the raw bytes for a records.
+
+ :param memos_for_retrieval: An iterable containing the (index, pos,
+ length) memo for retrieving the bytes. The Pack access method
+ looks up the pack to use for a given record in its index_to_pack
+ map.
+ :return: An iterator over the bytes of the records.
+ """
+ # first pass, group into same-index requests
+ request_lists = []
+ current_index = None
+ for (index, offset, length) in memos_for_retrieval:
+ if current_index == index:
+ current_list.append((offset, length))
+ else:
+ if current_index is not None:
+ request_lists.append((current_index, current_list))
+ current_index = index
+ current_list = [(offset, length)]
+ # handle the last entry
+ if current_index is not None:
+ request_lists.append((current_index, current_list))
+ for index, offsets in request_lists:
+ try:
+ transport, path = self._indices[index]
+ except KeyError:
+ # A KeyError here indicates that someone has triggered an index
+ # reload, and this index has gone missing, we need to start
+ # over.
+ if self._reload_func is None:
+ # If we don't have a _reload_func there is nothing that can
+ # be done
+ raise
+ raise errors.RetryWithNewPacks(index,
+ reload_occurred=True,
+ exc_info=sys.exc_info())
+ try:
+ reader = pack.make_readv_reader(transport, path, offsets)
+ for names, read_func in reader.iter_records():
+ yield read_func(None)
+ except errors.NoSuchFile:
+ # A NoSuchFile error indicates that a pack file has gone
+ # missing on disk, we need to trigger a reload, and start over.
+ if self._reload_func is None:
+ raise
+ raise errors.RetryWithNewPacks(transport.abspath(path),
+ reload_occurred=False,
+ exc_info=sys.exc_info())
+
+ def set_writer(self, writer, index, transport_packname):
+ """Set a writer to use for adding data."""
+ if index is not None:
+ self._indices[index] = transport_packname
+ self._container_writer = writer
+ self._write_index = index
+
+ def reload_or_raise(self, retry_exc):
+ """Try calling the reload function, or re-raise the original exception.
+
+ This should be called after _DirectPackAccess raises a
+ RetryWithNewPacks exception. This function will handle the common logic
+ of determining when the error is fatal versus being temporary.
+ It will also make sure that the original exception is raised, rather
+ than the RetryWithNewPacks exception.
+
+ If this function returns, then the calling function should retry
+ whatever operation was being performed. Otherwise an exception will
+ be raised.
+
+ :param retry_exc: A RetryWithNewPacks exception.
+ """
+ is_error = False
+ if self._reload_func is None:
+ is_error = True
+ elif not self._reload_func():
+ # The reload claimed that nothing changed
+ if not retry_exc.reload_occurred:
+ # If there wasn't an earlier reload, then we really were
+ # expecting to find changes. We didn't find them, so this is a
+ # hard error
+ is_error = True
+ if is_error:
+ exc_class, exc_value, exc_traceback = retry_exc.exc_info
+ raise exc_class, exc_value, exc_traceback
+
+
+
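+# A minimal usage sketch (hypothetical helper, not part of the bzrlib API):
+# the retry loop that reload_or_raise() above is designed for.  'access' is a
+# _DirectPackAccess and 'memos' an iterable of (index, pos, length) memos.
+def _example_read_with_retry(access, memos):
+    while True:
+        try:
+            return list(access.get_raw_records(memos))
+        except errors.RetryWithNewPacks, retry_exc:
+            # Either reloads the pack list and lets us retry, or re-raises
+            # the original error if reloading cannot help.
+            access.reload_or_raise(retry_exc)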