author     Lorry <lorry@roadtrain.codethink.co.uk>  2012-08-22 15:47:16 +0100
committer  Lorry <lorry@roadtrain.codethink.co.uk>  2012-08-22 15:47:16 +0100
commit     25335618bf8755ce6b116ee14f47f5a1f2c821e9 (patch)
tree       d889d7ab3f9f985d0c54c534cb8052bd2e6d7163 /bzrlib/repofmt
download   bzr-tarball-25335618bf8755ce6b116ee14f47f5a1f2c821e9.tar.gz
Tarball conversion
Diffstat (limited to 'bzrlib/repofmt')
-rw-r--r--  bzrlib/repofmt/__init__.py              20
-rw-r--r--  bzrlib/repofmt/groupcompress_repo.py  1426
-rw-r--r--  bzrlib/repofmt/knitpack_repo.py       1156
-rw-r--r--  bzrlib/repofmt/knitrepo.py             522
-rw-r--r--  bzrlib/repofmt/pack_repo.py           2091
5 files changed, 5215 insertions, 0 deletions
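
Much of groupcompress_repo.py below is GCPack, whose constructor builds a _write_data closure that queues outgoing byte chunks in a list, tracks their aggregate size, and only writes them to the pack stream (and folds them into the running MD5) once a cache limit is exceeded or a flush is forced. The following is a minimal standalone sketch of that buffering pattern only, under the assumption of a plain write callable; the BufferedPackWriter name and its methods are invented for illustration and are not part of bzrlib's API.

import hashlib
import io


class BufferedPackWriter(object):
    """Buffer byte chunks and flush them to an underlying write callable.

    Mirrors the _write_data closure in GCPack.__init__ in the diff below:
    chunks accumulate in a list alongside their total length, and are only
    joined, written out, and hashed when the cache limit is exceeded or an
    explicit flush is requested.
    """

    def __init__(self, write, cache_limit=0):
        self._write = write            # e.g. a file object's write method
        self._hash = hashlib.md5()     # tracks the content written so far
        self._cache_limit = cache_limit
        self._buffer = [[], 0]         # [list of pending chunks, total bytes]

    def write_data(self, data, flush=False):
        self._buffer[0].append(data)
        self._buffer[1] += len(data)
        if self._buffer[1] > self._cache_limit or flush:
            joined = b''.join(self._buffer[0])
            self._write(joined)
            self._hash.update(joined)
            self._buffer[:] = [[], 0]  # reset the buffer in place

    def hexdigest(self):
        return self._hash.hexdigest()


if __name__ == '__main__':
    out = io.BytesIO()
    writer = BufferedPackWriter(out.write, cache_limit=16)
    for chunk in (b'header ', b'record-1 ', b'record-2 '):
        writer.write_data(chunk)
    writer.write_data(b'', flush=True)   # force out the final partial buffer
    print(out.getvalue())
    print(writer.hexdigest())

In the actual code this is a closure rather than a method so that the buffer, the stream's write, and the hash's update are plain local variables; the inline comment in the diff notes this is faster than repeated attribute lookups on the pack object.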
diff --git a/bzrlib/repofmt/__init__.py b/bzrlib/repofmt/__init__.py new file mode 100644 index 0000000..afe2457 --- /dev/null +++ b/bzrlib/repofmt/__init__.py @@ -0,0 +1,20 @@ +# Copyright (C) 2007 Canonical Ltd +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +"""Repository formats""" + +from __future__ import absolute_import + diff --git a/bzrlib/repofmt/groupcompress_repo.py b/bzrlib/repofmt/groupcompress_repo.py new file mode 100644 index 0000000..3a088f5 --- /dev/null +++ b/bzrlib/repofmt/groupcompress_repo.py @@ -0,0 +1,1426 @@ +# Copyright (C) 2008-2011 Canonical Ltd +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +"""Repository formats using CHK inventories and groupcompress compression.""" + +from __future__ import absolute_import + +import time + +from bzrlib import ( + controldir, + chk_map, + chk_serializer, + debug, + errors, + index as _mod_index, + inventory, + osutils, + pack, + revision as _mod_revision, + trace, + ui, + versionedfile, + ) +from bzrlib.btree_index import ( + BTreeGraphIndex, + BTreeBuilder, + ) +from bzrlib.decorators import needs_write_lock +from bzrlib.groupcompress import ( + _GCGraphIndex, + GroupCompressVersionedFiles, + ) +from bzrlib.repofmt.pack_repo import ( + _DirectPackAccess, + Pack, + NewPack, + PackRepository, + PackRootCommitBuilder, + RepositoryPackCollection, + RepositoryFormatPack, + ResumedPack, + Packer, + ) +from bzrlib.vf_repository import ( + StreamSource, + ) +from bzrlib.static_tuple import StaticTuple + + +class GCPack(NewPack): + + def __init__(self, pack_collection, upload_suffix='', file_mode=None): + """Create a NewPack instance. + + :param pack_collection: A PackCollection into which this is being + inserted. + :param upload_suffix: An optional suffix to be given to any temporary + files created during the pack creation. e.g '.autopack' + :param file_mode: An optional file mode to create the new files with. 
+ """ + # replaced from NewPack to: + # - change inventory reference list length to 1 + # - change texts reference lists to 1 + # TODO: patch this to be parameterised + + # The relative locations of the packs are constrained, but all are + # passed in because the caller has them, so as to avoid object churn. + index_builder_class = pack_collection._index_builder_class + # from brisbane-core + if pack_collection.chk_index is not None: + chk_index = index_builder_class(reference_lists=0) + else: + chk_index = None + Pack.__init__(self, + # Revisions: parents list, no text compression. + index_builder_class(reference_lists=1), + # Inventory: We want to map compression only, but currently the + # knit code hasn't been updated enough to understand that, so we + # have a regular 2-list index giving parents and compression + # source. + index_builder_class(reference_lists=1), + # Texts: per file graph, for all fileids - so one reference list + # and two elements in the key tuple. + index_builder_class(reference_lists=1, key_elements=2), + # Signatures: Just blobs to store, no compression, no parents + # listing. + index_builder_class(reference_lists=0), + # CHK based storage - just blobs, no compression or parents. + chk_index=chk_index + ) + self._pack_collection = pack_collection + # When we make readonly indices, we need this. + self.index_class = pack_collection._index_class + # where should the new pack be opened + self.upload_transport = pack_collection._upload_transport + # where are indices written out to + self.index_transport = pack_collection._index_transport + # where is the pack renamed to when it is finished? + self.pack_transport = pack_collection._pack_transport + # What file mode to upload the pack and indices with. + self._file_mode = file_mode + # tracks the content written to the .pack file. + self._hash = osutils.md5() + # a four-tuple with the length in bytes of the indices, once the pack + # is finalised. (rev, inv, text, sigs) + self.index_sizes = None + # How much data to cache when writing packs. Note that this is not + # synchronised with reads, because it's not in the transport layer, so + # is not safe unless the client knows it won't be reading from the pack + # under creation. + self._cache_limit = 0 + # the temporary pack file name. + self.random_name = osutils.rand_chars(20) + upload_suffix + # when was this pack started ? + self.start_time = time.time() + # open an output stream for the data added to the pack. + self.write_stream = self.upload_transport.open_write_stream( + self.random_name, mode=self._file_mode) + if 'pack' in debug.debug_flags: + trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs', + time.ctime(), self.upload_transport.base, self.random_name, + time.time() - self.start_time) + # A list of byte sequences to be written to the new pack, and the + # aggregate size of them. Stored as a list rather than separate + # variables so that the _write_data closure below can update them. + self._buffer = [[], 0] + # create a callable for adding data + # + # robertc says- this is a closure rather than a method on the object + # so that the variables are locals, and faster than accessing object + # members. 
+ def _write_data(bytes, flush=False, _buffer=self._buffer, + _write=self.write_stream.write, _update=self._hash.update): + _buffer[0].append(bytes) + _buffer[1] += len(bytes) + # buffer cap + if _buffer[1] > self._cache_limit or flush: + bytes = ''.join(_buffer[0]) + _write(bytes) + _update(bytes) + _buffer[:] = [[], 0] + # expose this on self, for the occasion when clients want to add data. + self._write_data = _write_data + # a pack writer object to serialise pack records. + self._writer = pack.ContainerWriter(self._write_data) + self._writer.begin() + # what state is the pack in? (open, finished, aborted) + self._state = 'open' + # no name until we finish writing the content + self.name = None + + def _check_references(self): + """Make sure our external references are present. + + Packs are allowed to have deltas whose base is not in the pack, but it + must be present somewhere in this collection. It is not allowed to + have deltas based on a fallback repository. + (See <https://bugs.launchpad.net/bzr/+bug/288751>) + """ + # Groupcompress packs don't have any external references, arguably CHK + # pages have external references, but we cannot 'cheaply' determine + # them without actually walking all of the chk pages. + + +class ResumedGCPack(ResumedPack): + + def _check_references(self): + """Make sure our external compression parents are present.""" + # See GCPack._check_references for why this is empty + + def _get_external_refs(self, index): + # GC repositories don't have compression parents external to a given + # pack file + return set() + + +class GCCHKPacker(Packer): + """This class understand what it takes to collect a GCCHK repo.""" + + def __init__(self, pack_collection, packs, suffix, revision_ids=None, + reload_func=None): + super(GCCHKPacker, self).__init__(pack_collection, packs, suffix, + revision_ids=revision_ids, + reload_func=reload_func) + self._pack_collection = pack_collection + # ATM, We only support this for GCCHK repositories + if pack_collection.chk_index is None: + raise AssertionError('pack_collection.chk_index should not be None') + self._gather_text_refs = False + self._chk_id_roots = [] + self._chk_p_id_roots = [] + self._text_refs = None + # set by .pack() if self.revision_ids is not None + self.revision_keys = None + + def _get_progress_stream(self, source_vf, keys, message, pb): + def pb_stream(): + substream = source_vf.get_record_stream(keys, 'groupcompress', True) + for idx, record in enumerate(substream): + if pb is not None: + pb.update(message, idx + 1, len(keys)) + yield record + return pb_stream() + + def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None): + """Filter the texts of inventories, to find the chk pages.""" + total_keys = len(keys) + def _filtered_inv_stream(): + id_roots_set = set() + p_id_roots_set = set() + stream = source_vf.get_record_stream(keys, 'groupcompress', True) + for idx, record in enumerate(stream): + # Inventories should always be with revisions; assume success. 
+ bytes = record.get_bytes_as('fulltext') + chk_inv = inventory.CHKInventory.deserialise(None, bytes, + record.key) + if pb is not None: + pb.update('inv', idx, total_keys) + key = chk_inv.id_to_entry.key() + if key not in id_roots_set: + self._chk_id_roots.append(key) + id_roots_set.add(key) + p_id_map = chk_inv.parent_id_basename_to_file_id + if p_id_map is None: + raise AssertionError('Parent id -> file_id map not set') + key = p_id_map.key() + if key not in p_id_roots_set: + p_id_roots_set.add(key) + self._chk_p_id_roots.append(key) + yield record + # We have finished processing all of the inventory records, we + # don't need these sets anymore + id_roots_set.clear() + p_id_roots_set.clear() + return _filtered_inv_stream() + + def _get_chk_streams(self, source_vf, keys, pb=None): + # We want to stream the keys from 'id_roots', and things they + # reference, and then stream things from p_id_roots and things they + # reference, and then any remaining keys that we didn't get to. + + # We also group referenced texts together, so if one root references a + # text with prefix 'a', and another root references a node with prefix + # 'a', we want to yield those nodes before we yield the nodes for 'b' + # This keeps 'similar' nodes together. + + # Note: We probably actually want multiple streams here, to help the + # client understand that the different levels won't compress well + # against each other. + # Test the difference between using one Group per level, and + # using 1 Group per prefix. (so '' (root) would get a group, then + # all the references to search-key 'a' would get a group, etc.) + total_keys = len(keys) + remaining_keys = set(keys) + counter = [0] + if self._gather_text_refs: + self._text_refs = set() + def _get_referenced_stream(root_keys, parse_leaf_nodes=False): + cur_keys = root_keys + while cur_keys: + keys_by_search_prefix = {} + remaining_keys.difference_update(cur_keys) + next_keys = set() + def handle_internal_node(node): + for prefix, value in node._items.iteritems(): + # We don't want to request the same key twice, and we + # want to order it by the first time it is seen. + # Even further, we don't want to request a key which is + # not in this group of pack files (it should be in the + # repo, but it doesn't have to be in the group being + # packed.) + # TODO: consider how to treat externally referenced chk + # pages as 'external_references' so that we + # always fill them in for stacked branches + if value not in next_keys and value in remaining_keys: + keys_by_search_prefix.setdefault(prefix, + []).append(value) + next_keys.add(value) + def handle_leaf_node(node): + # Store is None, because we know we have a LeafNode, and we + # just want its entries + for file_id, bytes in node.iteritems(None): + self._text_refs.add(chk_map._bytes_to_text_key(bytes)) + def next_stream(): + stream = source_vf.get_record_stream(cur_keys, + 'as-requested', True) + for record in stream: + if record.storage_kind == 'absent': + # An absent CHK record: we assume that the missing + # record is in a different pack - e.g. a page not + # altered by the commit we're packing. + continue + bytes = record.get_bytes_as('fulltext') + # We don't care about search_key_func for this code, + # because we only care about external references. 
+ node = chk_map._deserialise(bytes, record.key, + search_key_func=None) + common_base = node._search_prefix + if isinstance(node, chk_map.InternalNode): + handle_internal_node(node) + elif parse_leaf_nodes: + handle_leaf_node(node) + counter[0] += 1 + if pb is not None: + pb.update('chk node', counter[0], total_keys) + yield record + yield next_stream() + # Double check that we won't be emitting any keys twice + # If we get rid of the pre-calculation of all keys, we could + # turn this around and do + # next_keys.difference_update(seen_keys) + # However, we also may have references to chk pages in another + # pack file during autopack. We filter earlier, so we should no + # longer need to do this + # next_keys = next_keys.intersection(remaining_keys) + cur_keys = [] + for prefix in sorted(keys_by_search_prefix): + cur_keys.extend(keys_by_search_prefix.pop(prefix)) + for stream in _get_referenced_stream(self._chk_id_roots, + self._gather_text_refs): + yield stream + del self._chk_id_roots + # while it isn't really possible for chk_id_roots to not be in the + # local group of packs, it is possible that the tree shape has not + # changed recently, so we need to filter _chk_p_id_roots by the + # available keys + chk_p_id_roots = [key for key in self._chk_p_id_roots + if key in remaining_keys] + del self._chk_p_id_roots + for stream in _get_referenced_stream(chk_p_id_roots, False): + yield stream + if remaining_keys: + trace.mutter('There were %d keys in the chk index, %d of which' + ' were not referenced', total_keys, + len(remaining_keys)) + if self.revision_ids is None: + stream = source_vf.get_record_stream(remaining_keys, + 'unordered', True) + yield stream + + def _build_vf(self, index_name, parents, delta, for_write=False): + """Build a VersionedFiles instance on top of this group of packs.""" + index_name = index_name + '_index' + index_to_pack = {} + access = _DirectPackAccess(index_to_pack, + reload_func=self._reload_func) + if for_write: + # Use new_pack + if self.new_pack is None: + raise AssertionError('No new pack has been set') + index = getattr(self.new_pack, index_name) + index_to_pack[index] = self.new_pack.access_tuple() + index.set_optimize(for_size=True) + access.set_writer(self.new_pack._writer, index, + self.new_pack.access_tuple()) + add_callback = index.add_nodes + else: + indices = [] + for pack in self.packs: + sub_index = getattr(pack, index_name) + index_to_pack[sub_index] = pack.access_tuple() + indices.append(sub_index) + index = _mod_index.CombinedGraphIndex(indices) + add_callback = None + vf = GroupCompressVersionedFiles( + _GCGraphIndex(index, + add_callback=add_callback, + parents=parents, + is_locked=self._pack_collection.repo.is_locked), + access=access, + delta=delta) + return vf + + def _build_vfs(self, index_name, parents, delta): + """Build the source and target VersionedFiles.""" + source_vf = self._build_vf(index_name, parents, + delta, for_write=False) + target_vf = self._build_vf(index_name, parents, + delta, for_write=True) + return source_vf, target_vf + + def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream, + pb_offset): + trace.mutter('repacking %d %s', len(keys), message) + self.pb.update('repacking %s' % (message,), pb_offset) + child_pb = ui.ui_factory.nested_progress_bar() + try: + stream = vf_to_stream(source_vf, keys, message, child_pb) + for _ in target_vf._insert_record_stream(stream, + random_id=True, + reuse_blocks=False): + pass + finally: + child_pb.finished() + + def _copy_revision_texts(self): + source_vf, 
target_vf = self._build_vfs('revision', True, False) + if not self.revision_keys: + # We are doing a full fetch, aka 'pack' + self.revision_keys = source_vf.keys() + self._copy_stream(source_vf, target_vf, self.revision_keys, + 'revisions', self._get_progress_stream, 1) + + def _copy_inventory_texts(self): + source_vf, target_vf = self._build_vfs('inventory', True, True) + # It is not sufficient to just use self.revision_keys, as stacked + # repositories can have more inventories than they have revisions. + # One alternative would be to do something with + # get_parent_map(self.revision_keys), but that shouldn't be any faster + # than this. + inventory_keys = source_vf.keys() + missing_inventories = set(self.revision_keys).difference(inventory_keys) + if missing_inventories: + # Go back to the original repo, to see if these are really missing + # https://bugs.launchpad.net/bzr/+bug/437003 + # If we are packing a subset of the repo, it is fine to just have + # the data in another Pack file, which is not included in this pack + # operation. + inv_index = self._pack_collection.repo.inventories._index + pmap = inv_index.get_parent_map(missing_inventories) + really_missing = missing_inventories.difference(pmap) + if really_missing: + missing_inventories = sorted(really_missing) + raise ValueError('We are missing inventories for revisions: %s' + % (missing_inventories,)) + self._copy_stream(source_vf, target_vf, inventory_keys, + 'inventories', self._get_filtered_inv_stream, 2) + + def _get_chk_vfs_for_copy(self): + return self._build_vfs('chk', False, False) + + def _copy_chk_texts(self): + source_vf, target_vf = self._get_chk_vfs_for_copy() + # TODO: This is technically spurious... if it is a performance issue, + # remove it + total_keys = source_vf.keys() + trace.mutter('repacking chk: %d id_to_entry roots,' + ' %d p_id_map roots, %d total keys', + len(self._chk_id_roots), len(self._chk_p_id_roots), + len(total_keys)) + self.pb.update('repacking chk', 3) + child_pb = ui.ui_factory.nested_progress_bar() + try: + for stream in self._get_chk_streams(source_vf, total_keys, + pb=child_pb): + for _ in target_vf._insert_record_stream(stream, + random_id=True, + reuse_blocks=False): + pass + finally: + child_pb.finished() + + def _copy_text_texts(self): + source_vf, target_vf = self._build_vfs('text', True, True) + # XXX: We don't walk the chk map to determine referenced (file_id, + # revision_id) keys. We don't do it yet because you really need + # to filter out the ones that are present in the parents of the + # rev just before the ones you are copying, otherwise the filter + # is grabbing too many keys... + text_keys = source_vf.keys() + self._copy_stream(source_vf, target_vf, text_keys, + 'texts', self._get_progress_stream, 4) + + def _copy_signature_texts(self): + source_vf, target_vf = self._build_vfs('signature', False, False) + signature_keys = source_vf.keys() + signature_keys.intersection(self.revision_keys) + self._copy_stream(source_vf, target_vf, signature_keys, + 'signatures', self._get_progress_stream, 5) + + def _create_pack_from_packs(self): + self.pb.update('repacking', 0, 7) + self.new_pack = self.open_pack() + # Is this necessary for GC ? 
+ self.new_pack.set_write_cache_size(1024*1024) + self._copy_revision_texts() + self._copy_inventory_texts() + self._copy_chk_texts() + self._copy_text_texts() + self._copy_signature_texts() + self.new_pack._check_references() + if not self._use_pack(self.new_pack): + self.new_pack.abort() + return None + self.new_pack.finish_content() + if len(self.packs) == 1: + old_pack = self.packs[0] + if old_pack.name == self.new_pack._hash.hexdigest(): + # The single old pack was already optimally packed. + trace.mutter('single pack %s was already optimally packed', + old_pack.name) + self.new_pack.abort() + return None + self.pb.update('finishing repack', 6, 7) + self.new_pack.finish() + self._pack_collection.allocate(self.new_pack) + return self.new_pack + + +class GCCHKReconcilePacker(GCCHKPacker): + """A packer which regenerates indices etc as it copies. + + This is used by ``bzr reconcile`` to cause parent text pointers to be + regenerated. + """ + + def __init__(self, *args, **kwargs): + super(GCCHKReconcilePacker, self).__init__(*args, **kwargs) + self._data_changed = False + self._gather_text_refs = True + + def _copy_inventory_texts(self): + source_vf, target_vf = self._build_vfs('inventory', True, True) + self._copy_stream(source_vf, target_vf, self.revision_keys, + 'inventories', self._get_filtered_inv_stream, 2) + if source_vf.keys() != self.revision_keys: + self._data_changed = True + + def _copy_text_texts(self): + """generate what texts we should have and then copy.""" + source_vf, target_vf = self._build_vfs('text', True, True) + trace.mutter('repacking %d texts', len(self._text_refs)) + self.pb.update("repacking texts", 4) + # we have three major tasks here: + # 1) generate the ideal index + repo = self._pack_collection.repo + # We want the one we just wrote, so base it on self.new_pack + revision_vf = self._build_vf('revision', True, False, for_write=True) + ancestor_keys = revision_vf.get_parent_map(revision_vf.keys()) + # Strip keys back into revision_ids. + ancestors = dict((k[0], tuple([p[0] for p in parents])) + for k, parents in ancestor_keys.iteritems()) + del ancestor_keys + # TODO: _generate_text_key_index should be much cheaper to generate from + # a chk repository, rather than the current implementation + ideal_index = repo._generate_text_key_index(None, ancestors) + file_id_parent_map = source_vf.get_parent_map(self._text_refs) + # 2) generate a keys list that contains all the entries that can + # be used as-is, with corrected parents. + ok_keys = [] + new_parent_keys = {} # (key, parent_keys) + discarded_keys = [] + NULL_REVISION = _mod_revision.NULL_REVISION + for key in self._text_refs: + # 0 - index + # 1 - key + # 2 - value + # 3 - refs + try: + ideal_parents = tuple(ideal_index[key]) + except KeyError: + discarded_keys.append(key) + self._data_changed = True + else: + if ideal_parents == (NULL_REVISION,): + ideal_parents = () + source_parents = file_id_parent_map[key] + if ideal_parents == source_parents: + # no change needed. + ok_keys.append(key) + else: + # We need to change the parent graph, but we don't need to + # re-insert the text (since we don't pun the compression + # parent with the parents list) + self._data_changed = True + new_parent_keys[key] = ideal_parents + # we're finished with some data. 
+ del ideal_index + del file_id_parent_map + # 3) bulk copy the data, updating records than need it + def _update_parents_for_texts(): + stream = source_vf.get_record_stream(self._text_refs, + 'groupcompress', False) + for record in stream: + if record.key in new_parent_keys: + record.parents = new_parent_keys[record.key] + yield record + target_vf.insert_record_stream(_update_parents_for_texts()) + + def _use_pack(self, new_pack): + """Override _use_pack to check for reconcile having changed content.""" + return new_pack.data_inserted() and self._data_changed + + +class GCCHKCanonicalizingPacker(GCCHKPacker): + """A packer that ensures inventories have canonical-form CHK maps. + + Ideally this would be part of reconcile, but it's very slow and rarely + needed. (It repairs repositories affected by + https://bugs.launchpad.net/bzr/+bug/522637). + """ + + def __init__(self, *args, **kwargs): + super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs) + self._data_changed = False + + def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset): + """Create and exhaust a stream, but don't insert it. + + This is useful to get the side-effects of generating a stream. + """ + self.pb.update('scanning %s' % (message,), pb_offset) + child_pb = ui.ui_factory.nested_progress_bar() + try: + list(vf_to_stream(source_vf, keys, message, child_pb)) + finally: + child_pb.finished() + + def _copy_inventory_texts(self): + source_vf, target_vf = self._build_vfs('inventory', True, True) + source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy() + inventory_keys = source_vf.keys() + # First, copy the existing CHKs on the assumption that most of them + # will be correct. This will save us from having to reinsert (and + # recompress) these records later at the cost of perhaps preserving a + # few unused CHKs. + # (Iterate but don't insert _get_filtered_inv_stream to populate the + # variables needed by GCCHKPacker._copy_chk_texts.) + self._exhaust_stream(source_vf, inventory_keys, 'inventories', + self._get_filtered_inv_stream, 2) + GCCHKPacker._copy_chk_texts(self) + # Now copy and fix the inventories, and any regenerated CHKs. + def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None): + return self._get_filtered_canonicalizing_inv_stream( + source_vf, keys, message, pb, source_chk_vf, target_chk_vf) + self._copy_stream(source_vf, target_vf, inventory_keys, + 'inventories', chk_canonicalizing_inv_stream, 4) + + def _copy_chk_texts(self): + # No-op; in this class this happens during _copy_inventory_texts. + pass + + def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message, + pb=None, source_chk_vf=None, target_chk_vf=None): + """Filter the texts of inventories, regenerating CHKs to make sure they + are canonical. + """ + total_keys = len(keys) + target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf) + def _filtered_inv_stream(): + stream = source_vf.get_record_stream(keys, 'groupcompress', True) + search_key_name = None + for idx, record in enumerate(stream): + # Inventories should always be with revisions; assume success. 
+ bytes = record.get_bytes_as('fulltext') + chk_inv = inventory.CHKInventory.deserialise( + source_chk_vf, bytes, record.key) + if pb is not None: + pb.update('inv', idx, total_keys) + chk_inv.id_to_entry._ensure_root() + if search_key_name is None: + # Find the name corresponding to the search_key_func + search_key_reg = chk_map.search_key_registry + for search_key_name, func in search_key_reg.iteritems(): + if func == chk_inv.id_to_entry._search_key_func: + break + canonical_inv = inventory.CHKInventory.from_inventory( + target_chk_vf, chk_inv, + maximum_size=chk_inv.id_to_entry._root_node._maximum_size, + search_key_name=search_key_name) + if chk_inv.id_to_entry.key() != canonical_inv.id_to_entry.key(): + trace.mutter( + 'Non-canonical CHK map for id_to_entry of inv: %s ' + '(root is %s, should be %s)' % (chk_inv.revision_id, + chk_inv.id_to_entry.key()[0], + canonical_inv.id_to_entry.key()[0])) + self._data_changed = True + p_id_map = chk_inv.parent_id_basename_to_file_id + p_id_map._ensure_root() + canon_p_id_map = canonical_inv.parent_id_basename_to_file_id + if p_id_map.key() != canon_p_id_map.key(): + trace.mutter( + 'Non-canonical CHK map for parent_id_to_basename of ' + 'inv: %s (root is %s, should be %s)' + % (chk_inv.revision_id, p_id_map.key()[0], + canon_p_id_map.key()[0])) + self._data_changed = True + yield versionedfile.ChunkedContentFactory(record.key, + record.parents, record.sha1, + canonical_inv.to_lines()) + # We have finished processing all of the inventory records, we + # don't need these sets anymore + return _filtered_inv_stream() + + def _use_pack(self, new_pack): + """Override _use_pack to check for reconcile having changed content.""" + return new_pack.data_inserted() and self._data_changed + + +class GCRepositoryPackCollection(RepositoryPackCollection): + + pack_factory = GCPack + resumed_pack_factory = ResumedGCPack + normal_packer_class = GCCHKPacker + optimising_packer_class = GCCHKPacker + + def _check_new_inventories(self): + """Detect missing inventories or chk root entries for the new revisions + in this write group. + + :returns: list of strs, summarising any problems found. If the list is + empty no problems were found. + """ + # Ensure that all revisions added in this write group have: + # - corresponding inventories, + # - chk root entries for those inventories, + # - and any present parent inventories have their chk root + # entries too. + # And all this should be independent of any fallback repository. + problems = [] + key_deps = self.repo.revisions._index._key_dependencies + new_revisions_keys = key_deps.get_new_keys() + no_fallback_inv_index = self.repo.inventories._index + no_fallback_chk_bytes_index = self.repo.chk_bytes._index + no_fallback_texts_index = self.repo.texts._index + inv_parent_map = no_fallback_inv_index.get_parent_map( + new_revisions_keys) + # Are any inventories for corresponding to the new revisions missing? + corresponding_invs = set(inv_parent_map) + missing_corresponding = set(new_revisions_keys) + missing_corresponding.difference_update(corresponding_invs) + if missing_corresponding: + problems.append("inventories missing for revisions %s" % + (sorted(missing_corresponding),)) + return problems + # Are any chk root entries missing for any inventories? This includes + # any present parent inventories, which may be used when calculating + # deltas for streaming. + all_inv_keys = set(corresponding_invs) + for parent_inv_keys in inv_parent_map.itervalues(): + all_inv_keys.update(parent_inv_keys) + # Filter out ghost parents. 
+ all_inv_keys.intersection_update( + no_fallback_inv_index.get_parent_map(all_inv_keys)) + parent_invs_only_keys = all_inv_keys.symmetric_difference( + corresponding_invs) + all_missing = set() + inv_ids = [key[-1] for key in all_inv_keys] + parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys] + root_key_info = _build_interesting_key_sets( + self.repo, inv_ids, parent_invs_only_ids) + expected_chk_roots = root_key_info.all_keys() + present_chk_roots = no_fallback_chk_bytes_index.get_parent_map( + expected_chk_roots) + missing_chk_roots = expected_chk_roots.difference(present_chk_roots) + if missing_chk_roots: + problems.append("missing referenced chk root keys: %s" + % (sorted(missing_chk_roots),)) + # Don't bother checking any further. + return problems + # Find all interesting chk_bytes records, and make sure they are + # present, as well as the text keys they reference. + chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks() + chk_bytes_no_fallbacks._search_key_func = \ + self.repo.chk_bytes._search_key_func + chk_diff = chk_map.iter_interesting_nodes( + chk_bytes_no_fallbacks, root_key_info.interesting_root_keys, + root_key_info.uninteresting_root_keys) + text_keys = set() + try: + for record in _filter_text_keys(chk_diff, text_keys, + chk_map._bytes_to_text_key): + pass + except errors.NoSuchRevision, e: + # XXX: It would be nice if we could give a more precise error here. + problems.append("missing chk node(s) for id_to_entry maps") + chk_diff = chk_map.iter_interesting_nodes( + chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys, + root_key_info.uninteresting_pid_root_keys) + try: + for interesting_rec, interesting_map in chk_diff: + pass + except errors.NoSuchRevision, e: + problems.append( + "missing chk node(s) for parent_id_basename_to_file_id maps") + present_text_keys = no_fallback_texts_index.get_parent_map(text_keys) + missing_text_keys = text_keys.difference(present_text_keys) + if missing_text_keys: + problems.append("missing text keys: %r" + % (sorted(missing_text_keys),)) + return problems + + +class CHKInventoryRepository(PackRepository): + """subclass of PackRepository that uses CHK based inventories.""" + + def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class, + _serializer): + """Overridden to change pack collection class.""" + super(CHKInventoryRepository, self).__init__(_format, a_bzrdir, + control_files, _commit_builder_class, _serializer) + index_transport = self._transport.clone('indices') + self._pack_collection = GCRepositoryPackCollection(self, + self._transport, index_transport, + self._transport.clone('upload'), + self._transport.clone('packs'), + _format.index_builder_class, + _format.index_class, + use_chk_index=self._format.supports_chks, + ) + self.inventories = GroupCompressVersionedFiles( + _GCGraphIndex(self._pack_collection.inventory_index.combined_index, + add_callback=self._pack_collection.inventory_index.add_callback, + parents=True, is_locked=self.is_locked, + inconsistency_fatal=False), + access=self._pack_collection.inventory_index.data_access) + self.revisions = GroupCompressVersionedFiles( + _GCGraphIndex(self._pack_collection.revision_index.combined_index, + add_callback=self._pack_collection.revision_index.add_callback, + parents=True, is_locked=self.is_locked, + track_external_parent_refs=True, track_new_keys=True), + access=self._pack_collection.revision_index.data_access, + delta=False) + self.signatures = GroupCompressVersionedFiles( + 
_GCGraphIndex(self._pack_collection.signature_index.combined_index, + add_callback=self._pack_collection.signature_index.add_callback, + parents=False, is_locked=self.is_locked, + inconsistency_fatal=False), + access=self._pack_collection.signature_index.data_access, + delta=False) + self.texts = GroupCompressVersionedFiles( + _GCGraphIndex(self._pack_collection.text_index.combined_index, + add_callback=self._pack_collection.text_index.add_callback, + parents=True, is_locked=self.is_locked, + inconsistency_fatal=False), + access=self._pack_collection.text_index.data_access) + # No parents, individual CHK pages don't have specific ancestry + self.chk_bytes = GroupCompressVersionedFiles( + _GCGraphIndex(self._pack_collection.chk_index.combined_index, + add_callback=self._pack_collection.chk_index.add_callback, + parents=False, is_locked=self.is_locked, + inconsistency_fatal=False), + access=self._pack_collection.chk_index.data_access) + search_key_name = self._format._serializer.search_key_name + search_key_func = chk_map.search_key_registry.get(search_key_name) + self.chk_bytes._search_key_func = search_key_func + # True when the repository object is 'write locked' (as opposed to the + # physical lock only taken out around changes to the pack-names list.) + # Another way to represent this would be a decorator around the control + # files object that presents logical locks as physical ones - if this + # gets ugly consider that alternative design. RBC 20071011 + self._write_lock_count = 0 + self._transaction = None + # for tests + self._reconcile_does_inventory_gc = True + self._reconcile_fixes_text_parents = True + self._reconcile_backsup_inventory = False + + def _add_inventory_checked(self, revision_id, inv, parents): + """Add inv to the repository after checking the inputs. + + This function can be overridden to allow different inventory styles. + + :seealso: add_inventory, for the contract. + """ + # make inventory + serializer = self._format._serializer + result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv, + maximum_size=serializer.maximum_size, + search_key_name=serializer.search_key_name) + inv_lines = result.to_lines() + return self._inventory_add_lines(revision_id, parents, + inv_lines, check_content=False) + + def _create_inv_from_null(self, delta, revision_id): + """This will mutate new_inv directly. + + This is a simplified form of create_by_apply_delta which knows that all + the old values must be None, so everything is a create. + """ + serializer = self._format._serializer + new_inv = inventory.CHKInventory(serializer.search_key_name) + new_inv.revision_id = revision_id + entry_to_bytes = new_inv._entry_to_bytes + id_to_entry_dict = {} + parent_id_basename_dict = {} + for old_path, new_path, file_id, entry in delta: + if old_path is not None: + raise ValueError('Invalid delta, somebody tried to delete %r' + ' from the NULL_REVISION' + % ((old_path, file_id),)) + if new_path is None: + raise ValueError('Invalid delta, delta from NULL_REVISION has' + ' no new_path %r' % (file_id,)) + if new_path == '': + new_inv.root_id = file_id + parent_id_basename_key = StaticTuple('', '').intern() + else: + utf8_entry_name = entry.name.encode('utf-8') + parent_id_basename_key = StaticTuple(entry.parent_id, + utf8_entry_name).intern() + new_value = entry_to_bytes(entry) + # Populate Caches? 
+ # new_inv._path_to_fileid_cache[new_path] = file_id + key = StaticTuple(file_id).intern() + id_to_entry_dict[key] = new_value + parent_id_basename_dict[parent_id_basename_key] = file_id + + new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict, + parent_id_basename_dict, maximum_size=serializer.maximum_size) + return new_inv + + def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id, + parents, basis_inv=None, propagate_caches=False): + """Add a new inventory expressed as a delta against another revision. + + :param basis_revision_id: The inventory id the delta was created + against. + :param delta: The inventory delta (see Inventory.apply_delta for + details). + :param new_revision_id: The revision id that the inventory is being + added for. + :param parents: The revision ids of the parents that revision_id is + known to have and are in the repository already. These are supplied + for repositories that depend on the inventory graph for revision + graph access, as well as for those that pun ancestry with delta + compression. + :param basis_inv: The basis inventory if it is already known, + otherwise None. + :param propagate_caches: If True, the caches for this inventory are + copied to and updated for the result if possible. + + :returns: (validator, new_inv) + The validator(which is a sha1 digest, though what is sha'd is + repository format specific) of the serialized inventory, and the + resulting inventory. + """ + if not self.is_in_write_group(): + raise AssertionError("%r not in write group" % (self,)) + _mod_revision.check_not_reserved_id(new_revision_id) + basis_tree = None + if basis_inv is None: + if basis_revision_id == _mod_revision.NULL_REVISION: + new_inv = self._create_inv_from_null(delta, new_revision_id) + if new_inv.root_id is None: + raise errors.RootMissing() + inv_lines = new_inv.to_lines() + return self._inventory_add_lines(new_revision_id, parents, + inv_lines, check_content=False), new_inv + else: + basis_tree = self.revision_tree(basis_revision_id) + basis_tree.lock_read() + basis_inv = basis_tree.root_inventory + try: + result = basis_inv.create_by_apply_delta(delta, new_revision_id, + propagate_caches=propagate_caches) + inv_lines = result.to_lines() + return self._inventory_add_lines(new_revision_id, parents, + inv_lines, check_content=False), result + finally: + if basis_tree is not None: + basis_tree.unlock() + + def _deserialise_inventory(self, revision_id, bytes): + return inventory.CHKInventory.deserialise(self.chk_bytes, bytes, + (revision_id,)) + + def _iter_inventories(self, revision_ids, ordering): + """Iterate over many inventory objects.""" + if ordering is None: + ordering = 'unordered' + keys = [(revision_id,) for revision_id in revision_ids] + stream = self.inventories.get_record_stream(keys, ordering, True) + texts = {} + for record in stream: + if record.storage_kind != 'absent': + texts[record.key] = record.get_bytes_as('fulltext') + else: + texts[record.key] = None + for key in keys: + bytes = texts[key] + if bytes is None: + yield (None, key[-1]) + else: + yield (inventory.CHKInventory.deserialise( + self.chk_bytes, bytes, key), key[-1]) + + def _get_inventory_xml(self, revision_id): + """Get serialized inventory as a string.""" + # Without a native 'xml' inventory, this method doesn't make sense. + # However older working trees, and older bundles want it - so we supply + # it allowing _get_inventory_xml to work. 
Bundles currently use the + # serializer directly; this also isn't ideal, but there isn't an xml + # iteration interface offered at all for repositories. + return self._serializer.write_inventory_to_string( + self.get_inventory(revision_id)) + + def _find_present_inventory_keys(self, revision_keys): + parent_map = self.inventories.get_parent_map(revision_keys) + present_inventory_keys = set(k for k in parent_map) + return present_inventory_keys + + def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None): + """Find the file ids and versions affected by revisions. + + :param revisions: an iterable containing revision ids. + :param _inv_weave: The inventory weave from this repository or None. + If None, the inventory weave will be opened automatically. + :return: a dictionary mapping altered file-ids to an iterable of + revision_ids. Each altered file-ids has the exact revision_ids that + altered it listed explicitly. + """ + rich_root = self.supports_rich_root() + bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key + file_id_revisions = {} + pb = ui.ui_factory.nested_progress_bar() + try: + revision_keys = [(r,) for r in revision_ids] + parent_keys = self._find_parent_keys_of_revisions(revision_keys) + # TODO: instead of using _find_present_inventory_keys, change the + # code paths to allow missing inventories to be tolerated. + # However, we only want to tolerate missing parent + # inventories, not missing inventories for revision_ids + present_parent_inv_keys = self._find_present_inventory_keys( + parent_keys) + present_parent_inv_ids = set( + [k[-1] for k in present_parent_inv_keys]) + inventories_to_read = set(revision_ids) + inventories_to_read.update(present_parent_inv_ids) + root_key_info = _build_interesting_key_sets( + self, inventories_to_read, present_parent_inv_ids) + interesting_root_keys = root_key_info.interesting_root_keys + uninteresting_root_keys = root_key_info.uninteresting_root_keys + chk_bytes = self.chk_bytes + for record, items in chk_map.iter_interesting_nodes(chk_bytes, + interesting_root_keys, uninteresting_root_keys, + pb=pb): + for name, bytes in items: + (name_utf8, file_id, revision_id) = bytes_to_info(bytes) + # TODO: consider interning file_id, revision_id here, or + # pushing that intern() into bytes_to_info() + # TODO: rich_root should always be True here, for all + # repositories that support chk_bytes + if not rich_root and name_utf8 == '': + continue + try: + file_id_revisions[file_id].add(revision_id) + except KeyError: + file_id_revisions[file_id] = set([revision_id]) + finally: + pb.finished() + return file_id_revisions + + def find_text_key_references(self): + """Find the text key references within the repository. + + :return: A dictionary mapping text keys ((fileid, revision_id) tuples) + to whether they were referred to by the inventory of the + revision_id that they contain. The inventory texts from all present + revision ids are assessed to generate this report. + """ + # XXX: Slow version but correct: rewrite as a series of delta + # examinations/direct tree traversal. Note that that will require care + # as a common node is reachable both from the inventory that added it, + # and others afterwards. 
+ revision_keys = self.revisions.keys() + result = {} + rich_roots = self.supports_rich_root() + pb = ui.ui_factory.nested_progress_bar() + try: + all_revs = self.all_revision_ids() + total = len(all_revs) + for pos, inv in enumerate(self.iter_inventories(all_revs)): + pb.update("Finding text references", pos, total) + for _, entry in inv.iter_entries(): + if not rich_roots and entry.file_id == inv.root_id: + continue + key = (entry.file_id, entry.revision) + result.setdefault(key, False) + if entry.revision == inv.revision_id: + result[key] = True + return result + finally: + pb.finished() + + @needs_write_lock + def reconcile_canonicalize_chks(self): + """Reconcile this repository to make sure all CHKs are in canonical + form. + """ + from bzrlib.reconcile import PackReconciler + reconciler = PackReconciler(self, thorough=True, canonicalize_chks=True) + reconciler.reconcile() + return reconciler + + def _reconcile_pack(self, collection, packs, extension, revs, pb): + packer = GCCHKReconcilePacker(collection, packs, extension) + return packer.pack(pb) + + def _canonicalize_chks_pack(self, collection, packs, extension, revs, pb): + packer = GCCHKCanonicalizingPacker(collection, packs, extension, revs) + return packer.pack(pb) + + def _get_source(self, to_format): + """Return a source for streaming from this repository.""" + if self._format._serializer == to_format._serializer: + # We must be exactly the same format, otherwise stuff like the chk + # page layout might be different. + # Actually, this test is just slightly looser than exact so that + # CHK2 <-> 2a transfers will work. + return GroupCHKStreamSource(self, to_format) + return super(CHKInventoryRepository, self)._get_source(to_format) + + def _find_inconsistent_revision_parents(self, revisions_iterator=None): + """Find revisions with different parent lists in the revision object + and in the index graph. + + :param revisions_iterator: None, or an iterator of (revid, + Revision-or-None). This iterator controls the revisions checked. + :returns: an iterator yielding tuples of (revison-id, parents-in-index, + parents-in-revision). + """ + if not self.is_locked(): + raise AssertionError() + vf = self.revisions + if revisions_iterator is None: + revisions_iterator = self._iter_revisions(None) + for revid, revision in revisions_iterator: + if revision is None: + pass + parent_map = vf.get_parent_map([(revid,)]) + parents_according_to_index = tuple(parent[-1] for parent in + parent_map[(revid,)]) + parents_according_to_revision = tuple(revision.parent_ids) + if parents_according_to_index != parents_according_to_revision: + yield (revid, parents_according_to_index, + parents_according_to_revision) + + def _check_for_inconsistent_revision_parents(self): + inconsistencies = list(self._find_inconsistent_revision_parents()) + if inconsistencies: + raise errors.BzrCheckError( + "Revision index has inconsistent parents.") + + +class GroupCHKStreamSource(StreamSource): + """Used when both the source and target repo are GroupCHK repos.""" + + def __init__(self, from_repository, to_format): + """Create a StreamSource streaming from from_repository.""" + super(GroupCHKStreamSource, self).__init__(from_repository, to_format) + self._revision_keys = None + self._text_keys = None + self._text_fetch_order = 'groupcompress' + self._chk_id_roots = None + self._chk_p_id_roots = None + + def _get_inventory_stream(self, inventory_keys, allow_absent=False): + """Get a stream of inventory texts. 
+ + When this function returns, self._chk_id_roots and self._chk_p_id_roots + should be populated. + """ + self._chk_id_roots = [] + self._chk_p_id_roots = [] + def _filtered_inv_stream(): + id_roots_set = set() + p_id_roots_set = set() + source_vf = self.from_repository.inventories + stream = source_vf.get_record_stream(inventory_keys, + 'groupcompress', True) + for record in stream: + if record.storage_kind == 'absent': + if allow_absent: + continue + else: + raise errors.NoSuchRevision(self, record.key) + bytes = record.get_bytes_as('fulltext') + chk_inv = inventory.CHKInventory.deserialise(None, bytes, + record.key) + key = chk_inv.id_to_entry.key() + if key not in id_roots_set: + self._chk_id_roots.append(key) + id_roots_set.add(key) + p_id_map = chk_inv.parent_id_basename_to_file_id + if p_id_map is None: + raise AssertionError('Parent id -> file_id map not set') + key = p_id_map.key() + if key not in p_id_roots_set: + p_id_roots_set.add(key) + self._chk_p_id_roots.append(key) + yield record + # We have finished processing all of the inventory records, we + # don't need these sets anymore + id_roots_set.clear() + p_id_roots_set.clear() + return ('inventories', _filtered_inv_stream()) + + def _get_filtered_chk_streams(self, excluded_revision_keys): + self._text_keys = set() + excluded_revision_keys.discard(_mod_revision.NULL_REVISION) + if not excluded_revision_keys: + uninteresting_root_keys = set() + uninteresting_pid_root_keys = set() + else: + # filter out any excluded revisions whose inventories are not + # actually present + # TODO: Update Repository.iter_inventories() to add + # ignore_missing=True + present_keys = self.from_repository._find_present_inventory_keys( + excluded_revision_keys) + present_ids = [k[-1] for k in present_keys] + uninteresting_root_keys = set() + uninteresting_pid_root_keys = set() + for inv in self.from_repository.iter_inventories(present_ids): + uninteresting_root_keys.add(inv.id_to_entry.key()) + uninteresting_pid_root_keys.add( + inv.parent_id_basename_to_file_id.key()) + chk_bytes = self.from_repository.chk_bytes + def _filter_id_to_entry(): + interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes, + self._chk_id_roots, uninteresting_root_keys) + for record in _filter_text_keys(interesting_nodes, self._text_keys, + chk_map._bytes_to_text_key): + if record is not None: + yield record + # Consumed + self._chk_id_roots = None + yield 'chk_bytes', _filter_id_to_entry() + def _get_parent_id_basename_to_file_id_pages(): + for record, items in chk_map.iter_interesting_nodes(chk_bytes, + self._chk_p_id_roots, uninteresting_pid_root_keys): + if record is not None: + yield record + # Consumed + self._chk_p_id_roots = None + yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages() + + def _get_text_stream(self): + # Note: We know we don't have to handle adding root keys, because both + # the source and target are the identical network name. 
+ text_stream = self.from_repository.texts.get_record_stream( + self._text_keys, self._text_fetch_order, False) + return ('texts', text_stream) + + def get_stream(self, search): + def wrap_and_count(pb, rc, stream): + """Yield records from stream while showing progress.""" + count = 0 + for record in stream: + if count == rc.STEP: + rc.increment(count) + pb.update('Estimate', rc.current, rc.max) + count = 0 + count += 1 + yield record + + revision_ids = search.get_keys() + pb = ui.ui_factory.nested_progress_bar() + rc = self._record_counter + self._record_counter.setup(len(revision_ids)) + for stream_info in self._fetch_revision_texts(revision_ids): + yield (stream_info[0], + wrap_and_count(pb, rc, stream_info[1])) + self._revision_keys = [(rev_id,) for rev_id in revision_ids] + # TODO: The keys to exclude might be part of the search recipe + # For now, exclude all parents that are at the edge of ancestry, for + # which we have inventories + from_repo = self.from_repository + parent_keys = from_repo._find_parent_keys_of_revisions( + self._revision_keys) + self.from_repository.revisions.clear_cache() + self.from_repository.signatures.clear_cache() + # Clear the repo's get_parent_map cache too. + self.from_repository._unstacked_provider.disable_cache() + self.from_repository._unstacked_provider.enable_cache() + s = self._get_inventory_stream(self._revision_keys) + yield (s[0], wrap_and_count(pb, rc, s[1])) + self.from_repository.inventories.clear_cache() + for stream_info in self._get_filtered_chk_streams(parent_keys): + yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1])) + self.from_repository.chk_bytes.clear_cache() + s = self._get_text_stream() + yield (s[0], wrap_and_count(pb, rc, s[1])) + self.from_repository.texts.clear_cache() + pb.update('Done', rc.max, rc.max) + pb.finished() + + def get_stream_for_missing_keys(self, missing_keys): + # missing keys can only occur when we are byte copying and not + # translating (because translation means we don't send + # unreconstructable deltas ever). + missing_inventory_keys = set() + for key in missing_keys: + if key[0] != 'inventories': + raise AssertionError('The only missing keys we should' + ' be filling in are inventory keys, not %s' + % (key[0],)) + missing_inventory_keys.add(key[1:]) + if self._chk_id_roots or self._chk_p_id_roots: + raise AssertionError('Cannot call get_stream_for_missing_keys' + ' until all of get_stream() has been consumed.') + # Yield the inventory stream, so we can find the chk stream + # Some of the missing_keys will be missing because they are ghosts. + # As such, we can ignore them. The Sink is required to verify there are + # no unavailable texts when the ghost inventories are not filled in. + yield self._get_inventory_stream(missing_inventory_keys, + allow_absent=True) + # We use the empty set for excluded_revision_keys, to make it clear + # that we want to transmit all referenced chk pages. 
+ for stream_info in self._get_filtered_chk_streams(set()): + yield stream_info + + +class _InterestingKeyInfo(object): + def __init__(self): + self.interesting_root_keys = set() + self.interesting_pid_root_keys = set() + self.uninteresting_root_keys = set() + self.uninteresting_pid_root_keys = set() + + def all_interesting(self): + return self.interesting_root_keys.union(self.interesting_pid_root_keys) + + def all_uninteresting(self): + return self.uninteresting_root_keys.union( + self.uninteresting_pid_root_keys) + + def all_keys(self): + return self.all_interesting().union(self.all_uninteresting()) + + +def _build_interesting_key_sets(repo, inventory_ids, parent_only_inv_ids): + result = _InterestingKeyInfo() + for inv in repo.iter_inventories(inventory_ids, 'unordered'): + root_key = inv.id_to_entry.key() + pid_root_key = inv.parent_id_basename_to_file_id.key() + if inv.revision_id in parent_only_inv_ids: + result.uninteresting_root_keys.add(root_key) + result.uninteresting_pid_root_keys.add(pid_root_key) + else: + result.interesting_root_keys.add(root_key) + result.interesting_pid_root_keys.add(pid_root_key) + return result + + +def _filter_text_keys(interesting_nodes_iterable, text_keys, bytes_to_text_key): + """Iterate the result of iter_interesting_nodes, yielding the records + and adding to text_keys. + """ + text_keys_update = text_keys.update + for record, items in interesting_nodes_iterable: + text_keys_update([bytes_to_text_key(b) for n,b in items]) + yield record + + +class RepositoryFormat2a(RepositoryFormatPack): + """A CHK repository that uses the bencode revision serializer.""" + + repository_class = CHKInventoryRepository + supports_external_lookups = True + supports_chks = True + _commit_builder_class = PackRootCommitBuilder + rich_root_data = True + _serializer = chk_serializer.chk_bencode_serializer + _commit_inv_deltas = True + # What index classes to use + index_builder_class = BTreeBuilder + index_class = BTreeGraphIndex + # Note: We cannot unpack a delta that references a text we haven't + # seen yet. There are 2 options, work in fulltexts, or require + # topological sorting. Using fulltexts is more optimal for local + # operations, because the source can be smart about extracting + # multiple in-a-row (and sharing strings). Topological is better + # for remote, because we access less data. + _fetch_order = 'unordered' + _fetch_uses_deltas = False # essentially ignored by the groupcompress code. + fast_deltas = True + pack_compresses = True + + def _get_matching_bzrdir(self): + return controldir.format_registry.make_bzrdir('2a') + + def _ignore_setting_bzrdir(self, format): + pass + + _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir) + + @classmethod + def get_format_string(cls): + return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n') + + def get_format_description(self): + """See RepositoryFormat.get_format_description().""" + return ("Repository format 2a - rich roots, group compression" + " and chk inventories") + + +class RepositoryFormat2aSubtree(RepositoryFormat2a): + """A 2a repository format that supports nested trees. 
+ + """ + + def _get_matching_bzrdir(self): + return controldir.format_registry.make_bzrdir('development-subtree') + + def _ignore_setting_bzrdir(self, format): + pass + + _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir) + + @classmethod + def get_format_string(cls): + return ('Bazaar development format 8\n') + + def get_format_description(self): + """See RepositoryFormat.get_format_description().""" + return ("Development repository format 8 - nested trees, " + "group compression and chk inventories") + + experimental = True + supports_tree_reference = True diff --git a/bzrlib/repofmt/knitpack_repo.py b/bzrlib/repofmt/knitpack_repo.py new file mode 100644 index 0000000..15a819d --- /dev/null +++ b/bzrlib/repofmt/knitpack_repo.py @@ -0,0 +1,1156 @@ +# Copyright (C) 2007-2011 Canonical Ltd +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +"""Knit-based pack repository formats.""" + +from __future__ import absolute_import + +from bzrlib.lazy_import import lazy_import +lazy_import(globals(), """ +from itertools import izip +import time + +from bzrlib import ( + controldir, + debug, + errors, + knit, + osutils, + pack, + revision as _mod_revision, + trace, + tsort, + ui, + xml5, + xml6, + xml7, + ) +from bzrlib.knit import ( + _KnitGraphIndex, + KnitPlainFactory, + KnitVersionedFiles, + ) +""") + +from bzrlib import ( + btree_index, + ) +from bzrlib.index import ( + CombinedGraphIndex, + GraphIndex, + GraphIndexPrefixAdapter, + InMemoryGraphIndex, + ) +from bzrlib.repofmt.knitrepo import ( + KnitRepository, + ) +from bzrlib.repofmt.pack_repo import ( + _DirectPackAccess, + NewPack, + RepositoryFormatPack, + ResumedPack, + Packer, + PackCommitBuilder, + PackRepository, + PackRootCommitBuilder, + RepositoryPackCollection, + ) +from bzrlib.vf_repository import ( + StreamSource, + ) + + +class KnitPackRepository(PackRepository, KnitRepository): + + def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class, + _serializer): + PackRepository.__init__(self, _format, a_bzrdir, control_files, + _commit_builder_class, _serializer) + if self._format.supports_chks: + raise AssertionError("chk not supported") + index_transport = self._transport.clone('indices') + self._pack_collection = KnitRepositoryPackCollection(self, + self._transport, + index_transport, + self._transport.clone('upload'), + self._transport.clone('packs'), + _format.index_builder_class, + _format.index_class, + use_chk_index=False, + ) + self.inventories = KnitVersionedFiles( + _KnitGraphIndex(self._pack_collection.inventory_index.combined_index, + add_callback=self._pack_collection.inventory_index.add_callback, + deltas=True, parents=True, is_locked=self.is_locked), + data_access=self._pack_collection.inventory_index.data_access, + max_delta_chain=200) + self.revisions = KnitVersionedFiles( + 
_KnitGraphIndex(self._pack_collection.revision_index.combined_index, + add_callback=self._pack_collection.revision_index.add_callback, + deltas=False, parents=True, is_locked=self.is_locked, + track_external_parent_refs=True), + data_access=self._pack_collection.revision_index.data_access, + max_delta_chain=0) + self.signatures = KnitVersionedFiles( + _KnitGraphIndex(self._pack_collection.signature_index.combined_index, + add_callback=self._pack_collection.signature_index.add_callback, + deltas=False, parents=False, is_locked=self.is_locked), + data_access=self._pack_collection.signature_index.data_access, + max_delta_chain=0) + self.texts = KnitVersionedFiles( + _KnitGraphIndex(self._pack_collection.text_index.combined_index, + add_callback=self._pack_collection.text_index.add_callback, + deltas=True, parents=True, is_locked=self.is_locked), + data_access=self._pack_collection.text_index.data_access, + max_delta_chain=200) + self.chk_bytes = None + # True when the repository object is 'write locked' (as opposed to the + # physical lock only taken out around changes to the pack-names list.) + # Another way to represent this would be a decorator around the control + # files object that presents logical locks as physical ones - if this + # gets ugly consider that alternative design. RBC 20071011 + self._write_lock_count = 0 + self._transaction = None + # for tests + self._reconcile_does_inventory_gc = True + self._reconcile_fixes_text_parents = True + self._reconcile_backsup_inventory = False + + def _get_source(self, to_format): + if to_format.network_name() == self._format.network_name(): + return KnitPackStreamSource(self, to_format) + return PackRepository._get_source(self, to_format) + + def _reconcile_pack(self, collection, packs, extension, revs, pb): + packer = KnitReconcilePacker(collection, packs, extension, revs) + return packer.pack(pb) + + +class RepositoryFormatKnitPack1(RepositoryFormatPack): + """A no-subtrees parameterized Pack repository. + + This format was introduced in 0.92. + """ + + repository_class = KnitPackRepository + _commit_builder_class = PackCommitBuilder + @property + def _serializer(self): + return xml5.serializer_v5 + # What index classes to use + index_builder_class = InMemoryGraphIndex + index_class = GraphIndex + + def _get_matching_bzrdir(self): + return controldir.format_registry.make_bzrdir('pack-0.92') + + def _ignore_setting_bzrdir(self, format): + pass + + _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir) + + @classmethod + def get_format_string(cls): + """See RepositoryFormat.get_format_string().""" + return "Bazaar pack repository format 1 (needs bzr 0.92)\n" + + def get_format_description(self): + """See RepositoryFormat.get_format_description().""" + return "Packs containing knits without subtree support" + + +class RepositoryFormatKnitPack3(RepositoryFormatPack): + """A subtrees parameterized Pack repository. + + This repository format uses the xml7 serializer to get: + - support for recording full info about the tree root + - support for recording tree-references + + This format was introduced in 0.92. 
+ """ + + repository_class = KnitPackRepository + _commit_builder_class = PackRootCommitBuilder + rich_root_data = True + experimental = True + supports_tree_reference = True + @property + def _serializer(self): + return xml7.serializer_v7 + # What index classes to use + index_builder_class = InMemoryGraphIndex + index_class = GraphIndex + + def _get_matching_bzrdir(self): + return controldir.format_registry.make_bzrdir( + 'pack-0.92-subtree') + + def _ignore_setting_bzrdir(self, format): + pass + + _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir) + + @classmethod + def get_format_string(cls): + """See RepositoryFormat.get_format_string().""" + return "Bazaar pack repository format 1 with subtree support (needs bzr 0.92)\n" + + def get_format_description(self): + """See RepositoryFormat.get_format_description().""" + return "Packs containing knits with subtree support\n" + + +class RepositoryFormatKnitPack4(RepositoryFormatPack): + """A rich-root, no subtrees parameterized Pack repository. + + This repository format uses the xml6 serializer to get: + - support for recording full info about the tree root + + This format was introduced in 1.0. + """ + + repository_class = KnitPackRepository + _commit_builder_class = PackRootCommitBuilder + rich_root_data = True + supports_tree_reference = False + @property + def _serializer(self): + return xml6.serializer_v6 + # What index classes to use + index_builder_class = InMemoryGraphIndex + index_class = GraphIndex + + def _get_matching_bzrdir(self): + return controldir.format_registry.make_bzrdir( + 'rich-root-pack') + + def _ignore_setting_bzrdir(self, format): + pass + + _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir) + + @classmethod + def get_format_string(cls): + """See RepositoryFormat.get_format_string().""" + return ("Bazaar pack repository format 1 with rich root" + " (needs bzr 1.0)\n") + + def get_format_description(self): + """See RepositoryFormat.get_format_description().""" + return "Packs containing knits with rich root support\n" + + +class RepositoryFormatKnitPack5(RepositoryFormatPack): + """Repository that supports external references to allow stacking. + + New in release 1.6. + + Supports external lookups, which results in non-truncated ghosts after + reconcile compared to pack-0.92 formats. + """ + + repository_class = KnitPackRepository + _commit_builder_class = PackCommitBuilder + supports_external_lookups = True + # What index classes to use + index_builder_class = InMemoryGraphIndex + index_class = GraphIndex + + @property + def _serializer(self): + return xml5.serializer_v5 + + def _get_matching_bzrdir(self): + return controldir.format_registry.make_bzrdir('1.6') + + def _ignore_setting_bzrdir(self, format): + pass + + _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir) + + @classmethod + def get_format_string(cls): + """See RepositoryFormat.get_format_string().""" + return "Bazaar RepositoryFormatKnitPack5 (bzr 1.6)\n" + + def get_format_description(self): + """See RepositoryFormat.get_format_description().""" + return "Packs 5 (adds stacking support, requires bzr 1.6)" + + +class RepositoryFormatKnitPack5RichRoot(RepositoryFormatPack): + """A repository with rich roots and stacking. + + New in release 1.6.1. + + Supports stacking on other repositories, allowing data to be accessed + without being stored locally. 
+ """ + + repository_class = KnitPackRepository + _commit_builder_class = PackRootCommitBuilder + rich_root_data = True + supports_tree_reference = False # no subtrees + supports_external_lookups = True + # What index classes to use + index_builder_class = InMemoryGraphIndex + index_class = GraphIndex + + @property + def _serializer(self): + return xml6.serializer_v6 + + def _get_matching_bzrdir(self): + return controldir.format_registry.make_bzrdir( + '1.6.1-rich-root') + + def _ignore_setting_bzrdir(self, format): + pass + + _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir) + + @classmethod + def get_format_string(cls): + """See RepositoryFormat.get_format_string().""" + return "Bazaar RepositoryFormatKnitPack5RichRoot (bzr 1.6.1)\n" + + def get_format_description(self): + return "Packs 5 rich-root (adds stacking support, requires bzr 1.6.1)" + + +class RepositoryFormatKnitPack5RichRootBroken(RepositoryFormatPack): + """A repository with rich roots and external references. + + New in release 1.6. + + Supports external lookups, which results in non-truncated ghosts after + reconcile compared to pack-0.92 formats. + + This format was deprecated because the serializer it uses accidentally + supported subtrees, when the format was not intended to. This meant that + someone could accidentally fetch from an incorrect repository. + """ + + repository_class = KnitPackRepository + _commit_builder_class = PackRootCommitBuilder + rich_root_data = True + supports_tree_reference = False # no subtrees + + supports_external_lookups = True + # What index classes to use + index_builder_class = InMemoryGraphIndex + index_class = GraphIndex + + @property + def _serializer(self): + return xml7.serializer_v7 + + def _get_matching_bzrdir(self): + matching = controldir.format_registry.make_bzrdir( + '1.6.1-rich-root') + matching.repository_format = self + return matching + + def _ignore_setting_bzrdir(self, format): + pass + + _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir) + + @classmethod + def get_format_string(cls): + """See RepositoryFormat.get_format_string().""" + return "Bazaar RepositoryFormatKnitPack5RichRoot (bzr 1.6)\n" + + def get_format_description(self): + return ("Packs 5 rich-root (adds stacking support, requires bzr 1.6)" + " (deprecated)") + + def is_deprecated(self): + return True + + +class RepositoryFormatKnitPack6(RepositoryFormatPack): + """A repository with stacking and btree indexes, + without rich roots or subtrees. + + This is equivalent to pack-1.6 with B+Tree indices. 
+ """ + + repository_class = KnitPackRepository + _commit_builder_class = PackCommitBuilder + supports_external_lookups = True + # What index classes to use + index_builder_class = btree_index.BTreeBuilder + index_class = btree_index.BTreeGraphIndex + + @property + def _serializer(self): + return xml5.serializer_v5 + + def _get_matching_bzrdir(self): + return controldir.format_registry.make_bzrdir('1.9') + + def _ignore_setting_bzrdir(self, format): + pass + + _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir) + + @classmethod + def get_format_string(cls): + """See RepositoryFormat.get_format_string().""" + return "Bazaar RepositoryFormatKnitPack6 (bzr 1.9)\n" + + def get_format_description(self): + """See RepositoryFormat.get_format_description().""" + return "Packs 6 (uses btree indexes, requires bzr 1.9)" + + +class RepositoryFormatKnitPack6RichRoot(RepositoryFormatPack): + """A repository with rich roots, no subtrees, stacking and btree indexes. + + 1.6-rich-root with B+Tree indices. + """ + + repository_class = KnitPackRepository + _commit_builder_class = PackRootCommitBuilder + rich_root_data = True + supports_tree_reference = False # no subtrees + supports_external_lookups = True + # What index classes to use + index_builder_class = btree_index.BTreeBuilder + index_class = btree_index.BTreeGraphIndex + + @property + def _serializer(self): + return xml6.serializer_v6 + + def _get_matching_bzrdir(self): + return controldir.format_registry.make_bzrdir( + '1.9-rich-root') + + def _ignore_setting_bzrdir(self, format): + pass + + _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir) + + @classmethod + def get_format_string(cls): + """See RepositoryFormat.get_format_string().""" + return "Bazaar RepositoryFormatKnitPack6RichRoot (bzr 1.9)\n" + + def get_format_description(self): + return "Packs 6 rich-root (uses btree indexes, requires bzr 1.9)" + + +class RepositoryFormatPackDevelopment2Subtree(RepositoryFormatPack): + """A subtrees development repository. + + This format should be retained in 2.3, to provide an upgrade path from this + to RepositoryFormat2aSubtree. It can be removed in later releases. + + 1.6.1-subtree[as it might have been] with B+Tree indices. + """ + + repository_class = KnitPackRepository + _commit_builder_class = PackRootCommitBuilder + rich_root_data = True + experimental = True + supports_tree_reference = True + supports_external_lookups = True + # What index classes to use + index_builder_class = btree_index.BTreeBuilder + index_class = btree_index.BTreeGraphIndex + + @property + def _serializer(self): + return xml7.serializer_v7 + + def _get_matching_bzrdir(self): + return controldir.format_registry.make_bzrdir( + 'development5-subtree') + + def _ignore_setting_bzrdir(self, format): + pass + + _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir) + + @classmethod + def get_format_string(cls): + """See RepositoryFormat.get_format_string().""" + return ("Bazaar development format 2 with subtree support " + "(needs bzr.dev from before 1.8)\n") + + def get_format_description(self): + """See RepositoryFormat.get_format_description().""" + return ("Development repository format, currently the same as " + "1.6.1-subtree with B+Tree indices.\n") + + +class KnitPackStreamSource(StreamSource): + """A StreamSource used to transfer data between same-format KnitPack repos. 
+ + This source assumes: + 1) Same serialization format for all objects + 2) Same root information + 3) XML format inventories + 4) Atomic inserts (so we can stream inventory texts before text + content) + 5) No chk_bytes + """ + + def __init__(self, from_repository, to_format): + super(KnitPackStreamSource, self).__init__(from_repository, to_format) + self._text_keys = None + self._text_fetch_order = 'unordered' + + def _get_filtered_inv_stream(self, revision_ids): + from_repo = self.from_repository + parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids) + parent_keys = [(p,) for p in parent_ids] + find_text_keys = from_repo._serializer._find_text_key_references + parent_text_keys = set(find_text_keys( + from_repo._inventory_xml_lines_for_keys(parent_keys))) + content_text_keys = set() + knit = KnitVersionedFiles(None, None) + factory = KnitPlainFactory() + def find_text_keys_from_content(record): + if record.storage_kind not in ('knit-delta-gz', 'knit-ft-gz'): + raise ValueError("Unknown content storage kind for" + " inventory text: %s" % (record.storage_kind,)) + # It's a knit record, it has a _raw_record field (even if it was + # reconstituted from a network stream). + raw_data = record._raw_record + # read the entire thing + revision_id = record.key[-1] + content, _ = knit._parse_record(revision_id, raw_data) + if record.storage_kind == 'knit-delta-gz': + line_iterator = factory.get_linedelta_content(content) + elif record.storage_kind == 'knit-ft-gz': + line_iterator = factory.get_fulltext_content(content) + content_text_keys.update(find_text_keys( + [(line, revision_id) for line in line_iterator])) + revision_keys = [(r,) for r in revision_ids] + def _filtered_inv_stream(): + source_vf = from_repo.inventories + stream = source_vf.get_record_stream(revision_keys, + 'unordered', False) + for record in stream: + if record.storage_kind == 'absent': + raise errors.NoSuchRevision(from_repo, record.key) + find_text_keys_from_content(record) + yield record + self._text_keys = content_text_keys - parent_text_keys + return ('inventories', _filtered_inv_stream()) + + def _get_text_stream(self): + # Note: We know we don't have to handle adding root keys, because both + # the source and target are the identical network name. + text_stream = self.from_repository.texts.get_record_stream( + self._text_keys, self._text_fetch_order, False) + return ('texts', text_stream) + + def get_stream(self, search): + revision_ids = search.get_keys() + for stream_info in self._fetch_revision_texts(revision_ids): + yield stream_info + self._revision_keys = [(rev_id,) for rev_id in revision_ids] + yield self._get_filtered_inv_stream(revision_ids) + yield self._get_text_stream() + + +class KnitPacker(Packer): + """Packer that works with knit packs.""" + + def __init__(self, pack_collection, packs, suffix, revision_ids=None, + reload_func=None): + super(KnitPacker, self).__init__(pack_collection, packs, suffix, + revision_ids=revision_ids, + reload_func=reload_func) + + def _pack_map_and_index_list(self, index_attribute): + """Convert a list of packs to an index pack map and index list. + + :param index_attribute: The attribute that the desired index is found + on. + :return: A tuple (map, list) where map contains the dict from + index:pack_tuple, and list contains the indices in the preferred + access order. 
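+
+ A rough sketch of the shape of the result (the pack names, index
+ objects and the 'packer' variable here are purely illustrative, not
+ taken from a real repository):
+
+ pack_map, indices = packer._pack_map_and_index_list('revision_index')
+ # pack_map -> {<GraphIndex of pack-a>: <Pack pack-a>,
+ # <GraphIndex of pack-b>: <Pack pack-b>}
+ # indices -> [<GraphIndex of pack-a>, <GraphIndex of pack-b>]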
+ """ + indices = [] + pack_map = {} + for pack_obj in self.packs: + index = getattr(pack_obj, index_attribute) + indices.append(index) + pack_map[index] = pack_obj + return pack_map, indices + + def _index_contents(self, indices, key_filter=None): + """Get an iterable of the index contents from a pack_map. + + :param indices: The list of indices to query + :param key_filter: An optional filter to limit the keys returned. + """ + all_index = CombinedGraphIndex(indices) + if key_filter is None: + return all_index.iter_all_entries() + else: + return all_index.iter_entries(key_filter) + + def _copy_nodes(self, nodes, index_map, writer, write_index, + output_lines=None): + """Copy knit nodes between packs with no graph references. + + :param output_lines: Output full texts of copied items. + """ + pb = ui.ui_factory.nested_progress_bar() + try: + return self._do_copy_nodes(nodes, index_map, writer, + write_index, pb, output_lines=output_lines) + finally: + pb.finished() + + def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb, + output_lines=None): + # for record verification + knit = KnitVersionedFiles(None, None) + # plan a readv on each source pack: + # group by pack + nodes = sorted(nodes) + # how to map this into knit.py - or knit.py into this? + # we don't want the typical knit logic, we want grouping by pack + # at this point - perhaps a helper library for the following code + # duplication points? + request_groups = {} + for index, key, value in nodes: + if index not in request_groups: + request_groups[index] = [] + request_groups[index].append((key, value)) + record_index = 0 + pb.update("Copied record", record_index, len(nodes)) + for index, items in request_groups.iteritems(): + pack_readv_requests = [] + for key, value in items: + # ---- KnitGraphIndex.get_position + bits = value[1:].split(' ') + offset, length = int(bits[0]), int(bits[1]) + pack_readv_requests.append((offset, length, (key, value[0]))) + # linear scan up the pack + pack_readv_requests.sort() + # copy the data + pack_obj = index_map[index] + transport, path = pack_obj.access_tuple() + try: + reader = pack.make_readv_reader(transport, path, + [offset[0:2] for offset in pack_readv_requests]) + except errors.NoSuchFile: + if self._reload_func is not None: + self._reload_func() + raise + for (names, read_func), (_1, _2, (key, eol_flag)) in \ + izip(reader.iter_records(), pack_readv_requests): + raw_data = read_func(None) + # check the header only + if output_lines is not None: + output_lines(knit._parse_record(key[-1], raw_data)[0]) + else: + df, _ = knit._parse_record_header(key, raw_data) + df.close() + pos, size = writer.add_bytes_record(raw_data, names) + write_index.add_node(key, eol_flag + "%d %d" % (pos, size)) + pb.update("Copied record", record_index) + record_index += 1 + + def _copy_nodes_graph(self, index_map, writer, write_index, + readv_group_iter, total_items, output_lines=False): + """Copy knit nodes between packs. + + :param output_lines: Return lines present in the copied data as + an iterator of line,version_id. + """ + pb = ui.ui_factory.nested_progress_bar() + try: + for result in self._do_copy_nodes_graph(index_map, writer, + write_index, output_lines, pb, readv_group_iter, total_items): + yield result + except Exception: + # Python 2.4 does not permit try:finally: in a generator. 
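+ # (So the progress bar is finished explicitly on both the failure
+ # path below and, via the else clause, on the success path.)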
+ pb.finished() + raise + else: + pb.finished() + + def _do_copy_nodes_graph(self, index_map, writer, write_index, + output_lines, pb, readv_group_iter, total_items): + # for record verification + knit = KnitVersionedFiles(None, None) + # for line extraction when requested (inventories only) + if output_lines: + factory = KnitPlainFactory() + record_index = 0 + pb.update("Copied record", record_index, total_items) + for index, readv_vector, node_vector in readv_group_iter: + # copy the data + pack_obj = index_map[index] + transport, path = pack_obj.access_tuple() + try: + reader = pack.make_readv_reader(transport, path, readv_vector) + except errors.NoSuchFile: + if self._reload_func is not None: + self._reload_func() + raise + for (names, read_func), (key, eol_flag, references) in \ + izip(reader.iter_records(), node_vector): + raw_data = read_func(None) + if output_lines: + # read the entire thing + content, _ = knit._parse_record(key[-1], raw_data) + if len(references[-1]) == 0: + line_iterator = factory.get_fulltext_content(content) + else: + line_iterator = factory.get_linedelta_content(content) + for line in line_iterator: + yield line, key + else: + # check the header only + df, _ = knit._parse_record_header(key, raw_data) + df.close() + pos, size = writer.add_bytes_record(raw_data, names) + write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references) + pb.update("Copied record", record_index) + record_index += 1 + + def _process_inventory_lines(self, inv_lines): + """Use up the inv_lines generator and setup a text key filter.""" + repo = self._pack_collection.repo + fileid_revisions = repo._find_file_ids_from_xml_inventory_lines( + inv_lines, self.revision_keys) + text_filter = [] + for fileid, file_revids in fileid_revisions.iteritems(): + text_filter.extend([(fileid, file_revid) for file_revid in file_revids]) + self._text_filter = text_filter + + def _copy_inventory_texts(self): + # select inventory keys + inv_keys = self._revision_keys # currently the same keyspace, and note that + # querying for keys here could introduce a bug where an inventory item + # is missed, so do not change it to query separately without cross + # checking like the text key check below. + inventory_index_map, inventory_indices = self._pack_map_and_index_list( + 'inventory_index') + inv_nodes = self._index_contents(inventory_indices, inv_keys) + # copy inventory keys and adjust values + # XXX: Should be a helper function to allow different inv representation + # at this point. + self.pb.update("Copying inventory texts", 2) + total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes) + # Only grab the output lines if we will be processing them + output_lines = bool(self.revision_ids) + inv_lines = self._copy_nodes_graph(inventory_index_map, + self.new_pack._writer, self.new_pack.inventory_index, + readv_group_iter, total_items, output_lines=output_lines) + if self.revision_ids: + self._process_inventory_lines(inv_lines) + else: + # eat the iterator to cause it to execute. + list(inv_lines) + self._text_filter = None + if 'pack' in debug.debug_flags: + trace.mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs', + time.ctime(), self._pack_collection._upload_transport.base, + self.new_pack.random_name, + self.new_pack.inventory_index.key_count(), + time.time() - self.new_pack.start_time) + + def _update_pack_order(self, entries, index_to_pack_map): + """Determine how we want our packs to be ordered. 
+ + This changes the sort order of the self.packs list so that packs unused + by 'entries' will be at the end of the list, so that future requests + can avoid probing them. Used packs will be at the front of the + self.packs list, in the order of their first use in 'entries'. + + :param entries: A list of (index, ...) tuples + :param index_to_pack_map: A mapping from index objects to pack objects. + """ + packs = [] + seen_indexes = set() + for entry in entries: + index = entry[0] + if index not in seen_indexes: + packs.append(index_to_pack_map[index]) + seen_indexes.add(index) + if len(packs) == len(self.packs): + if 'pack' in debug.debug_flags: + trace.mutter('Not changing pack list, all packs used.') + return + seen_packs = set(packs) + for pack in self.packs: + if pack not in seen_packs: + packs.append(pack) + seen_packs.add(pack) + if 'pack' in debug.debug_flags: + old_names = [p.access_tuple()[1] for p in self.packs] + new_names = [p.access_tuple()[1] for p in packs] + trace.mutter('Reordering packs\nfrom: %s\n to: %s', + old_names, new_names) + self.packs = packs + + def _copy_revision_texts(self): + # select revisions + if self.revision_ids: + revision_keys = [(revision_id,) for revision_id in self.revision_ids] + else: + revision_keys = None + # select revision keys + revision_index_map, revision_indices = self._pack_map_and_index_list( + 'revision_index') + revision_nodes = self._index_contents(revision_indices, revision_keys) + revision_nodes = list(revision_nodes) + self._update_pack_order(revision_nodes, revision_index_map) + # copy revision keys and adjust values + self.pb.update("Copying revision texts", 1) + total_items, readv_group_iter = self._revision_node_readv(revision_nodes) + list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer, + self.new_pack.revision_index, readv_group_iter, total_items)) + if 'pack' in debug.debug_flags: + trace.mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs', + time.ctime(), self._pack_collection._upload_transport.base, + self.new_pack.random_name, + self.new_pack.revision_index.key_count(), + time.time() - self.new_pack.start_time) + self._revision_keys = revision_keys + + def _get_text_nodes(self): + text_index_map, text_indices = self._pack_map_and_index_list( + 'text_index') + return text_index_map, self._index_contents(text_indices, + self._text_filter) + + def _copy_text_texts(self): + # select text keys + text_index_map, text_nodes = self._get_text_nodes() + if self._text_filter is not None: + # We could return the keys copied as part of the return value from + # _copy_nodes_graph but this doesn't work all that well with the + # need to get line output too, so we check separately, and as we're + # going to buffer everything anyway, we check beforehand, which + # saves reading knit data over the wire when we know there are + # mising records. + text_nodes = set(text_nodes) + present_text_keys = set(_node[1] for _node in text_nodes) + missing_text_keys = set(self._text_filter) - present_text_keys + if missing_text_keys: + # TODO: raise a specific error that can handle many missing + # keys. 
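+ # Note: text keys are (file_id, revision_id) tuples, e.g. a
+ # hypothetical ('file-abc', 'rev-1'), so the exception below is
+ # given the revision id first and the file id second.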
+ trace.mutter("missing keys during fetch: %r", missing_text_keys) + a_missing_key = missing_text_keys.pop() + raise errors.RevisionNotPresent(a_missing_key[1], + a_missing_key[0]) + # copy text keys and adjust values + self.pb.update("Copying content texts", 3) + total_items, readv_group_iter = self._least_readv_node_readv(text_nodes) + list(self._copy_nodes_graph(text_index_map, self.new_pack._writer, + self.new_pack.text_index, readv_group_iter, total_items)) + self._log_copied_texts() + + def _create_pack_from_packs(self): + self.pb.update("Opening pack", 0, 5) + self.new_pack = self.open_pack() + new_pack = self.new_pack + # buffer data - we won't be reading-back during the pack creation and + # this makes a significant difference on sftp pushes. + new_pack.set_write_cache_size(1024*1024) + if 'pack' in debug.debug_flags: + plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name) + for a_pack in self.packs] + if self.revision_ids is not None: + rev_count = len(self.revision_ids) + else: + rev_count = 'all' + trace.mutter('%s: create_pack: creating pack from source packs: ' + '%s%s %s revisions wanted %s t=0', + time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name, + plain_pack_list, rev_count) + self._copy_revision_texts() + self._copy_inventory_texts() + self._copy_text_texts() + # select signature keys + signature_filter = self._revision_keys # same keyspace + signature_index_map, signature_indices = self._pack_map_and_index_list( + 'signature_index') + signature_nodes = self._index_contents(signature_indices, + signature_filter) + # copy signature keys and adjust values + self.pb.update("Copying signature texts", 4) + self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer, + new_pack.signature_index) + if 'pack' in debug.debug_flags: + trace.mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs', + time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name, + new_pack.signature_index.key_count(), + time.time() - new_pack.start_time) + new_pack._check_references() + if not self._use_pack(new_pack): + new_pack.abort() + return None + self.pb.update("Finishing pack", 5) + new_pack.finish() + self._pack_collection.allocate(new_pack) + return new_pack + + def _least_readv_node_readv(self, nodes): + """Generate request groups for nodes using the least readv's. + + :param nodes: An iterable of graph index nodes. + :return: Total node count and an iterator of the data needed to perform + readvs to obtain the data for nodes. Each item yielded by the + iterator is a tuple with: + index, readv_vector, node_vector. readv_vector is a list ready to + hand to the transport readv method, and node_vector is a list of + (key, eol_flag, references) for the node retrieved by the + matching readv_vector. + """ + # group by pack so we do one readv per pack + nodes = sorted(nodes) + total = len(nodes) + request_groups = {} + for index, key, value, references in nodes: + if index not in request_groups: + request_groups[index] = [] + request_groups[index].append((key, value, references)) + result = [] + for index, items in request_groups.iteritems(): + pack_readv_requests = [] + for key, value, references in items: + # ---- KnitGraphIndex.get_position + bits = value[1:].split(' ') + offset, length = int(bits[0]), int(bits[1]) + pack_readv_requests.append( + ((offset, length), (key, value[0], references))) + # linear scan up the pack to maximum range combining. 
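+ # Each entry has the shape ((offset, length), (key, eol_flag,
+ # references)), e.g. with made-up values ((0, 123), (('rev-1',),
+ # eol_flag, references)); sorting therefore orders the requests by
+ # offset within this pack.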
+ pack_readv_requests.sort() + # split out the readv and the node data. + pack_readv = [readv for readv, node in pack_readv_requests] + node_vector = [node for readv, node in pack_readv_requests] + result.append((index, pack_readv, node_vector)) + return total, result + + def _revision_node_readv(self, revision_nodes): + """Return the total revisions and the readv's to issue. + + :param revision_nodes: The revision index contents for the packs being + incorporated into the new pack. + :return: As per _least_readv_node_readv. + """ + return self._least_readv_node_readv(revision_nodes) + + +class KnitReconcilePacker(KnitPacker): + """A packer which regenerates indices etc as it copies. + + This is used by ``bzr reconcile`` to cause parent text pointers to be + regenerated. + """ + + def __init__(self, *args, **kwargs): + super(KnitReconcilePacker, self).__init__(*args, **kwargs) + self._data_changed = False + + def _process_inventory_lines(self, inv_lines): + """Generate a text key reference map rather for reconciling with.""" + repo = self._pack_collection.repo + refs = repo._serializer._find_text_key_references(inv_lines) + self._text_refs = refs + # during reconcile we: + # - convert unreferenced texts to full texts + # - correct texts which reference a text not copied to be full texts + # - copy all others as-is but with corrected parents. + # - so at this point we don't know enough to decide what becomes a full + # text. + self._text_filter = None + + def _copy_text_texts(self): + """generate what texts we should have and then copy.""" + self.pb.update("Copying content texts", 3) + # we have three major tasks here: + # 1) generate the ideal index + repo = self._pack_collection.repo + ancestors = dict([(key[0], tuple(ref[0] for ref in refs[0])) for + _1, key, _2, refs in + self.new_pack.revision_index.iter_all_entries()]) + ideal_index = repo._generate_text_key_index(self._text_refs, ancestors) + # 2) generate a text_nodes list that contains all the deltas that can + # be used as-is, with corrected parents. + ok_nodes = [] + bad_texts = [] + discarded_nodes = [] + NULL_REVISION = _mod_revision.NULL_REVISION + text_index_map, text_nodes = self._get_text_nodes() + for node in text_nodes: + # 0 - index + # 1 - key + # 2 - value + # 3 - refs + try: + ideal_parents = tuple(ideal_index[node[1]]) + except KeyError: + discarded_nodes.append(node) + self._data_changed = True + else: + if ideal_parents == (NULL_REVISION,): + ideal_parents = () + if ideal_parents == node[3][0]: + # no change needed. + ok_nodes.append(node) + elif ideal_parents[0:1] == node[3][0][0:1]: + # the left most parent is the same, or there are no parents + # today. Either way, we can preserve the representation as + # long as we change the refs to be inserted. + self._data_changed = True + ok_nodes.append((node[0], node[1], node[2], + (ideal_parents, node[3][1]))) + self._data_changed = True + else: + # Reinsert this text completely + bad_texts.append((node[1], ideal_parents)) + self._data_changed = True + # we're finished with some data. + del ideal_index + del text_nodes + # 3) bulk copy the ok data + total_items, readv_group_iter = self._least_readv_node_readv(ok_nodes) + list(self._copy_nodes_graph(text_index_map, self.new_pack._writer, + self.new_pack.text_index, readv_group_iter, total_items)) + # 4) adhoc copy all the other texts. + # We have to topologically insert all texts otherwise we can fail to + # reconcile when parts of a single delta chain are preserved intact, + # and other parts are not. E.g. 
Discarded->d1->d2->d3. d1 will be + # reinserted, and if d3 has incorrect parents it will also be + # reinserted. If we insert d3 first, d2 is present (as it was bulk + # copied), so we will try to delta, but d2 is not currently able to be + # extracted because its basis d1 is not present. Topologically sorting + # addresses this. The following generates a sort for all the texts that + # are being inserted without having to reference the entire text key + # space (we only topo sort the revisions, which is smaller). + topo_order = tsort.topo_sort(ancestors) + rev_order = dict(zip(topo_order, range(len(topo_order)))) + bad_texts.sort(key=lambda key:rev_order.get(key[0][1], 0)) + transaction = repo.get_transaction() + file_id_index = GraphIndexPrefixAdapter( + self.new_pack.text_index, + ('blank', ), 1, + add_nodes_callback=self.new_pack.text_index.add_nodes) + data_access = _DirectPackAccess( + {self.new_pack.text_index:self.new_pack.access_tuple()}) + data_access.set_writer(self.new_pack._writer, self.new_pack.text_index, + self.new_pack.access_tuple()) + output_texts = KnitVersionedFiles( + _KnitGraphIndex(self.new_pack.text_index, + add_callback=self.new_pack.text_index.add_nodes, + deltas=True, parents=True, is_locked=repo.is_locked), + data_access=data_access, max_delta_chain=200) + for key, parent_keys in bad_texts: + # We refer to the new pack to delta data being output. + # A possible improvement would be to catch errors on short reads + # and only flush then. + self.new_pack.flush() + parents = [] + for parent_key in parent_keys: + if parent_key[0] != key[0]: + # Graph parents must match the fileid + raise errors.BzrError('Mismatched key parent %r:%r' % + (key, parent_keys)) + parents.append(parent_key[1]) + text_lines = osutils.split_lines(repo.texts.get_record_stream( + [key], 'unordered', True).next().get_bytes_as('fulltext')) + output_texts.add_lines(key, parent_keys, text_lines, + random_id=True, check_content=False) + # 5) check that nothing inserted has a reference outside the keyspace. + missing_text_keys = self.new_pack.text_index._external_references() + if missing_text_keys: + raise errors.BzrCheckError('Reference to missing compression parents %r' + % (missing_text_keys,)) + self._log_copied_texts() + + def _use_pack(self, new_pack): + """Override _use_pack to check for reconcile having changed content.""" + # XXX: we might be better checking this at the copy time. + original_inventory_keys = set() + inv_index = self._pack_collection.inventory_index.combined_index + for entry in inv_index.iter_all_entries(): + original_inventory_keys.add(entry[1]) + new_inventory_keys = set() + for entry in new_pack.inventory_index.iter_all_entries(): + new_inventory_keys.add(entry[1]) + if new_inventory_keys != original_inventory_keys: + self._data_changed = True + return new_pack.data_inserted() and self._data_changed + + +class OptimisingKnitPacker(KnitPacker): + """A packer which spends more time to create better disk layouts.""" + + def _revision_node_readv(self, revision_nodes): + """Return the total revisions and the readv's to issue. + + This sort places revisions in topological order with the ancestors + after the children. + + :param revision_nodes: The revision index contents for the packs being + incorporated into the new pack. + :return: As per _least_readv_node_readv. 
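+
+ As a rough illustration, for a hypothetical linear history in which
+ A is the parent of B and B the parent of C, the readv requests are
+ emitted for C, then B, then A, so newer revisions land earlier in
+ the new pack.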
+ """ + # build an ancestors dict + ancestors = {} + by_key = {} + for index, key, value, references in revision_nodes: + ancestors[key] = references[0] + by_key[key] = (index, value, references) + order = tsort.topo_sort(ancestors) + total = len(order) + # Single IO is pathological, but it will work as a starting point. + requests = [] + for key in reversed(order): + index, value, references = by_key[key] + # ---- KnitGraphIndex.get_position + bits = value[1:].split(' ') + offset, length = int(bits[0]), int(bits[1]) + requests.append( + (index, [(offset, length)], [(key, value[0], references)])) + # TODO: combine requests in the same index that are in ascending order. + return total, requests + + def open_pack(self): + """Open a pack for the pack we are creating.""" + new_pack = super(OptimisingKnitPacker, self).open_pack() + # Turn on the optimization flags for all the index builders. + new_pack.revision_index.set_optimize(for_size=True) + new_pack.inventory_index.set_optimize(for_size=True) + new_pack.text_index.set_optimize(for_size=True) + new_pack.signature_index.set_optimize(for_size=True) + return new_pack + + +class KnitRepositoryPackCollection(RepositoryPackCollection): + """A knit pack collection.""" + + pack_factory = NewPack + resumed_pack_factory = ResumedPack + normal_packer_class = KnitPacker + optimising_packer_class = OptimisingKnitPacker + + + diff --git a/bzrlib/repofmt/knitrepo.py b/bzrlib/repofmt/knitrepo.py new file mode 100644 index 0000000..624c4ea --- /dev/null +++ b/bzrlib/repofmt/knitrepo.py @@ -0,0 +1,522 @@ +# Copyright (C) 2007-2010 Canonical Ltd +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +from __future__ import absolute_import + +from bzrlib.lazy_import import lazy_import +lazy_import(globals(), """ +import itertools + +from bzrlib import ( + controldir, + errors, + knit as _mod_knit, + lockable_files, + lockdir, + osutils, + revision as _mod_revision, + trace, + transactions, + versionedfile, + xml5, + xml6, + xml7, + ) +""") +from bzrlib.decorators import needs_read_lock, needs_write_lock +from bzrlib.repository import ( + InterRepository, + IsInWriteGroupError, + RepositoryFormatMetaDir, + ) +from bzrlib.vf_repository import ( + InterSameDataRepository, + MetaDirVersionedFileRepository, + MetaDirVersionedFileRepositoryFormat, + VersionedFileCommitBuilder, + VersionedFileRootCommitBuilder, + ) +from bzrlib import symbol_versioning + + +class _KnitParentsProvider(object): + + def __init__(self, knit): + self._knit = knit + + def __repr__(self): + return 'KnitParentsProvider(%r)' % self._knit + + def get_parent_map(self, keys): + """See graph.StackedParentsProvider.get_parent_map""" + parent_map = {} + for revision_id in keys: + if revision_id is None: + raise ValueError('get_parent_map(None) is not valid') + if revision_id == _mod_revision.NULL_REVISION: + parent_map[revision_id] = () + else: + try: + parents = tuple( + self._knit.get_parents_with_ghosts(revision_id)) + except errors.RevisionNotPresent: + continue + else: + if len(parents) == 0: + parents = (_mod_revision.NULL_REVISION,) + parent_map[revision_id] = parents + return parent_map + + +class _KnitsParentsProvider(object): + + def __init__(self, knit, prefix=()): + """Create a parent provider for string keys mapped to tuple keys.""" + self._knit = knit + self._prefix = prefix + + def __repr__(self): + return 'KnitsParentsProvider(%r)' % self._knit + + def get_parent_map(self, keys): + """See graph.StackedParentsProvider.get_parent_map""" + parent_map = self._knit.get_parent_map( + [self._prefix + (key,) for key in keys]) + result = {} + for key, parents in parent_map.items(): + revid = key[-1] + if len(parents) == 0: + parents = (_mod_revision.NULL_REVISION,) + else: + parents = tuple(parent[-1] for parent in parents) + result[revid] = parents + for revision_id in keys: + if revision_id == _mod_revision.NULL_REVISION: + result[revision_id] = () + return result + + +class KnitRepository(MetaDirVersionedFileRepository): + """Knit format repository.""" + + # These attributes are inherited from the Repository base class. Setting + # them to None ensures that if the constructor is changed to not initialize + # them, or a subclass fails to call the constructor, that an error will + # occur rather than the system working but generating incorrect data. 
+ _commit_builder_class = None + _serializer = None + + def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class, + _serializer): + super(KnitRepository, self).__init__(_format, a_bzrdir, control_files) + self._commit_builder_class = _commit_builder_class + self._serializer = _serializer + self._reconcile_fixes_text_parents = True + + @needs_read_lock + def _all_revision_ids(self): + """See Repository.all_revision_ids().""" + return [key[0] for key in self.revisions.keys()] + + def _activate_new_inventory(self): + """Put a replacement inventory.new into use as inventories.""" + # Copy the content across + t = self._transport + t.copy('inventory.new.kndx', 'inventory.kndx') + try: + t.copy('inventory.new.knit', 'inventory.knit') + except errors.NoSuchFile: + # empty inventories knit + t.delete('inventory.knit') + # delete the temp inventory + t.delete('inventory.new.kndx') + try: + t.delete('inventory.new.knit') + except errors.NoSuchFile: + # empty inventories knit + pass + # Force index reload (sanity check) + self.inventories._index._reset_cache() + self.inventories.keys() + + def _backup_inventory(self): + t = self._transport + t.copy('inventory.kndx', 'inventory.backup.kndx') + t.copy('inventory.knit', 'inventory.backup.knit') + + def _move_file_id(self, from_id, to_id): + t = self._transport.clone('knits') + from_rel_url = self.texts._index._mapper.map((from_id, None)) + to_rel_url = self.texts._index._mapper.map((to_id, None)) + # We expect both files to always exist in this case. + for suffix in ('.knit', '.kndx'): + t.rename(from_rel_url + suffix, to_rel_url + suffix) + + def _remove_file_id(self, file_id): + t = self._transport.clone('knits') + rel_url = self.texts._index._mapper.map((file_id, None)) + for suffix in ('.kndx', '.knit'): + try: + t.delete(rel_url + suffix) + except errors.NoSuchFile: + pass + + def _temp_inventories(self): + result = self._format._get_inventories(self._transport, self, + 'inventory.new') + # Reconciling when the output has no revisions would result in no + # writes - but we want to ensure there is an inventory for + # compatibility with older clients that don't lazy-load. + result.get_parent_map([('A',)]) + return result + + @needs_read_lock + def get_revision(self, revision_id): + """Return the Revision object for a named revision""" + revision_id = osutils.safe_revision_id(revision_id) + return self.get_revision_reconcile(revision_id) + + def _refresh_data(self): + if not self.is_locked(): + return + if self.is_in_write_group(): + raise IsInWriteGroupError(self) + # Create a new transaction to force all knits to see the scope change. + # This is safe because we're outside a write group. + self.control_files._finish_transaction() + if self.is_write_locked(): + self.control_files._set_write_transaction() + else: + self.control_files._set_read_transaction() + + @needs_write_lock + def reconcile(self, other=None, thorough=False): + """Reconcile this repository.""" + from bzrlib.reconcile import KnitReconciler + reconciler = KnitReconciler(self, thorough=thorough) + reconciler.reconcile() + return reconciler + + def _make_parents_provider(self): + return _KnitsParentsProvider(self.revisions) + + +class RepositoryFormatKnit(MetaDirVersionedFileRepositoryFormat): + """Bzr repository knit format (generalized). + + This repository format has: + - knits for file texts and inventory + - hash subdirectory based stores. + - knits for revisions and signatures + - TextStores for revisions and signatures. 
+ - a format marker of its own + - an optional 'shared-storage' flag + - an optional 'no-working-trees' flag + - a LockDir lock + """ + + # Set this attribute in derived classes to control the repository class + # created by open and initialize. + repository_class = None + # Set this attribute in derived classes to control the + # _commit_builder_class that the repository objects will have passed to + # their constructor. + _commit_builder_class = None + # Set this attribute in derived clases to control the _serializer that the + # repository objects will have passed to their constructor. + @property + def _serializer(self): + return xml5.serializer_v5 + # Knit based repositories handle ghosts reasonably well. + supports_ghosts = True + # External lookups are not supported in this format. + supports_external_lookups = False + # No CHK support. + supports_chks = False + _fetch_order = 'topological' + _fetch_uses_deltas = True + fast_deltas = False + supports_funky_characters = True + # The revision.kndx could potentially claim a revision has a different + # parent to the revision text. + revision_graph_can_have_wrong_parents = True + + def _get_inventories(self, repo_transport, repo, name='inventory'): + mapper = versionedfile.ConstantMapper(name) + index = _mod_knit._KndxIndex(repo_transport, mapper, + repo.get_transaction, repo.is_write_locked, repo.is_locked) + access = _mod_knit._KnitKeyAccess(repo_transport, mapper) + return _mod_knit.KnitVersionedFiles(index, access, annotated=False) + + def _get_revisions(self, repo_transport, repo): + mapper = versionedfile.ConstantMapper('revisions') + index = _mod_knit._KndxIndex(repo_transport, mapper, + repo.get_transaction, repo.is_write_locked, repo.is_locked) + access = _mod_knit._KnitKeyAccess(repo_transport, mapper) + return _mod_knit.KnitVersionedFiles(index, access, max_delta_chain=0, + annotated=False) + + def _get_signatures(self, repo_transport, repo): + mapper = versionedfile.ConstantMapper('signatures') + index = _mod_knit._KndxIndex(repo_transport, mapper, + repo.get_transaction, repo.is_write_locked, repo.is_locked) + access = _mod_knit._KnitKeyAccess(repo_transport, mapper) + return _mod_knit.KnitVersionedFiles(index, access, max_delta_chain=0, + annotated=False) + + def _get_texts(self, repo_transport, repo): + mapper = versionedfile.HashEscapedPrefixMapper() + base_transport = repo_transport.clone('knits') + index = _mod_knit._KndxIndex(base_transport, mapper, + repo.get_transaction, repo.is_write_locked, repo.is_locked) + access = _mod_knit._KnitKeyAccess(base_transport, mapper) + return _mod_knit.KnitVersionedFiles(index, access, max_delta_chain=200, + annotated=True) + + def initialize(self, a_bzrdir, shared=False): + """Create a knit format 1 repository. + + :param a_bzrdir: bzrdir to contain the new repository; must already + be initialized. + :param shared: If true the repository will be initialized as a shared + repository. 
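+
+ A minimal usage sketch (the format and bzrdir objects here are
+ assumed to exist already):
+
+ repo = format.initialize(a_bzrdir, shared=True)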
+ """ + trace.mutter('creating repository in %s.', a_bzrdir.transport.base) + dirs = ['knits'] + files = [] + utf8_files = [('format', self.get_format_string())] + + self._upload_blank_content(a_bzrdir, dirs, files, utf8_files, shared) + repo_transport = a_bzrdir.get_repository_transport(None) + control_files = lockable_files.LockableFiles(repo_transport, + 'lock', lockdir.LockDir) + transaction = transactions.WriteTransaction() + result = self.open(a_bzrdir=a_bzrdir, _found=True) + result.lock_write() + # the revision id here is irrelevant: it will not be stored, and cannot + # already exist, we do this to create files on disk for older clients. + result.inventories.get_parent_map([('A',)]) + result.revisions.get_parent_map([('A',)]) + result.signatures.get_parent_map([('A',)]) + result.unlock() + self._run_post_repo_init_hooks(result, a_bzrdir, shared) + return result + + def open(self, a_bzrdir, _found=False, _override_transport=None): + """See RepositoryFormat.open(). + + :param _override_transport: INTERNAL USE ONLY. Allows opening the + repository at a slightly different url + than normal. I.e. during 'upgrade'. + """ + if not _found: + format = RepositoryFormatMetaDir.find_format(a_bzrdir) + if _override_transport is not None: + repo_transport = _override_transport + else: + repo_transport = a_bzrdir.get_repository_transport(None) + control_files = lockable_files.LockableFiles(repo_transport, + 'lock', lockdir.LockDir) + repo = self.repository_class(_format=self, + a_bzrdir=a_bzrdir, + control_files=control_files, + _commit_builder_class=self._commit_builder_class, + _serializer=self._serializer) + repo.revisions = self._get_revisions(repo_transport, repo) + repo.signatures = self._get_signatures(repo_transport, repo) + repo.inventories = self._get_inventories(repo_transport, repo) + repo.texts = self._get_texts(repo_transport, repo) + repo.chk_bytes = None + repo._transport = repo_transport + return repo + + +class RepositoryFormatKnit1(RepositoryFormatKnit): + """Bzr repository knit format 1. + + This repository format has: + - knits for file texts and inventory + - hash subdirectory based stores. + - knits for revisions and signatures + - TextStores for revisions and signatures. + - a format marker of its own + - an optional 'shared-storage' flag + - an optional 'no-working-trees' flag + - a LockDir lock + + This format was introduced in bzr 0.8. + """ + + repository_class = KnitRepository + _commit_builder_class = VersionedFileCommitBuilder + @property + def _serializer(self): + return xml5.serializer_v5 + + def __ne__(self, other): + return self.__class__ is not other.__class__ + + @classmethod + def get_format_string(cls): + """See RepositoryFormat.get_format_string().""" + return "Bazaar-NG Knit Repository Format 1" + + def get_format_description(self): + """See RepositoryFormat.get_format_description().""" + return "Knit repository format 1" + + +class RepositoryFormatKnit3(RepositoryFormatKnit): + """Bzr repository knit format 3. + + This repository format has: + - knits for file texts and inventory + - hash subdirectory based stores. + - knits for revisions and signatures + - TextStores for revisions and signatures. 
+ - a format marker of its own + - an optional 'shared-storage' flag + - an optional 'no-working-trees' flag + - a LockDir lock + - support for recording full info about the tree root + - support for recording tree-references + """ + + repository_class = KnitRepository + _commit_builder_class = VersionedFileRootCommitBuilder + rich_root_data = True + experimental = True + supports_tree_reference = True + @property + def _serializer(self): + return xml7.serializer_v7 + + def _get_matching_bzrdir(self): + return controldir.format_registry.make_bzrdir('dirstate-with-subtree') + + def _ignore_setting_bzrdir(self, format): + pass + + _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir) + + @classmethod + def get_format_string(cls): + """See RepositoryFormat.get_format_string().""" + return "Bazaar Knit Repository Format 3 (bzr 0.15)\n" + + def get_format_description(self): + """See RepositoryFormat.get_format_description().""" + return "Knit repository format 3" + + +class RepositoryFormatKnit4(RepositoryFormatKnit): + """Bzr repository knit format 4. + + This repository format has everything in format 3, except for + tree-references: + - knits for file texts and inventory + - hash subdirectory based stores. + - knits for revisions and signatures + - TextStores for revisions and signatures. + - a format marker of its own + - an optional 'shared-storage' flag + - an optional 'no-working-trees' flag + - a LockDir lock + - support for recording full info about the tree root + """ + + repository_class = KnitRepository + _commit_builder_class = VersionedFileRootCommitBuilder + rich_root_data = True + supports_tree_reference = False + @property + def _serializer(self): + return xml6.serializer_v6 + + def _get_matching_bzrdir(self): + return controldir.format_registry.make_bzrdir('rich-root') + + def _ignore_setting_bzrdir(self, format): + pass + + _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir) + + @classmethod + def get_format_string(cls): + """See RepositoryFormat.get_format_string().""" + return 'Bazaar Knit Repository Format 4 (bzr 1.0)\n' + + def get_format_description(self): + """See RepositoryFormat.get_format_description().""" + return "Knit repository format 4" + + +class InterKnitRepo(InterSameDataRepository): + """Optimised code paths between Knit based repositories.""" + + @classmethod + def _get_repo_format_to_test(self): + return RepositoryFormatKnit1() + + @staticmethod + def is_compatible(source, target): + """Be compatible with known Knit formats. + + We don't test for the stores being of specific types because that + could lead to confusing results, and there is no need to be + overly general. + """ + try: + are_knits = (isinstance(source._format, RepositoryFormatKnit) and + isinstance(target._format, RepositoryFormatKnit)) + except AttributeError: + return False + return are_knits and InterRepository._same_model(source, target) + + @needs_read_lock + def search_missing_revision_ids(self, + find_ghosts=True, revision_ids=None, if_present_ids=None, + limit=None): + """See InterRepository.search_missing_revision_ids().""" + source_ids_set = self._present_source_revisions_for( + revision_ids, if_present_ids) + # source_ids is the worst possible case we may need to pull. + # now we want to filter source_ids against what we actually + # have in target, but don't try to check for existence where we know + # we do not have a revision as that would be pointless. 
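+ # In set terms (purely as an illustration): with S = source_ids_set
+ # and T = target_ids below, the candidate result is
+ # S - present_in_target(S & T); it is used as-is when explicit
+ # revision_ids were requested, and otherwise filtered once more
+ # against the source's own revision records.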
+ target_ids = set(self.target.all_revision_ids()) + possibly_present_revisions = target_ids.intersection(source_ids_set) + actually_present_revisions = set( + self.target._eliminate_revisions_not_present(possibly_present_revisions)) + required_revisions = source_ids_set.difference(actually_present_revisions) + if revision_ids is not None: + # we used get_ancestry to determine source_ids then we are assured all + # revisions referenced are present as they are installed in topological order. + # and the tip revision was validated by get_ancestry. + result_set = required_revisions + else: + # if we just grabbed the possibly available ids, then + # we only have an estimate of whats available and need to validate + # that against the revision records. + result_set = set( + self.source._eliminate_revisions_not_present(required_revisions)) + if limit is not None: + topo_ordered = self.source.get_graph().iter_topo_order(result_set) + result_set = set(itertools.islice(topo_ordered, limit)) + return self.source.revision_ids_to_search_result(result_set) + + +InterRepository.register_optimiser(InterKnitRepo) diff --git a/bzrlib/repofmt/pack_repo.py b/bzrlib/repofmt/pack_repo.py new file mode 100644 index 0000000..d513d95 --- /dev/null +++ b/bzrlib/repofmt/pack_repo.py @@ -0,0 +1,2091 @@ +# Copyright (C) 2007-2011 Canonical Ltd +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +from __future__ import absolute_import + +import re +import sys + +from bzrlib.lazy_import import lazy_import +lazy_import(globals(), """ +from itertools import izip +import time + +from bzrlib import ( + chk_map, + cleanup, + config, + debug, + graph, + osutils, + pack, + transactions, + tsort, + ui, + ) +from bzrlib.index import ( + CombinedGraphIndex, + GraphIndexPrefixAdapter, + ) +""") +from bzrlib import ( + btree_index, + errors, + lockable_files, + lockdir, + ) + +from bzrlib.decorators import ( + needs_read_lock, + needs_write_lock, + only_raises, + ) +from bzrlib.lock import LogicalLockResult +from bzrlib.repository import ( + _LazyListJoin, + MetaDirRepository, + RepositoryFormatMetaDir, + RepositoryWriteLockResult, + ) +from bzrlib.vf_repository import ( + MetaDirVersionedFileRepository, + MetaDirVersionedFileRepositoryFormat, + VersionedFileCommitBuilder, + VersionedFileRootCommitBuilder, + ) +from bzrlib.trace import ( + mutter, + note, + warning, + ) + + +class PackCommitBuilder(VersionedFileCommitBuilder): + """Subclass of VersionedFileCommitBuilder to add texts with pack semantics. + + Specifically this uses one knit object rather than one knit object per + added text, reducing memory and object pressure. 
+ """ + + def __init__(self, repository, parents, config, timestamp=None, + timezone=None, committer=None, revprops=None, + revision_id=None, lossy=False): + VersionedFileCommitBuilder.__init__(self, repository, parents, config, + timestamp=timestamp, timezone=timezone, committer=committer, + revprops=revprops, revision_id=revision_id, lossy=lossy) + self._file_graph = graph.Graph( + repository._pack_collection.text_index.combined_index) + + def _heads(self, file_id, revision_ids): + keys = [(file_id, revision_id) for revision_id in revision_ids] + return set([key[1] for key in self._file_graph.heads(keys)]) + + +class PackRootCommitBuilder(VersionedFileRootCommitBuilder): + """A subclass of RootCommitBuilder to add texts with pack semantics. + + Specifically this uses one knit object rather than one knit object per + added text, reducing memory and object pressure. + """ + + def __init__(self, repository, parents, config, timestamp=None, + timezone=None, committer=None, revprops=None, + revision_id=None, lossy=False): + super(PackRootCommitBuilder, self).__init__(repository, parents, + config, timestamp=timestamp, timezone=timezone, + committer=committer, revprops=revprops, revision_id=revision_id, + lossy=lossy) + self._file_graph = graph.Graph( + repository._pack_collection.text_index.combined_index) + + def _heads(self, file_id, revision_ids): + keys = [(file_id, revision_id) for revision_id in revision_ids] + return set([key[1] for key in self._file_graph.heads(keys)]) + + +class Pack(object): + """An in memory proxy for a pack and its indices. + + This is a base class that is not directly used, instead the classes + ExistingPack and NewPack are used. + """ + + # A map of index 'type' to the file extension and position in the + # index_sizes array. + index_definitions = { + 'chk': ('.cix', 4), + 'revision': ('.rix', 0), + 'inventory': ('.iix', 1), + 'text': ('.tix', 2), + 'signature': ('.six', 3), + } + + def __init__(self, revision_index, inventory_index, text_index, + signature_index, chk_index=None): + """Create a pack instance. + + :param revision_index: A GraphIndex for determining what revisions are + present in the Pack and accessing the locations of their texts. + :param inventory_index: A GraphIndex for determining what inventories are + present in the Pack and accessing the locations of their + texts/deltas. + :param text_index: A GraphIndex for determining what file texts + are present in the pack and accessing the locations of their + texts/deltas (via (fileid, revisionid) tuples). + :param signature_index: A GraphIndex for determining what signatures are + present in the Pack and accessing the locations of their texts. + :param chk_index: A GraphIndex for accessing content by CHK, if the + pack has one. + """ + self.revision_index = revision_index + self.inventory_index = inventory_index + self.text_index = text_index + self.signature_index = signature_index + self.chk_index = chk_index + + def access_tuple(self): + """Return a tuple (transport, name) for the pack content.""" + return self.pack_transport, self.file_name() + + def _check_references(self): + """Make sure our external references are present. + + Packs are allowed to have deltas whose base is not in the pack, but it + must be present somewhere in this collection. It is not allowed to + have deltas based on a fallback repository. 
+ (See <https://bugs.launchpad.net/bzr/+bug/288751>) + """ + missing_items = {} + for (index_name, external_refs, index) in [ + ('texts', + self._get_external_refs(self.text_index), + self._pack_collection.text_index.combined_index), + ('inventories', + self._get_external_refs(self.inventory_index), + self._pack_collection.inventory_index.combined_index), + ]: + missing = external_refs.difference( + k for (idx, k, v, r) in + index.iter_entries(external_refs)) + if missing: + missing_items[index_name] = sorted(list(missing)) + if missing_items: + from pprint import pformat + raise errors.BzrCheckError( + "Newly created pack file %r has delta references to " + "items not in its repository:\n%s" + % (self, pformat(missing_items))) + + def file_name(self): + """Get the file name for the pack on disk.""" + return self.name + '.pack' + + def get_revision_count(self): + return self.revision_index.key_count() + + def index_name(self, index_type, name): + """Get the disk name of an index type for pack name 'name'.""" + return name + Pack.index_definitions[index_type][0] + + def index_offset(self, index_type): + """Get the position in a index_size array for a given index type.""" + return Pack.index_definitions[index_type][1] + + def inventory_index_name(self, name): + """The inv index is the name + .iix.""" + return self.index_name('inventory', name) + + def revision_index_name(self, name): + """The revision index is the name + .rix.""" + return self.index_name('revision', name) + + def signature_index_name(self, name): + """The signature index is the name + .six.""" + return self.index_name('signature', name) + + def text_index_name(self, name): + """The text index is the name + .tix.""" + return self.index_name('text', name) + + def _replace_index_with_readonly(self, index_type): + unlimited_cache = False + if index_type == 'chk': + unlimited_cache = True + index = self.index_class(self.index_transport, + self.index_name(index_type, self.name), + self.index_sizes[self.index_offset(index_type)], + unlimited_cache=unlimited_cache) + if index_type == 'chk': + index._leaf_factory = btree_index._gcchk_factory + setattr(self, index_type + '_index', index) + + +class ExistingPack(Pack): + """An in memory proxy for an existing .pack and its disk indices.""" + + def __init__(self, pack_transport, name, revision_index, inventory_index, + text_index, signature_index, chk_index=None): + """Create an ExistingPack object. + + :param pack_transport: The transport where the pack file resides. + :param name: The name of the pack on disk in the pack_transport. 
+ """ + Pack.__init__(self, revision_index, inventory_index, text_index, + signature_index, chk_index) + self.name = name + self.pack_transport = pack_transport + if None in (revision_index, inventory_index, text_index, + signature_index, name, pack_transport): + raise AssertionError() + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not self.__eq__(other) + + def __repr__(self): + return "<%s.%s object at 0x%x, %s, %s" % ( + self.__class__.__module__, self.__class__.__name__, id(self), + self.pack_transport, self.name) + + +class ResumedPack(ExistingPack): + + def __init__(self, name, revision_index, inventory_index, text_index, + signature_index, upload_transport, pack_transport, index_transport, + pack_collection, chk_index=None): + """Create a ResumedPack object.""" + ExistingPack.__init__(self, pack_transport, name, revision_index, + inventory_index, text_index, signature_index, + chk_index=chk_index) + self.upload_transport = upload_transport + self.index_transport = index_transport + self.index_sizes = [None, None, None, None] + indices = [ + ('revision', revision_index), + ('inventory', inventory_index), + ('text', text_index), + ('signature', signature_index), + ] + if chk_index is not None: + indices.append(('chk', chk_index)) + self.index_sizes.append(None) + for index_type, index in indices: + offset = self.index_offset(index_type) + self.index_sizes[offset] = index._size + self.index_class = pack_collection._index_class + self._pack_collection = pack_collection + self._state = 'resumed' + # XXX: perhaps check that the .pack file exists? + + def access_tuple(self): + if self._state == 'finished': + return Pack.access_tuple(self) + elif self._state == 'resumed': + return self.upload_transport, self.file_name() + else: + raise AssertionError(self._state) + + def abort(self): + self.upload_transport.delete(self.file_name()) + indices = [self.revision_index, self.inventory_index, self.text_index, + self.signature_index] + if self.chk_index is not None: + indices.append(self.chk_index) + for index in indices: + index._transport.delete(index._name) + + def finish(self): + self._check_references() + index_types = ['revision', 'inventory', 'text', 'signature'] + if self.chk_index is not None: + index_types.append('chk') + for index_type in index_types: + old_name = self.index_name(index_type, self.name) + new_name = '../indices/' + old_name + self.upload_transport.move(old_name, new_name) + self._replace_index_with_readonly(index_type) + new_name = '../packs/' + self.file_name() + self.upload_transport.move(self.file_name(), new_name) + self._state = 'finished' + + def _get_external_refs(self, index): + """Return compression parents for this index that are not present. + + This returns any compression parents that are referenced by this index, + which are not contained *in* this index. They may be present elsewhere. + """ + return index.external_references(1) + + +class NewPack(Pack): + """An in memory proxy for a pack which is being created.""" + + def __init__(self, pack_collection, upload_suffix='', file_mode=None): + """Create a NewPack instance. + + :param pack_collection: A PackCollection into which this is being inserted. + :param upload_suffix: An optional suffix to be given to any temporary + files created during the pack creation. e.g '.autopack' + :param file_mode: Unix permissions for newly created file. 
+ """ + # The relative locations of the packs are constrained, but all are + # passed in because the caller has them, so as to avoid object churn. + index_builder_class = pack_collection._index_builder_class + if pack_collection.chk_index is not None: + chk_index = index_builder_class(reference_lists=0) + else: + chk_index = None + Pack.__init__(self, + # Revisions: parents list, no text compression. + index_builder_class(reference_lists=1), + # Inventory: We want to map compression only, but currently the + # knit code hasn't been updated enough to understand that, so we + # have a regular 2-list index giving parents and compression + # source. + index_builder_class(reference_lists=2), + # Texts: compression and per file graph, for all fileids - so two + # reference lists and two elements in the key tuple. + index_builder_class(reference_lists=2, key_elements=2), + # Signatures: Just blobs to store, no compression, no parents + # listing. + index_builder_class(reference_lists=0), + # CHK based storage - just blobs, no compression or parents. + chk_index=chk_index + ) + self._pack_collection = pack_collection + # When we make readonly indices, we need this. + self.index_class = pack_collection._index_class + # where should the new pack be opened + self.upload_transport = pack_collection._upload_transport + # where are indices written out to + self.index_transport = pack_collection._index_transport + # where is the pack renamed to when it is finished? + self.pack_transport = pack_collection._pack_transport + # What file mode to upload the pack and indices with. + self._file_mode = file_mode + # tracks the content written to the .pack file. + self._hash = osutils.md5() + # a tuple with the length in bytes of the indices, once the pack + # is finalised. (rev, inv, text, sigs, chk_if_in_use) + self.index_sizes = None + # How much data to cache when writing packs. Note that this is not + # synchronised with reads, because it's not in the transport layer, so + # is not safe unless the client knows it won't be reading from the pack + # under creation. + self._cache_limit = 0 + # the temporary pack file name. + self.random_name = osutils.rand_chars(20) + upload_suffix + # when was this pack started ? + self.start_time = time.time() + # open an output stream for the data added to the pack. + self.write_stream = self.upload_transport.open_write_stream( + self.random_name, mode=self._file_mode) + if 'pack' in debug.debug_flags: + mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs', + time.ctime(), self.upload_transport.base, self.random_name, + time.time() - self.start_time) + # A list of byte sequences to be written to the new pack, and the + # aggregate size of them. Stored as a list rather than separate + # variables so that the _write_data closure below can update them. + self._buffer = [[], 0] + # create a callable for adding data + # + # robertc says- this is a closure rather than a method on the object + # so that the variables are locals, and faster than accessing object + # members. + def _write_data(bytes, flush=False, _buffer=self._buffer, + _write=self.write_stream.write, _update=self._hash.update): + _buffer[0].append(bytes) + _buffer[1] += len(bytes) + # buffer cap + if _buffer[1] > self._cache_limit or flush: + bytes = ''.join(_buffer[0]) + _write(bytes) + _update(bytes) + _buffer[:] = [[], 0] + # expose this on self, for the occasion when clients want to add data. + self._write_data = _write_data + # a pack writer object to serialise pack records. 
+ self._writer = pack.ContainerWriter(self._write_data) + self._writer.begin() + # what state is the pack in? (open, finished, aborted) + self._state = 'open' + # no name until we finish writing the content + self.name = None + + def abort(self): + """Cancel creating this pack.""" + self._state = 'aborted' + self.write_stream.close() + # Remove the temporary pack file. + self.upload_transport.delete(self.random_name) + # The indices have no state on disk. + + def access_tuple(self): + """Return a tuple (transport, name) for the pack content.""" + if self._state == 'finished': + return Pack.access_tuple(self) + elif self._state == 'open': + return self.upload_transport, self.random_name + else: + raise AssertionError(self._state) + + def data_inserted(self): + """True if data has been added to this pack.""" + return bool(self.get_revision_count() or + self.inventory_index.key_count() or + self.text_index.key_count() or + self.signature_index.key_count() or + (self.chk_index is not None and self.chk_index.key_count())) + + def finish_content(self): + if self.name is not None: + return + self._writer.end() + if self._buffer[1]: + self._write_data('', flush=True) + self.name = self._hash.hexdigest() + + def finish(self, suspend=False): + """Finish the new pack. + + This: + - finalises the content + - assigns a name (the md5 of the content, currently) + - writes out the associated indices + - renames the pack into place. + - stores the index size tuple for the pack in the index_sizes + attribute. + """ + self.finish_content() + if not suspend: + self._check_references() + # write indices + # XXX: It'd be better to write them all to temporary names, then + # rename them all into place, so that the window when only some are + # visible is smaller. On the other hand none will be seen until + # they're in the names list. + self.index_sizes = [None, None, None, None] + self._write_index('revision', self.revision_index, 'revision', + suspend) + self._write_index('inventory', self.inventory_index, 'inventory', + suspend) + self._write_index('text', self.text_index, 'file texts', suspend) + self._write_index('signature', self.signature_index, + 'revision signatures', suspend) + if self.chk_index is not None: + self.index_sizes.append(None) + self._write_index('chk', self.chk_index, + 'content hash bytes', suspend) + self.write_stream.close( + want_fdatasync=self._pack_collection.config_stack.get('repository.fdatasync')) + # Note that this will clobber an existing pack with the same name, + # without checking for hash collisions. While this is undesirable this + # is something that can be rectified in a subsequent release. One way + # to rectify it may be to leave the pack at the original name, writing + # its pack-names entry as something like 'HASH: index-sizes + # temporary-name'. Allocate that and check for collisions, if it is + # collision free then rename it into place. If clients know this scheme + # they can handle missing-file errors by: + # - try for HASH.pack + # - try for temporary-name + # - refresh the pack-list to see if the pack is now absent + new_name = self.name + '.pack' + if not suspend: + new_name = '../packs/' + new_name + self.upload_transport.move(self.random_name, new_name) + self._state = 'finished' + if 'pack' in debug.debug_flags: + # XXX: size might be interesting? 
+ mutter('%s: create_pack: pack finished: %s%s->%s t+%6.3fs', + time.ctime(), self.upload_transport.base, self.random_name, + new_name, time.time() - self.start_time) + + def flush(self): + """Flush any current data.""" + if self._buffer[1]: + bytes = ''.join(self._buffer[0]) + self.write_stream.write(bytes) + self._hash.update(bytes) + self._buffer[:] = [[], 0] + + def _get_external_refs(self, index): + return index._external_references() + + def set_write_cache_size(self, size): + self._cache_limit = size + + def _write_index(self, index_type, index, label, suspend=False): + """Write out an index. + + :param index_type: The type of index to write - e.g. 'revision'. + :param index: The index object to serialise. + :param label: What label to give the index e.g. 'revision'. + """ + index_name = self.index_name(index_type, self.name) + if suspend: + transport = self.upload_transport + else: + transport = self.index_transport + index_tempfile = index.finish() + index_bytes = index_tempfile.read() + write_stream = transport.open_write_stream(index_name, + mode=self._file_mode) + write_stream.write(index_bytes) + write_stream.close( + want_fdatasync=self._pack_collection.config_stack.get('repository.fdatasync')) + self.index_sizes[self.index_offset(index_type)] = len(index_bytes) + if 'pack' in debug.debug_flags: + # XXX: size might be interesting? + mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs', + time.ctime(), label, self.upload_transport.base, + self.random_name, time.time() - self.start_time) + # Replace the writable index on this object with a readonly, + # presently unloaded index. We should alter + # the index layer to make its finish() error if add_node is + # subsequently used. RBC + self._replace_index_with_readonly(index_type) + + +class AggregateIndex(object): + """An aggregated index for the RepositoryPackCollection. + + AggregateIndex is reponsible for managing the PackAccess object, + Index-To-Pack mapping, and all indices list for a specific type of index + such as 'revision index'. + + A CombinedIndex provides an index on a single key space built up + from several on-disk indices. The AggregateIndex builds on this + to provide a knit access layer, and allows having up to one writable + index within the collection. + """ + # XXX: Probably 'can be written to' could/should be separated from 'acts + # like a knit index' -- mbp 20071024 + + def __init__(self, reload_func=None, flush_func=None): + """Create an AggregateIndex. + + :param reload_func: A function to call if we find we are missing an + index. Should have the form reload_func() => True if the list of + active pack files has changed. + """ + self._reload_func = reload_func + self.index_to_pack = {} + self.combined_index = CombinedGraphIndex([], reload_func=reload_func) + self.data_access = _DirectPackAccess(self.index_to_pack, + reload_func=reload_func, + flush_func=flush_func) + self.add_callback = None + + def add_index(self, index, pack): + """Add index to the aggregate, which is an index for Pack pack. + + Future searches on the aggregate index will seach this new index + before all previously inserted indices. + + :param index: An Index for the pack. + :param pack: A Pack instance. + """ + # expose it to the index map + self.index_to_pack[index] = pack.access_tuple() + # put it at the front of the linear index list + self.combined_index.insert_index(0, index, pack.name) + + def add_writable_index(self, index, pack): + """Add an index which is able to have data added to it. 
+ + There can be at most one writable index at any time. Any + modifications made to the knit are put into this index. + + :param index: An index from the pack parameter. + :param pack: A Pack instance. + """ + if self.add_callback is not None: + raise AssertionError( + "%s already has a writable index through %s" % \ + (self, self.add_callback)) + # allow writing: queue writes to a new index + self.add_index(index, pack) + # Updates the index to packs mapping as a side effect, + self.data_access.set_writer(pack._writer, index, pack.access_tuple()) + self.add_callback = index.add_nodes + + def clear(self): + """Reset all the aggregate data to nothing.""" + self.data_access.set_writer(None, None, (None, None)) + self.index_to_pack.clear() + del self.combined_index._indices[:] + del self.combined_index._index_names[:] + self.add_callback = None + + def remove_index(self, index): + """Remove index from the indices used to answer queries. + + :param index: An index from the pack parameter. + """ + del self.index_to_pack[index] + pos = self.combined_index._indices.index(index) + del self.combined_index._indices[pos] + del self.combined_index._index_names[pos] + if (self.add_callback is not None and + getattr(index, 'add_nodes', None) == self.add_callback): + self.add_callback = None + self.data_access.set_writer(None, None, (None, None)) + + +class Packer(object): + """Create a pack from packs.""" + + def __init__(self, pack_collection, packs, suffix, revision_ids=None, + reload_func=None): + """Create a Packer. + + :param pack_collection: A RepositoryPackCollection object where the + new pack is being written to. + :param packs: The packs to combine. + :param suffix: The suffix to use on the temporary files for the pack. + :param revision_ids: Revision ids to limit the pack to. + :param reload_func: A function to call if a pack file/index goes + missing. The side effect of calling this function should be to + update self.packs. See also AggregateIndex + """ + self.packs = packs + self.suffix = suffix + self.revision_ids = revision_ids + # The pack object we are creating. + self.new_pack = None + self._pack_collection = pack_collection + self._reload_func = reload_func + # The index layer keys for the revisions being copied. None for 'all + # objects'. + self._revision_keys = None + # What text keys to copy. None for 'all texts'. This is set by + # _copy_inventory_texts + self._text_filter = None + + def pack(self, pb=None): + """Create a new pack by reading data from other packs. + + This does little more than a bulk copy of data. One key difference + is that data with the same item key across multiple packs is elided + from the output. The new pack is written into the current pack store + along with its indices, and the name added to the pack names. The + source packs are not altered and are not required to be in the current + pack collection. + + :param pb: An optional progress bar to use. A nested bar is created if + this is None. + :return: A Pack object, or None if nothing was copied. + """ + # open a pack - using the same name as the last temporary file + # - which has already been flushed, so it's safe. + # XXX: - duplicate code warning with start_write_group; fix before + # considering 'done'. + if self._pack_collection._new_pack is not None: + raise errors.BzrError('call to %s.pack() while another pack is' + ' being written.' + % (self.__class__.__name__,)) + if self.revision_ids is not None: + if len(self.revision_ids) == 0: + # silly fetch request. 
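+                # an explicitly empty set of revisions was requested, so
+                # there is nothing to copy and no new pack is created.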
+ return None + else: + self.revision_ids = frozenset(self.revision_ids) + self.revision_keys = frozenset((revid,) for revid in + self.revision_ids) + if pb is None: + self.pb = ui.ui_factory.nested_progress_bar() + else: + self.pb = pb + try: + return self._create_pack_from_packs() + finally: + if pb is None: + self.pb.finished() + + def open_pack(self): + """Open a pack for the pack we are creating.""" + new_pack = self._pack_collection.pack_factory(self._pack_collection, + upload_suffix=self.suffix, + file_mode=self._pack_collection.repo.bzrdir._get_file_mode()) + # We know that we will process all nodes in order, and don't need to + # query, so don't combine any indices spilled to disk until we are done + new_pack.revision_index.set_optimize(combine_backing_indices=False) + new_pack.inventory_index.set_optimize(combine_backing_indices=False) + new_pack.text_index.set_optimize(combine_backing_indices=False) + new_pack.signature_index.set_optimize(combine_backing_indices=False) + return new_pack + + def _copy_revision_texts(self): + """Copy revision data to the new pack.""" + raise NotImplementedError(self._copy_revision_texts) + + def _copy_inventory_texts(self): + """Copy the inventory texts to the new pack. + + self._revision_keys is used to determine what inventories to copy. + + Sets self._text_filter appropriately. + """ + raise NotImplementedError(self._copy_inventory_texts) + + def _copy_text_texts(self): + raise NotImplementedError(self._copy_text_texts) + + def _create_pack_from_packs(self): + raise NotImplementedError(self._create_pack_from_packs) + + def _log_copied_texts(self): + if 'pack' in debug.debug_flags: + mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs', + time.ctime(), self._pack_collection._upload_transport.base, + self.new_pack.random_name, + self.new_pack.text_index.key_count(), + time.time() - self.new_pack.start_time) + + def _use_pack(self, new_pack): + """Return True if new_pack should be used. + + :param new_pack: The pack that has just been created. + :return: True if the pack should be used. + """ + return new_pack.data_inserted() + + +class RepositoryPackCollection(object): + """Management of packs within a repository. + + :ivar _names: map of {pack_name: (index_size,)} + """ + + pack_factory = None + resumed_pack_factory = None + normal_packer_class = None + optimising_packer_class = None + + def __init__(self, repo, transport, index_transport, upload_transport, + pack_transport, index_builder_class, index_class, + use_chk_index): + """Create a new RepositoryPackCollection. + + :param transport: Addresses the repository base directory + (typically .bzr/repository/). + :param index_transport: Addresses the directory containing indices. + :param upload_transport: Addresses the directory into which packs are written + while they're being created. + :param pack_transport: Addresses the directory of existing complete packs. + :param index_builder_class: The index builder class to use. + :param index_class: The index class to use. + :param use_chk_index: Whether to setup and manage a CHK index. 
+ """ + # XXX: This should call self.reset() + self.repo = repo + self.transport = transport + self._index_transport = index_transport + self._upload_transport = upload_transport + self._pack_transport = pack_transport + self._index_builder_class = index_builder_class + self._index_class = index_class + self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3, + '.cix': 4} + self.packs = [] + # name:Pack mapping + self._names = None + self._packs_by_name = {} + # the previous pack-names content + self._packs_at_load = None + # when a pack is being created by this object, the state of that pack. + self._new_pack = None + # aggregated revision index data + flush = self._flush_new_pack + self.revision_index = AggregateIndex(self.reload_pack_names, flush) + self.inventory_index = AggregateIndex(self.reload_pack_names, flush) + self.text_index = AggregateIndex(self.reload_pack_names, flush) + self.signature_index = AggregateIndex(self.reload_pack_names, flush) + all_indices = [self.revision_index, self.inventory_index, + self.text_index, self.signature_index] + if use_chk_index: + self.chk_index = AggregateIndex(self.reload_pack_names, flush) + all_indices.append(self.chk_index) + else: + # used to determine if we're using a chk_index elsewhere. + self.chk_index = None + # Tell all the CombinedGraphIndex objects about each other, so they can + # share hints about which pack names to search first. + all_combined = [agg_idx.combined_index for agg_idx in all_indices] + for combined_idx in all_combined: + combined_idx.set_sibling_indices( + set(all_combined).difference([combined_idx])) + # resumed packs + self._resumed_packs = [] + self.config_stack = config.LocationStack(self.transport.base) + + def __repr__(self): + return '%s(%r)' % (self.__class__.__name__, self.repo) + + def add_pack_to_memory(self, pack): + """Make a Pack object available to the repository to satisfy queries. + + :param pack: A Pack object. + """ + if pack.name in self._packs_by_name: + raise AssertionError( + 'pack %s already in _packs_by_name' % (pack.name,)) + self.packs.append(pack) + self._packs_by_name[pack.name] = pack + self.revision_index.add_index(pack.revision_index, pack) + self.inventory_index.add_index(pack.inventory_index, pack) + self.text_index.add_index(pack.text_index, pack) + self.signature_index.add_index(pack.signature_index, pack) + if self.chk_index is not None: + self.chk_index.add_index(pack.chk_index, pack) + + def all_packs(self): + """Return a list of all the Pack objects this repository has. + + Note that an in-progress pack being created is not returned. + + :return: A list of Pack objects for all the packs in the repository. + """ + result = [] + for name in self.names(): + result.append(self.get_pack_by_name(name)) + return result + + def autopack(self): + """Pack the pack collection incrementally. + + This will not attempt global reorganisation or recompression, + rather it will just ensure that the total number of packs does + not grow without bound. It uses the _max_pack_count method to + determine if autopacking is needed, and the pack_distribution + method to determine the number of revisions in each pack. + + If autopacking takes place then the packs name collection will have + been flushed to disk - packing requires updating the name collection + in synchronisation with certain steps. Otherwise the names collection + is not flushed. + + :return: Something evaluating true if packing took place. 
+ """ + while True: + try: + return self._do_autopack() + except errors.RetryAutopack: + # If we get a RetryAutopack exception, we should abort the + # current action, and retry. + pass + + def _do_autopack(self): + # XXX: Should not be needed when the management of indices is sane. + total_revisions = self.revision_index.combined_index.key_count() + total_packs = len(self._names) + if self._max_pack_count(total_revisions) >= total_packs: + return None + # determine which packs need changing + pack_distribution = self.pack_distribution(total_revisions) + existing_packs = [] + for pack in self.all_packs(): + revision_count = pack.get_revision_count() + if revision_count == 0: + # revision less packs are not generated by normal operation, + # only by operations like sign-my-commits, and thus will not + # tend to grow rapdily or without bound like commit containing + # packs do - leave them alone as packing them really should + # group their data with the relevant commit, and that may + # involve rewriting ancient history - which autopack tries to + # avoid. Alternatively we could not group the data but treat + # each of these as having a single revision, and thus add + # one revision for each to the total revision count, to get + # a matching distribution. + continue + existing_packs.append((revision_count, pack)) + pack_operations = self.plan_autopack_combinations( + existing_packs, pack_distribution) + num_new_packs = len(pack_operations) + num_old_packs = sum([len(po[1]) for po in pack_operations]) + num_revs_affected = sum([po[0] for po in pack_operations]) + mutter('Auto-packing repository %s, which has %d pack files, ' + 'containing %d revisions. Packing %d files into %d affecting %d' + ' revisions', self, total_packs, total_revisions, num_old_packs, + num_new_packs, num_revs_affected) + result = self._execute_pack_operations(pack_operations, packer_class=self.normal_packer_class, + reload_func=self._restart_autopack) + mutter('Auto-packing repository %s completed', self) + return result + + def _execute_pack_operations(self, pack_operations, packer_class, + reload_func=None): + """Execute a series of pack operations. + + :param pack_operations: A list of [revision_count, packs_to_combine]. + :param packer_class: The class of packer to use + :return: The new pack names. + """ + for revision_count, packs in pack_operations: + # we may have no-ops from the setup logic + if len(packs) == 0: + continue + packer = packer_class(self, packs, '.autopack', + reload_func=reload_func) + try: + result = packer.pack() + except errors.RetryWithNewPacks: + # An exception is propagating out of this context, make sure + # this packer has cleaned up. Packer() doesn't set its new_pack + # state into the RepositoryPackCollection object, so we only + # have access to it directly here. + if packer.new_pack is not None: + packer.new_pack.abort() + raise + if result is None: + return + for pack in packs: + self._remove_pack_from_memory(pack) + # record the newly available packs and stop advertising the old + # packs + to_be_obsoleted = [] + for _, packs in pack_operations: + to_be_obsoleted.extend(packs) + result = self._save_pack_names(clear_obsolete_packs=True, + obsolete_packs=to_be_obsoleted) + return result + + def _flush_new_pack(self): + if self._new_pack is not None: + self._new_pack.flush() + + def lock_names(self): + """Acquire the mutex around the pack-names index. + + This cannot be used in the middle of a read-only transaction on the + repository. 
+ """ + self.repo.control_files.lock_write() + + def _already_packed(self): + """Is the collection already packed?""" + return not (self.repo._format.pack_compresses or (len(self._names) > 1)) + + def pack(self, hint=None, clean_obsolete_packs=False): + """Pack the pack collection totally.""" + self.ensure_loaded() + total_packs = len(self._names) + if self._already_packed(): + return + total_revisions = self.revision_index.combined_index.key_count() + # XXX: the following may want to be a class, to pack with a given + # policy. + mutter('Packing repository %s, which has %d pack files, ' + 'containing %d revisions with hint %r.', self, total_packs, + total_revisions, hint) + while True: + try: + self._try_pack_operations(hint) + except RetryPackOperations: + continue + break + + if clean_obsolete_packs: + self._clear_obsolete_packs() + + def _try_pack_operations(self, hint): + """Calculate the pack operations based on the hint (if any), and + execute them. + """ + # determine which packs need changing + pack_operations = [[0, []]] + for pack in self.all_packs(): + if hint is None or pack.name in hint: + # Either no hint was provided (so we are packing everything), + # or this pack was included in the hint. + pack_operations[-1][0] += pack.get_revision_count() + pack_operations[-1][1].append(pack) + self._execute_pack_operations(pack_operations, + packer_class=self.optimising_packer_class, + reload_func=self._restart_pack_operations) + + def plan_autopack_combinations(self, existing_packs, pack_distribution): + """Plan a pack operation. + + :param existing_packs: The packs to pack. (A list of (revcount, Pack) + tuples). + :param pack_distribution: A list with the number of revisions desired + in each pack. + """ + if len(existing_packs) <= len(pack_distribution): + return [] + existing_packs.sort(reverse=True) + pack_operations = [[0, []]] + # plan out what packs to keep, and what to reorganise + while len(existing_packs): + # take the largest pack, and if it's less than the head of the + # distribution chart we will include its contents in the new pack + # for that position. If it's larger, we remove its size from the + # distribution chart + next_pack_rev_count, next_pack = existing_packs.pop(0) + if next_pack_rev_count >= pack_distribution[0]: + # this is already packed 'better' than this, so we can + # not waste time packing it. + while next_pack_rev_count > 0: + next_pack_rev_count -= pack_distribution[0] + if next_pack_rev_count >= 0: + # more to go + del pack_distribution[0] + else: + # didn't use that entire bucket up + pack_distribution[0] = -next_pack_rev_count + else: + # add the revisions we're going to add to the next output pack + pack_operations[-1][0] += next_pack_rev_count + # allocate this pack to the next pack sub operation + pack_operations[-1][1].append(next_pack) + if pack_operations[-1][0] >= pack_distribution[0]: + # this pack is used up, shift left. + del pack_distribution[0] + pack_operations.append([0, []]) + # Now that we know which pack files we want to move, shove them all + # into a single pack file. + final_rev_count = 0 + final_pack_list = [] + for num_revs, pack_files in pack_operations: + final_rev_count += num_revs + final_pack_list.extend(pack_files) + if len(final_pack_list) == 1: + raise AssertionError('We somehow generated an autopack with a' + ' single pack file being moved.') + return [] + return [[final_rev_count, final_pack_list]] + + def ensure_loaded(self): + """Ensure we have read names from disk. 
+ + :return: True if the disk names had not been previously read. + """ + # NB: if you see an assertion error here, it's probably access against + # an unlocked repo. Naughty. + if not self.repo.is_locked(): + raise errors.ObjectNotLocked(self.repo) + if self._names is None: + self._names = {} + self._packs_at_load = set() + for index, key, value in self._iter_disk_pack_index(): + name = key[0] + self._names[name] = self._parse_index_sizes(value) + self._packs_at_load.add((key, value)) + result = True + else: + result = False + # populate all the metadata. + self.all_packs() + return result + + def _parse_index_sizes(self, value): + """Parse a string of index sizes.""" + return tuple([int(digits) for digits in value.split(' ')]) + + def get_pack_by_name(self, name): + """Get a Pack object by name. + + :param name: The name of the pack - e.g. '123456' + :return: A Pack object. + """ + try: + return self._packs_by_name[name] + except KeyError: + rev_index = self._make_index(name, '.rix') + inv_index = self._make_index(name, '.iix') + txt_index = self._make_index(name, '.tix') + sig_index = self._make_index(name, '.six') + if self.chk_index is not None: + chk_index = self._make_index(name, '.cix', is_chk=True) + else: + chk_index = None + result = ExistingPack(self._pack_transport, name, rev_index, + inv_index, txt_index, sig_index, chk_index) + self.add_pack_to_memory(result) + return result + + def _resume_pack(self, name): + """Get a suspended Pack object by name. + + :param name: The name of the pack - e.g. '123456' + :return: A Pack object. + """ + if not re.match('[a-f0-9]{32}', name): + # Tokens should be md5sums of the suspended pack file, i.e. 32 hex + # digits. + raise errors.UnresumableWriteGroup( + self.repo, [name], 'Malformed write group token') + try: + rev_index = self._make_index(name, '.rix', resume=True) + inv_index = self._make_index(name, '.iix', resume=True) + txt_index = self._make_index(name, '.tix', resume=True) + sig_index = self._make_index(name, '.six', resume=True) + if self.chk_index is not None: + chk_index = self._make_index(name, '.cix', resume=True, + is_chk=True) + else: + chk_index = None + result = self.resumed_pack_factory(name, rev_index, inv_index, + txt_index, sig_index, self._upload_transport, + self._pack_transport, self._index_transport, self, + chk_index=chk_index) + except errors.NoSuchFile, e: + raise errors.UnresumableWriteGroup(self.repo, [name], str(e)) + self.add_pack_to_memory(result) + self._resumed_packs.append(result) + return result + + def allocate(self, a_new_pack): + """Allocate name in the list of packs. + + :param a_new_pack: A NewPack instance to be added to the collection of + packs for this repository. + """ + self.ensure_loaded() + if a_new_pack.name in self._names: + raise errors.BzrError( + 'Pack %r already exists in %s' % (a_new_pack.name, self)) + self._names[a_new_pack.name] = tuple(a_new_pack.index_sizes) + self.add_pack_to_memory(a_new_pack) + + def _iter_disk_pack_index(self): + """Iterate over the contents of the pack-names index. + + This is used when loading the list from disk, and before writing to + detect updates from others during our write operation. + :return: An iterator of the index contents. 
+ """ + return self._index_class(self.transport, 'pack-names', None + ).iter_all_entries() + + def _make_index(self, name, suffix, resume=False, is_chk=False): + size_offset = self._suffix_offsets[suffix] + index_name = name + suffix + if resume: + transport = self._upload_transport + index_size = transport.stat(index_name).st_size + else: + transport = self._index_transport + index_size = self._names[name][size_offset] + index = self._index_class(transport, index_name, index_size, + unlimited_cache=is_chk) + if is_chk and self._index_class is btree_index.BTreeGraphIndex: + index._leaf_factory = btree_index._gcchk_factory + return index + + def _max_pack_count(self, total_revisions): + """Return the maximum number of packs to use for total revisions. + + :param total_revisions: The total number of revisions in the + repository. + """ + if not total_revisions: + return 1 + digits = str(total_revisions) + result = 0 + for digit in digits: + result += int(digit) + return result + + def names(self): + """Provide an order to the underlying names.""" + return sorted(self._names.keys()) + + def _obsolete_packs(self, packs): + """Move a number of packs which have been obsoleted out of the way. + + Each pack and its associated indices are moved out of the way. + + Note: for correctness this function should only be called after a new + pack names index has been written without these pack names, and with + the names of packs that contain the data previously available via these + packs. + + :param packs: The packs to obsolete. + :param return: None. + """ + for pack in packs: + try: + try: + pack.pack_transport.move(pack.file_name(), + '../obsolete_packs/' + pack.file_name()) + except errors.NoSuchFile: + # perhaps obsolete_packs was removed? Let's create it and + # try again + try: + pack.pack_transport.mkdir('../obsolete_packs/') + except errors.FileExists: + pass + pack.pack_transport.move(pack.file_name(), + '../obsolete_packs/' + pack.file_name()) + except (errors.PathError, errors.TransportError), e: + # TODO: Should these be warnings or mutters? + mutter("couldn't rename obsolete pack, skipping it:\n%s" + % (e,)) + # TODO: Probably needs to know all possible indices for this pack + # - or maybe list the directory and move all indices matching this + # name whether we recognize it or not? + suffixes = ['.iix', '.six', '.tix', '.rix'] + if self.chk_index is not None: + suffixes.append('.cix') + for suffix in suffixes: + try: + self._index_transport.move(pack.name + suffix, + '../obsolete_packs/' + pack.name + suffix) + except (errors.PathError, errors.TransportError), e: + mutter("couldn't rename obsolete index, skipping it:\n%s" + % (e,)) + + def pack_distribution(self, total_revisions): + """Generate a list of the number of revisions to put in each pack. + + :param total_revisions: The total number of revisions in the + repository. + """ + if total_revisions == 0: + return [0] + digits = reversed(str(total_revisions)) + result = [] + for exponent, count in enumerate(digits): + size = 10 ** exponent + for pos in range(int(count)): + result.append(size) + return list(reversed(result)) + + def _pack_tuple(self, name): + """Return a tuple with the transport and file name for a pack name.""" + return self._pack_transport, name + '.pack' + + def _remove_pack_from_memory(self, pack): + """Remove pack from the packs accessed by this repository. + + Only affects memory state, until self._save_pack_names() is invoked. 
+ """ + self._names.pop(pack.name) + self._packs_by_name.pop(pack.name) + self._remove_pack_indices(pack) + self.packs.remove(pack) + + def _remove_pack_indices(self, pack, ignore_missing=False): + """Remove the indices for pack from the aggregated indices. + + :param ignore_missing: Suppress KeyErrors from calling remove_index. + """ + for index_type in Pack.index_definitions.keys(): + attr_name = index_type + '_index' + aggregate_index = getattr(self, attr_name) + if aggregate_index is not None: + pack_index = getattr(pack, attr_name) + try: + aggregate_index.remove_index(pack_index) + except KeyError: + if ignore_missing: + continue + raise + + def reset(self): + """Clear all cached data.""" + # cached revision data + self.revision_index.clear() + # cached signature data + self.signature_index.clear() + # cached file text data + self.text_index.clear() + # cached inventory data + self.inventory_index.clear() + # cached chk data + if self.chk_index is not None: + self.chk_index.clear() + # remove the open pack + self._new_pack = None + # information about packs. + self._names = None + self.packs = [] + self._packs_by_name = {} + self._packs_at_load = None + + def _unlock_names(self): + """Release the mutex around the pack-names index.""" + self.repo.control_files.unlock() + + def _diff_pack_names(self): + """Read the pack names from disk, and compare it to the one in memory. + + :return: (disk_nodes, deleted_nodes, new_nodes) + disk_nodes The final set of nodes that should be referenced + deleted_nodes Nodes which have been removed from when we started + new_nodes Nodes that are newly introduced + """ + # load the disk nodes across + disk_nodes = set() + for index, key, value in self._iter_disk_pack_index(): + disk_nodes.add((key, value)) + orig_disk_nodes = set(disk_nodes) + + # do a two-way diff against our original content + current_nodes = set() + for name, sizes in self._names.iteritems(): + current_nodes.add( + ((name, ), ' '.join(str(size) for size in sizes))) + + # Packs no longer present in the repository, which were present when we + # locked the repository + deleted_nodes = self._packs_at_load - current_nodes + # Packs which this process is adding + new_nodes = current_nodes - self._packs_at_load + + # Update the disk_nodes set to include the ones we are adding, and + # remove the ones which were removed by someone else + disk_nodes.difference_update(deleted_nodes) + disk_nodes.update(new_nodes) + + return disk_nodes, deleted_nodes, new_nodes, orig_disk_nodes + + def _syncronize_pack_names_from_disk_nodes(self, disk_nodes): + """Given the correct set of pack files, update our saved info. + + :return: (removed, added, modified) + removed pack names removed from self._names + added pack names added to self._names + modified pack names that had changed value + """ + removed = [] + added = [] + modified = [] + ## self._packs_at_load = disk_nodes + new_names = dict(disk_nodes) + # drop no longer present nodes + for pack in self.all_packs(): + if (pack.name,) not in new_names: + removed.append(pack.name) + self._remove_pack_from_memory(pack) + # add new nodes/refresh existing ones + for key, value in disk_nodes: + name = key[0] + sizes = self._parse_index_sizes(value) + if name in self._names: + # existing + if sizes != self._names[name]: + # the pack for name has had its indices replaced - rare but + # important to handle. 
XXX: probably can never happen today + # because the three-way merge code above does not handle it + # - you may end up adding the same key twice to the new + # disk index because the set values are the same, unless + # the only index shows up as deleted by the set difference + # - which it may. Until there is a specific test for this, + # assume it's broken. RBC 20071017. + self._remove_pack_from_memory(self.get_pack_by_name(name)) + self._names[name] = sizes + self.get_pack_by_name(name) + modified.append(name) + else: + # new + self._names[name] = sizes + self.get_pack_by_name(name) + added.append(name) + return removed, added, modified + + def _save_pack_names(self, clear_obsolete_packs=False, obsolete_packs=None): + """Save the list of packs. + + This will take out the mutex around the pack names list for the + duration of the method call. If concurrent updates have been made, a + three-way merge between the current list and the current in memory list + is performed. + + :param clear_obsolete_packs: If True, clear out the contents of the + obsolete_packs directory. + :param obsolete_packs: Packs that are obsolete once the new pack-names + file has been written. + :return: A list of the names saved that were not previously on disk. + """ + already_obsolete = [] + self.lock_names() + try: + builder = self._index_builder_class() + (disk_nodes, deleted_nodes, new_nodes, + orig_disk_nodes) = self._diff_pack_names() + # TODO: handle same-name, index-size-changes here - + # e.g. use the value from disk, not ours, *unless* we're the one + # changing it. + for key, value in disk_nodes: + builder.add_node(key, value) + self.transport.put_file('pack-names', builder.finish(), + mode=self.repo.bzrdir._get_file_mode()) + self._packs_at_load = disk_nodes + if clear_obsolete_packs: + to_preserve = None + if obsolete_packs: + to_preserve = set([o.name for o in obsolete_packs]) + already_obsolete = self._clear_obsolete_packs(to_preserve) + finally: + self._unlock_names() + # synchronise the memory packs list with what we just wrote: + self._syncronize_pack_names_from_disk_nodes(disk_nodes) + if obsolete_packs: + # TODO: We could add one more condition here. "if o.name not in + # orig_disk_nodes and o != the new_pack we haven't written to + # disk yet. However, the new pack object is not easily + # accessible here (it would have to be passed through the + # autopacking code, etc.) + obsolete_packs = [o for o in obsolete_packs + if o.name not in already_obsolete] + self._obsolete_packs(obsolete_packs) + return [new_node[0][0] for new_node in new_nodes] + + def reload_pack_names(self): + """Sync our pack listing with what is present in the repository. + + This should be called when we find out that something we thought was + present is now missing. This happens when another process re-packs the + repository, etc. + + :return: True if the in-memory list of packs has been altered at all. + """ + # The ensure_loaded call is to handle the case where the first call + # made involving the collection was to reload_pack_names, where we + # don't have a view of disk contents. It's a bit of a bandaid, and + # causes two reads of pack-names, but it's a rare corner case not + # struck with regular push/pull etc. + first_read = self.ensure_loaded() + if first_read: + return True + # out the new value. + (disk_nodes, deleted_nodes, new_nodes, + orig_disk_nodes) = self._diff_pack_names() + # _packs_at_load is meant to be the explicit list of names in + # 'pack-names' at then start. 
As such, it should not contain any + # pending names that haven't been written out yet. + self._packs_at_load = orig_disk_nodes + (removed, added, + modified) = self._syncronize_pack_names_from_disk_nodes(disk_nodes) + if removed or added or modified: + return True + return False + + def _restart_autopack(self): + """Reload the pack names list, and restart the autopack code.""" + if not self.reload_pack_names(): + # Re-raise the original exception, because something went missing + # and a restart didn't find it + raise + raise errors.RetryAutopack(self.repo, False, sys.exc_info()) + + def _restart_pack_operations(self): + """Reload the pack names list, and restart the autopack code.""" + if not self.reload_pack_names(): + # Re-raise the original exception, because something went missing + # and a restart didn't find it + raise + raise RetryPackOperations(self.repo, False, sys.exc_info()) + + def _clear_obsolete_packs(self, preserve=None): + """Delete everything from the obsolete-packs directory. + + :return: A list of pack identifiers (the filename without '.pack') that + were found in obsolete_packs. + """ + found = [] + obsolete_pack_transport = self.transport.clone('obsolete_packs') + if preserve is None: + preserve = set() + try: + obsolete_pack_files = obsolete_pack_transport.list_dir('.') + except errors.NoSuchFile: + return found + for filename in obsolete_pack_files: + name, ext = osutils.splitext(filename) + if ext == '.pack': + found.append(name) + if name in preserve: + continue + try: + obsolete_pack_transport.delete(filename) + except (errors.PathError, errors.TransportError), e: + warning("couldn't delete obsolete pack, skipping it:\n%s" + % (e,)) + return found + + def _start_write_group(self): + # Do not permit preparation for writing if we're not in a 'write lock'. + if not self.repo.is_write_locked(): + raise errors.NotWriteLocked(self) + self._new_pack = self.pack_factory(self, upload_suffix='.pack', + file_mode=self.repo.bzrdir._get_file_mode()) + # allow writing: queue writes to a new index + self.revision_index.add_writable_index(self._new_pack.revision_index, + self._new_pack) + self.inventory_index.add_writable_index(self._new_pack.inventory_index, + self._new_pack) + self.text_index.add_writable_index(self._new_pack.text_index, + self._new_pack) + self._new_pack.text_index.set_optimize(combine_backing_indices=False) + self.signature_index.add_writable_index(self._new_pack.signature_index, + self._new_pack) + if self.chk_index is not None: + self.chk_index.add_writable_index(self._new_pack.chk_index, + self._new_pack) + self.repo.chk_bytes._index._add_callback = self.chk_index.add_callback + self._new_pack.chk_index.set_optimize(combine_backing_indices=False) + + self.repo.inventories._index._add_callback = self.inventory_index.add_callback + self.repo.revisions._index._add_callback = self.revision_index.add_callback + self.repo.signatures._index._add_callback = self.signature_index.add_callback + self.repo.texts._index._add_callback = self.text_index.add_callback + + def _abort_write_group(self): + # FIXME: just drop the transient index. + # forget what names there are + if self._new_pack is not None: + operation = cleanup.OperationWithCleanups(self._new_pack.abort) + operation.add_cleanup(setattr, self, '_new_pack', None) + # If we aborted while in the middle of finishing the write + # group, _remove_pack_indices could fail because the indexes are + # already gone. But they're not there we shouldn't fail in this + # case, so we pass ignore_missing=True. 
+ operation.add_cleanup(self._remove_pack_indices, self._new_pack, + ignore_missing=True) + operation.run_simple() + for resumed_pack in self._resumed_packs: + operation = cleanup.OperationWithCleanups(resumed_pack.abort) + # See comment in previous finally block. + operation.add_cleanup(self._remove_pack_indices, resumed_pack, + ignore_missing=True) + operation.run_simple() + del self._resumed_packs[:] + + def _remove_resumed_pack_indices(self): + for resumed_pack in self._resumed_packs: + self._remove_pack_indices(resumed_pack) + del self._resumed_packs[:] + + def _check_new_inventories(self): + """Detect missing inventories in this write group. + + :returns: list of strs, summarising any problems found. If the list is + empty no problems were found. + """ + # The base implementation does no checks. GCRepositoryPackCollection + # overrides this. + return [] + + def _commit_write_group(self): + all_missing = set() + for prefix, versioned_file in ( + ('revisions', self.repo.revisions), + ('inventories', self.repo.inventories), + ('texts', self.repo.texts), + ('signatures', self.repo.signatures), + ): + missing = versioned_file.get_missing_compression_parent_keys() + all_missing.update([(prefix,) + key for key in missing]) + if all_missing: + raise errors.BzrCheckError( + "Repository %s has missing compression parent(s) %r " + % (self.repo, sorted(all_missing))) + problems = self._check_new_inventories() + if problems: + problems_summary = '\n'.join(problems) + raise errors.BzrCheckError( + "Cannot add revision(s) to repository: " + problems_summary) + self._remove_pack_indices(self._new_pack) + any_new_content = False + if self._new_pack.data_inserted(): + # get all the data to disk and read to use + self._new_pack.finish() + self.allocate(self._new_pack) + self._new_pack = None + any_new_content = True + else: + self._new_pack.abort() + self._new_pack = None + for resumed_pack in self._resumed_packs: + # XXX: this is a pretty ugly way to turn the resumed pack into a + # properly committed pack. + self._names[resumed_pack.name] = None + self._remove_pack_from_memory(resumed_pack) + resumed_pack.finish() + self.allocate(resumed_pack) + any_new_content = True + del self._resumed_packs[:] + if any_new_content: + result = self.autopack() + if not result: + # when autopack takes no steps, the names list is still + # unsaved. + return self._save_pack_names() + return result + return [] + + def _suspend_write_group(self): + tokens = [pack.name for pack in self._resumed_packs] + self._remove_pack_indices(self._new_pack) + if self._new_pack.data_inserted(): + # get all the data to disk and read to use + self._new_pack.finish(suspend=True) + tokens.append(self._new_pack.name) + self._new_pack = None + else: + self._new_pack.abort() + self._new_pack = None + self._remove_resumed_pack_indices() + return tokens + + def _resume_write_group(self, tokens): + for token in tokens: + self._resume_pack(token) + + +class PackRepository(MetaDirVersionedFileRepository): + """Repository with knit objects stored inside pack containers. 
+
+    The layering for a KnitPackRepository is:
+
+    Graph | HPSS | Repository public layer |
+    ===================================================
+    Tuple based apis below, string based, and key based apis above
+    ---------------------------------------------------
+    VersionedFiles
+      Provides .texts, .revisions etc
+      This adapts the N-tuple keys to physical knit records which only have a
+      single string identifier (for historical reasons), which in older formats
+      was always the revision_id, and in the mapped code for packs is always
+      the last element of key tuples.
+    ---------------------------------------------------
+    GraphIndex
+      A separate GraphIndex is used for each of the
+      texts/inventories/revisions/signatures contained within each individual
+      pack file. The GraphIndex layer works in N-tuples and is unaware of any
+      semantic value.
+    ===================================================
+
+    """
+
+    # These attributes are inherited from the Repository base class. Setting
+    # them to None ensures that if the constructor is changed to not initialize
+    # them, or a subclass fails to call the constructor, an error will
+    # occur rather than the system working but generating incorrect data.
+    _commit_builder_class = None
+    _serializer = None
+
+    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
+        _serializer):
+        MetaDirRepository.__init__(self, _format, a_bzrdir, control_files)
+        self._commit_builder_class = _commit_builder_class
+        self._serializer = _serializer
+        self._reconcile_fixes_text_parents = True
+        if self._format.supports_external_lookups:
+            self._unstacked_provider = graph.CachingParentsProvider(
+                self._make_parents_provider_unstacked())
+        else:
+            self._unstacked_provider = graph.CachingParentsProvider(self)
+        self._unstacked_provider.disable_cache()
+
+    @needs_read_lock
+    def _all_revision_ids(self):
+        """See Repository.all_revision_ids()."""
+        return [key[0] for key in self.revisions.keys()]
+
+    def _abort_write_group(self):
+        self.revisions._index._key_dependencies.clear()
+        self._pack_collection._abort_write_group()
+
+    def _make_parents_provider(self):
+        if not self._format.supports_external_lookups:
+            return self._unstacked_provider
+        return graph.StackedParentsProvider(_LazyListJoin(
+            [self._unstacked_provider], self._fallback_repositories))
+
+    def _refresh_data(self):
+        if not self.is_locked():
+            return
+        self._pack_collection.reload_pack_names()
+        self._unstacked_provider.disable_cache()
+        self._unstacked_provider.enable_cache()
+
+    def _start_write_group(self):
+        self._pack_collection._start_write_group()
+
+    def _commit_write_group(self):
+        hint = self._pack_collection._commit_write_group()
+        self.revisions._index._key_dependencies.clear()
+        # The commit may have added keys that were previously cached as
+        # missing, so reset the cache.
+        self._unstacked_provider.disable_cache()
+        self._unstacked_provider.enable_cache()
+        return hint
+
+    def suspend_write_group(self):
+        # XXX check self._write_group is self.get_transaction()?
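+        # The pack collection flushes the partially-written pack to disk
+        # (suspend=True keeps it resumable) and hands back the pack names;
+        # these become the resume tokens returned to the caller.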
+        tokens = self._pack_collection._suspend_write_group()
+        self.revisions._index._key_dependencies.clear()
+        self._write_group = None
+        return tokens
+
+    def _resume_write_group(self, tokens):
+        self._start_write_group()
+        try:
+            self._pack_collection._resume_write_group(tokens)
+        except errors.UnresumableWriteGroup:
+            self._abort_write_group()
+            raise
+        for pack in self._pack_collection._resumed_packs:
+            self.revisions._index.scan_unvalidated_index(pack.revision_index)
+
+    def get_transaction(self):
+        if self._write_lock_count:
+            return self._transaction
+        else:
+            return self.control_files.get_transaction()
+
+    def is_locked(self):
+        return self._write_lock_count or self.control_files.is_locked()
+
+    def is_write_locked(self):
+        return self._write_lock_count
+
+    def lock_write(self, token=None):
+        """Lock the repository for writes.
+
+        :return: A bzrlib.repository.RepositoryWriteLockResult.
+        """
+        locked = self.is_locked()
+        if not self._write_lock_count and locked:
+            raise errors.ReadOnlyError(self)
+        self._write_lock_count += 1
+        if self._write_lock_count == 1:
+            self._transaction = transactions.WriteTransaction()
+        if not locked:
+            if 'relock' in debug.debug_flags and self._prev_lock == 'w':
+                note('%r was write locked again', self)
+            self._prev_lock = 'w'
+            self._unstacked_provider.enable_cache()
+            for repo in self._fallback_repositories:
+                # Writes don't affect fallback repos
+                repo.lock_read()
+            self._refresh_data()
+        return RepositoryWriteLockResult(self.unlock, None)
+
+    def lock_read(self):
+        """Lock the repository for reads.
+
+        :return: A bzrlib.lock.LogicalLockResult.
+        """
+        locked = self.is_locked()
+        if self._write_lock_count:
+            self._write_lock_count += 1
+        else:
+            self.control_files.lock_read()
+        if not locked:
+            if 'relock' in debug.debug_flags and self._prev_lock == 'r':
+                note('%r was read locked again', self)
+            self._prev_lock = 'r'
+            self._unstacked_provider.enable_cache()
+            for repo in self._fallback_repositories:
+                repo.lock_read()
+            self._refresh_data()
+        return LogicalLockResult(self.unlock)
+
+    def leave_lock_in_place(self):
+        # not supported - raise an error
+        raise NotImplementedError(self.leave_lock_in_place)
+
+    def dont_leave_lock_in_place(self):
+        # not supported - raise an error
+        raise NotImplementedError(self.dont_leave_lock_in_place)
+
+    @needs_write_lock
+    def pack(self, hint=None, clean_obsolete_packs=False):
+        """Compress the data within the repository.
+
+        This will pack all the data to a single pack. In future it may
+        recompress deltas or do other such expensive operations.
+ """ + self._pack_collection.pack(hint=hint, clean_obsolete_packs=clean_obsolete_packs) + + @needs_write_lock + def reconcile(self, other=None, thorough=False): + """Reconcile this repository.""" + from bzrlib.reconcile import PackReconciler + reconciler = PackReconciler(self, thorough=thorough) + reconciler.reconcile() + return reconciler + + def _reconcile_pack(self, collection, packs, extension, revs, pb): + raise NotImplementedError(self._reconcile_pack) + + @only_raises(errors.LockNotHeld, errors.LockBroken) + def unlock(self): + if self._write_lock_count == 1 and self._write_group is not None: + self.abort_write_group() + self._unstacked_provider.disable_cache() + self._transaction = None + self._write_lock_count = 0 + raise errors.BzrError( + 'Must end write group before releasing write lock on %s' + % self) + if self._write_lock_count: + self._write_lock_count -= 1 + if not self._write_lock_count: + transaction = self._transaction + self._transaction = None + transaction.finish() + else: + self.control_files.unlock() + + if not self.is_locked(): + self._unstacked_provider.disable_cache() + for repo in self._fallback_repositories: + repo.unlock() + + +class RepositoryFormatPack(MetaDirVersionedFileRepositoryFormat): + """Format logic for pack structured repositories. + + This repository format has: + - a list of packs in pack-names + - packs in packs/NAME.pack + - indices in indices/NAME.{iix,six,tix,rix} + - knit deltas in the packs, knit indices mapped to the indices. + - thunk objects to support the knits programming API. + - a format marker of its own + - an optional 'shared-storage' flag + - an optional 'no-working-trees' flag + - a LockDir lock + """ + + # Set this attribute in derived classes to control the repository class + # created by open and initialize. + repository_class = None + # Set this attribute in derived classes to control the + # _commit_builder_class that the repository objects will have passed to + # their constructor. + _commit_builder_class = None + # Set this attribute in derived clases to control the _serializer that the + # repository objects will have passed to their constructor. + _serializer = None + # Packs are not confused by ghosts. + supports_ghosts = True + # External references are not supported in pack repositories yet. + supports_external_lookups = False + # Most pack formats do not use chk lookups. + supports_chks = False + # What index classes to use + index_builder_class = None + index_class = None + _fetch_uses_deltas = True + fast_deltas = False + supports_funky_characters = True + revision_graph_can_have_wrong_parents = True + + def initialize(self, a_bzrdir, shared=False): + """Create a pack based repository. + + :param a_bzrdir: bzrdir to contain the new repository; must already + be initialized. + :param shared: If true the repository will be initialized as a shared + repository. + """ + mutter('creating repository in %s.', a_bzrdir.transport.base) + dirs = ['indices', 'obsolete_packs', 'packs', 'upload'] + builder = self.index_builder_class() + files = [('pack-names', builder.finish())] + utf8_files = [('format', self.get_format_string())] + + self._upload_blank_content(a_bzrdir, dirs, files, utf8_files, shared) + repository = self.open(a_bzrdir=a_bzrdir, _found=True) + self._run_post_repo_init_hooks(repository, a_bzrdir, shared) + return repository + + def open(self, a_bzrdir, _found=False, _override_transport=None): + """See RepositoryFormat.open(). + + :param _override_transport: INTERNAL USE ONLY. 
+            repository at a slightly different url
+            than normal. I.e. during 'upgrade'.
+        """
+        if not _found:
+            format = RepositoryFormatMetaDir.find_format(a_bzrdir)
+        if _override_transport is not None:
+            repo_transport = _override_transport
+        else:
+            repo_transport = a_bzrdir.get_repository_transport(None)
+        control_files = lockable_files.LockableFiles(repo_transport,
+                                'lock', lockdir.LockDir)
+        return self.repository_class(_format=self,
+                              a_bzrdir=a_bzrdir,
+                              control_files=control_files,
+                              _commit_builder_class=self._commit_builder_class,
+                              _serializer=self._serializer)
+
+
+class RetryPackOperations(errors.RetryWithNewPacks):
+    """Raised when we are packing and we find a missing file.
+
+    Meant as a signaling exception, to tell the RepositoryPackCollection.pack
+    code it should try again.
+    """
+
+    internal_error = True
+
+    _fmt = ("Pack files have changed, reload and try pack again."
+            " context: %(context)s %(orig_error)s")
+
+
+class _DirectPackAccess(object):
+    """Access to data in one or more packs with less translation."""
+
+    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
+        """Create a _DirectPackAccess object.
+
+        :param index_to_packs: A dict mapping index objects to the transport
+            and file names for obtaining data.
+        :param reload_func: A function to call if we determine that the pack
+            files have moved and we need to reload our caches. See
+            bzrlib.repofmt.pack_repo.AggregateIndex for more details.
+        """
+        self._container_writer = None
+        self._write_index = None
+        self._indices = index_to_packs
+        self._reload_func = reload_func
+        self._flush_func = flush_func
+
+    def add_raw_records(self, key_sizes, raw_data):
+        """Add raw knit bytes to a storage area.
+
+        The data is spooled to the container writer in one bytes-record per
+        raw data item.
+
+        :param key_sizes: An iterable of tuples containing the key and size of
+            each raw data segment.
+        :param raw_data: A bytestring containing the data.
+        :return: A list of memos to retrieve the record later. Each memo is an
+            opaque index memo. For _DirectPackAccess the memo is (index, pos,
+            length), where the index field is the write_index object supplied
+            to the PackAccess object.
+        """
+        if type(raw_data) is not str:
+            raise AssertionError(
+                'data must be plain bytes was %s' % type(raw_data))
+        result = []
+        offset = 0
+        for key, size in key_sizes:
+            p_offset, p_length = self._container_writer.add_bytes_record(
+                raw_data[offset:offset+size], [])
+            offset += size
+            result.append((self._write_index, p_offset, p_length))
+        return result
+
+    def flush(self):
+        """Flush pending writes on this access object.
+
+        This will flush any buffered writes to a NewPack.
+        """
+        if self._flush_func is not None:
+            self._flush_func()
+
+    def get_raw_records(self, memos_for_retrieval):
+        """Get the raw bytes for the records.
+
+        :param memos_for_retrieval: An iterable containing the (index, pos,
+            length) memo for retrieving the bytes. The Pack access method
+            looks up the pack to use for a given record in its index_to_pack
+            map.
+        :return: An iterator over the bytes of the records.
+ """ + # first pass, group into same-index requests + request_lists = [] + current_index = None + for (index, offset, length) in memos_for_retrieval: + if current_index == index: + current_list.append((offset, length)) + else: + if current_index is not None: + request_lists.append((current_index, current_list)) + current_index = index + current_list = [(offset, length)] + # handle the last entry + if current_index is not None: + request_lists.append((current_index, current_list)) + for index, offsets in request_lists: + try: + transport, path = self._indices[index] + except KeyError: + # A KeyError here indicates that someone has triggered an index + # reload, and this index has gone missing, we need to start + # over. + if self._reload_func is None: + # If we don't have a _reload_func there is nothing that can + # be done + raise + raise errors.RetryWithNewPacks(index, + reload_occurred=True, + exc_info=sys.exc_info()) + try: + reader = pack.make_readv_reader(transport, path, offsets) + for names, read_func in reader.iter_records(): + yield read_func(None) + except errors.NoSuchFile: + # A NoSuchFile error indicates that a pack file has gone + # missing on disk, we need to trigger a reload, and start over. + if self._reload_func is None: + raise + raise errors.RetryWithNewPacks(transport.abspath(path), + reload_occurred=False, + exc_info=sys.exc_info()) + + def set_writer(self, writer, index, transport_packname): + """Set a writer to use for adding data.""" + if index is not None: + self._indices[index] = transport_packname + self._container_writer = writer + self._write_index = index + + def reload_or_raise(self, retry_exc): + """Try calling the reload function, or re-raise the original exception. + + This should be called after _DirectPackAccess raises a + RetryWithNewPacks exception. This function will handle the common logic + of determining when the error is fatal versus being temporary. + It will also make sure that the original exception is raised, rather + than the RetryWithNewPacks exception. + + If this function returns, then the calling function should retry + whatever operation was being performed. Otherwise an exception will + be raised. + + :param retry_exc: A RetryWithNewPacks exception. + """ + is_error = False + if self._reload_func is None: + is_error = True + elif not self._reload_func(): + # The reload claimed that nothing changed + if not retry_exc.reload_occurred: + # If there wasn't an earlier reload, then we really were + # expecting to find changes. We didn't find them, so this is a + # hard error + is_error = True + if is_error: + exc_class, exc_value, exc_traceback = retry_exc.exc_info + raise exc_class, exc_value, exc_traceback + + + |