Diffstat (limited to 'processors/generic_processor.py')
-rw-r--r-- | processors/generic_processor.py | 569 |
1 file changed, 0 insertions, 569 deletions
diff --git a/processors/generic_processor.py b/processors/generic_processor.py
deleted file mode 100644
index e9006c1..0000000
--- a/processors/generic_processor.py
+++ /dev/null
@@ -1,569 +0,0 @@
-# Copyright (C) 2008 Canonical Ltd
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-
-"""Import processor that supports all Bazaar repository formats."""
-
-
-import time
-from bzrlib import (
-    delta,
-    errors,
-    osutils,
-    progress,
-    )
-from bzrlib.repofmt import pack_repo
-from bzrlib.trace import note
-try:
-    import bzrlib.util.configobj.configobj as configobj
-except ImportError:
-    import configobj
-from bzrlib.plugins.fastimport import (
-    branch_updater,
-    bzr_commit_handler,
-    cache_manager,
-    marks_file,
-    revision_store,
-    )
-from bzrlib.plugins.fastimport.fastimport import (
-    commands,
-    errors as plugin_errors,
-    helpers,
-    idmapfile,
-    processor,
-    )
-
-
-# How many commits before automatically reporting progress
-_DEFAULT_AUTO_PROGRESS = 1000
-
-# How many commits before automatically checkpointing
-_DEFAULT_AUTO_CHECKPOINT = 10000
-
-# How many checkpoints before automatically packing
-_DEFAULT_AUTO_PACK = 4
-
-# How many inventories to cache
-_DEFAULT_INV_CACHE_SIZE = 1
-_DEFAULT_CHK_INV_CACHE_SIZE = 1
-
-
-class GenericProcessor(processor.ImportProcessor):
-    """An import processor that handles basic imports.
-
-    Current features supported:
-
-    * blobs are cached in memory
-    * files and symlinks commits are supported
-    * checkpoints automatically happen at a configurable frequency
-      over and above the stream requested checkpoints
-    * timestamped progress reporting, both automatic and stream requested
-    * some basic statistics are dumped on completion.
-
-    At checkpoints and on completion, the commit-id -> revision-id map is
-    saved to a file called 'fastimport-id-map'. If the import crashes
-    or is interrupted, it can be started again and this file will be
-    used to skip over already loaded revisions. The format of each line
-    is "commit-id revision-id" so commit-ids cannot include spaces.
-
-    Here are the supported parameters:
-
-    * info - name of a hints file holding the analysis generated
-      by running the fast-import-info processor in verbose mode. When
-      importing large repositories, this parameter is needed so
-      that the importer knows what blobs to intelligently cache.
-
-    * trees - update the working trees before completing.
-      By default, the importer updates the repository
-      and branches and the user needs to run 'bzr update' for the
-      branches of interest afterwards.
-
-    * count - only import this many commits then exit. If not set
-      or negative, all commits are imported.
-
-    * checkpoint - automatically checkpoint every n commits over and
-      above any checkpoints contained in the import stream.
-      The default is 10000.
-
-    * autopack - pack every n checkpoints. The default is 4.
-
-    * inv-cache - number of inventories to cache.
-      If not set, the default is 1.
-
-    * mode - import algorithm to use: default, experimental or classic.
-
-    * import-marks - name of file to read to load mark information from
-
-    * export-marks - name of file to write to save mark information to
-    """
-
-    known_params = [
-        'info',
-        'trees',
-        'count',
-        'checkpoint',
-        'autopack',
-        'inv-cache',
-        'mode',
-        'import-marks',
-        'export-marks',
-        ]
-
-    def __init__(self, bzrdir, params=None, verbose=False, outf=None,
-        prune_empty_dirs=True):
-        processor.ImportProcessor.__init__(self, bzrdir, params, verbose)
-        self.prune_empty_dirs = prune_empty_dirs
-
-    def pre_process(self):
-        self._start_time = time.time()
-        self._load_info_and_params()
-        if self.total_commits:
-            self.note("Starting import of %d commits ..." %
-                (self.total_commits,))
-        else:
-            self.note("Starting import ...")
-        self.cache_mgr = cache_manager.CacheManager(self.info, self.verbose,
-            self.inventory_cache_size)
-
-        if self.params.get("import-marks") is not None:
-            mark_info = marks_file.import_marks(self.params.get("import-marks"))
-            if mark_info is not None:
-                self.cache_mgr.revision_ids = mark_info[0]
-            self.skip_total = False
-            self.first_incremental_commit = True
-        else:
-            self.first_incremental_commit = False
-            self.skip_total = self._init_id_map()
-            if self.skip_total:
-                self.note("Found %d commits already loaded - "
-                    "skipping over these ...", self.skip_total)
-        self._revision_count = 0
-
-        # mapping of tag name to revision_id
-        self.tags = {}
-
-        # Create the revision store to use for committing, if any
-        self.rev_store = self._revision_store_factory()
-
-        # Disable autopacking if the repo format supports it.
-        # THIS IS A HACK - there is no sanctioned way of doing this yet.
-        if isinstance(self.repo, pack_repo.KnitPackRepository):
-            self._original_max_pack_count = \
-                self.repo._pack_collection._max_pack_count
-            def _max_pack_count_for_import(total_revisions):
-                return total_revisions + 1
-            self.repo._pack_collection._max_pack_count = \
-                _max_pack_count_for_import
-        else:
-            self._original_max_pack_count = None
-
-        # Make groupcompress use the fast algorithm during importing.
-        # We want to repack at the end anyhow when more information
-        # is available to do a better job of saving space.
-        try:
-            from bzrlib import groupcompress
-            groupcompress._FAST = True
-        except ImportError:
-            pass
-
-        # Create a write group. This is committed at the end of the import.
-        # Checkpointing closes the current one and starts a new one.
-        self.repo.start_write_group()
-
-    def _load_info_and_params(self):
-        self._mode = bool(self.params.get('mode', 'default'))
-        self._experimental = self._mode == 'experimental'
-
-        # This is currently hard-coded but might be configurable via
-        # parameters one day if that's needed
-        repo_transport = self.repo.control_files._transport
-        self.id_map_path = repo_transport.local_abspath("fastimport-id-map")
-
-        # Load the info file, if any
-        info_path = self.params.get('info')
-        if info_path is not None:
-            self.info = configobj.ConfigObj(info_path)
-        else:
-            self.info = None
-
-        # Decide which CommitHandler to use
-        self.supports_chk = getattr(self.repo._format, 'supports_chks', False)
-        if self.supports_chk and self._mode == 'classic':
-            note("Cannot use classic algorithm on CHK repositories"
-                " - using default one instead")
-            self._mode = 'default'
-        if self._mode == 'classic':
-            self.commit_handler_factory = \
-                bzr_commit_handler.InventoryCommitHandler
-        else:
-            self.commit_handler_factory = \
-                bzr_commit_handler.InventoryDeltaCommitHandler
-
-        # Decide how often to automatically report progress
-        # (not a parameter yet)
-        self.progress_every = _DEFAULT_AUTO_PROGRESS
-        if self.verbose:
-            self.progress_every = self.progress_every / 10
-
-        # Decide how often (# of commits) to automatically checkpoint
-        self.checkpoint_every = int(self.params.get('checkpoint',
-            _DEFAULT_AUTO_CHECKPOINT))
-
-        # Decide how often (# of checkpoints) to automatically pack
-        self.checkpoint_count = 0
-        self.autopack_every = int(self.params.get('autopack',
-            _DEFAULT_AUTO_PACK))
-
-        # Decide how big to make the inventory cache
-        cache_size = int(self.params.get('inv-cache', -1))
-        if cache_size == -1:
-            if self.supports_chk:
-                cache_size = _DEFAULT_CHK_INV_CACHE_SIZE
-            else:
-                cache_size = _DEFAULT_INV_CACHE_SIZE
-        self.inventory_cache_size = cache_size
-
-        # Find the maximum number of commits to import (None means all)
-        # and prepare progress reporting. Just in case the info file
-        # has an outdated count of commits, we store the max counts
-        # at which we need to terminate separately to the total used
-        # for progress tracking.
-        try:
-            self.max_commits = int(self.params['count'])
-            if self.max_commits < 0:
-                self.max_commits = None
-        except KeyError:
-            self.max_commits = None
-        if self.info is not None:
-            self.total_commits = int(self.info['Command counts']['commit'])
-            if (self.max_commits is not None and
-                self.total_commits > self.max_commits):
-                self.total_commits = self.max_commits
-        else:
-            self.total_commits = self.max_commits
-
-    def _revision_store_factory(self):
-        """Make a RevisionStore based on what the repository supports."""
-        new_repo_api = hasattr(self.repo, 'revisions')
-        if new_repo_api:
-            return revision_store.RevisionStore2(self.repo)
-        elif not self._experimental:
-            return revision_store.RevisionStore1(self.repo)
-        else:
-            def fulltext_when(count):
-                total = self.total_commits
-                if total is not None and count == total:
-                    fulltext = True
-                else:
-                    # Create an inventory fulltext every 200 revisions
-                    fulltext = count % 200 == 0
-                if fulltext:
-                    self.note("%d commits - storing inventory as full-text",
-                        count)
-                return fulltext
-
-            return revision_store.ImportRevisionStore1(
-                self.repo, self.inventory_cache_size,
-                fulltext_when=fulltext_when)
-
-    def _process(self, command_iter):
-        # if anything goes wrong, abort the write group if any
-        try:
-            processor.ImportProcessor._process(self, command_iter)
-        except:
-            if self.repo is not None and self.repo.is_in_write_group():
-                self.repo.abort_write_group()
-            raise
-
-    def post_process(self):
-        # Commit the current write group and checkpoint the id map
-        self.repo.commit_write_group()
-        self._save_id_map()
-
-        if self.params.get("export-marks") is not None:
-            marks_file.export_marks(self.params.get("export-marks"),
-                self.cache_mgr.revision_ids)
-
-        if self.cache_mgr.last_ref == None:
-            """Nothing to refresh"""
-            return
-
-        # Update the branches
-        self.note("Updating branch information ...")
-        updater = branch_updater.BranchUpdater(self.repo, self.branch,
-            self.cache_mgr, helpers.invert_dictset(self.cache_mgr.heads),
-            self.cache_mgr.last_ref, self.tags)
-        branches_updated, branches_lost = updater.update()
-        self._branch_count = len(branches_updated)
-
-        # Tell the user about branches that were not created
-        if branches_lost:
-            if not self.repo.is_shared():
-                self.warning("Cannot import multiple branches into "
-                    "a standalone branch")
-            self.warning("Not creating branches for these head revisions:")
-            for lost_info in branches_lost:
-                head_revision = lost_info[1]
-                branch_name = lost_info[0]
-                self.note("\t %s = %s", head_revision, branch_name)
-
-        # Update the working trees as requested
-        self._tree_count = 0
-        remind_about_update = True
-        if self._branch_count == 0:
-            self.note("no branches to update")
-            self.note("no working trees to update")
-            remind_about_update = False
-        elif self.params.get('trees', False):
-            trees = self._get_working_trees(branches_updated)
-            if trees:
-                self._update_working_trees(trees)
-                remind_about_update = False
-            else:
-                self.warning("No working trees available to update")
-        else:
-            # Update just the trunk. (This is always the first branch
-            # returned by the branch updater.)
-            trunk_branch = branches_updated[0]
-            trees = self._get_working_trees([trunk_branch])
-            if trees:
-                self._update_working_trees(trees)
-                remind_about_update = self._branch_count > 1
-
-        # Dump the cache stats now because we clear it before the final pack
-        if self.verbose:
-            self.cache_mgr.dump_stats()
-        if self._original_max_pack_count:
-            # We earlier disabled autopacking, creating one pack every
-            # checkpoint instead. We now pack the repository to optimise
-            # how data is stored.
-            self.cache_mgr.clear_all()
-            self._pack_repository()
-
-        # Finish up by dumping stats & telling the user what to do next.
-        self.dump_stats()
-        if remind_about_update:
-            # This message is explicitly not timestamped.
-            note("To refresh the working tree for other branches, "
-                "use 'bzr update' inside that branch.")
-
-    def _update_working_trees(self, trees):
-        if self.verbose:
-            reporter = delta._ChangeReporter()
-        else:
-            reporter = None
-        for wt in trees:
-            self.note("Updating the working tree for %s ...", wt.basedir)
-            wt.update(reporter)
-            self._tree_count += 1
-
-    def _pack_repository(self, final=True):
-        # Before packing, free whatever memory we can and ensure
-        # that groupcompress is configured to optimise disk space
-        import gc
-        if final:
-            try:
-                from bzrlib import groupcompress
-            except ImportError:
-                pass
-            else:
-                groupcompress._FAST = False
-        gc.collect()
-        self.note("Packing repository ...")
-        self.repo.pack()
-
-        # To be conservative, packing puts the old packs and
-        # indices in obsolete_packs. We err on the side of
-        # optimism and clear out that directory to save space.
-        self.note("Removing obsolete packs ...")
-        # TODO: Use a public API for this once one exists
-        repo_transport = self.repo._pack_collection.transport
-        repo_transport.clone('obsolete_packs').delete_multi(
-            repo_transport.list_dir('obsolete_packs'))
-
-        # If we're not done, free whatever memory we can
-        if not final:
-            gc.collect()
-
-    def _get_working_trees(self, branches):
-        """Get the working trees for branches in the repository."""
-        result = []
-        wt_expected = self.repo.make_working_trees()
-        for br in branches:
-            if br is None:
-                continue
-            elif br == self.branch:
-                if self.working_tree:
-                    result.append(self.working_tree)
-            elif wt_expected:
-                try:
-                    result.append(br.bzrdir.open_workingtree())
-                except errors.NoWorkingTree:
-                    self.warning("No working tree for branch %s", br)
-        return result
-
-    def dump_stats(self):
-        time_required = progress.str_tdelta(time.time() - self._start_time)
-        rc = self._revision_count - self.skip_total
-        bc = self._branch_count
-        wtc = self._tree_count
-        self.note("Imported %d %s, updating %d %s and %d %s in %s",
-            rc, helpers.single_plural(rc, "revision", "revisions"),
-            bc, helpers.single_plural(bc, "branch", "branches"),
-            wtc, helpers.single_plural(wtc, "tree", "trees"),
-            time_required)
-
-    def _init_id_map(self):
-        """Load the id-map and check it matches the repository.
-
-        :return: the number of entries in the map
-        """
-        # Currently, we just check the size. In the future, we might
-        # decide to be more paranoid and check that the revision-ids
-        # are identical as well.
-        self.cache_mgr.revision_ids, known = idmapfile.load_id_map(
-            self.id_map_path)
-        existing_count = len(self.repo.all_revision_ids())
-        if existing_count < known:
-            raise plugin_errors.BadRepositorySize(known, existing_count)
-        return known
-
-    def _save_id_map(self):
-        """Save the id-map."""
-        # Save the whole lot every time. If this proves a problem, we can
-        # change to 'append just the new ones' at a later time.
-        idmapfile.save_id_map(self.id_map_path, self.cache_mgr.revision_ids)
-
-    def blob_handler(self, cmd):
-        """Process a BlobCommand."""
-        if cmd.mark is not None:
-            dataref = cmd.id
-        else:
-            dataref = osutils.sha_strings(cmd.data)
-        self.cache_mgr.store_blob(dataref, cmd.data)
-
-    def checkpoint_handler(self, cmd):
-        """Process a CheckpointCommand."""
-        # Commit the current write group and start a new one
-        self.repo.commit_write_group()
-        self._save_id_map()
-        # track the number of automatic checkpoints done
-        if cmd is None:
-            self.checkpoint_count += 1
-            if self.checkpoint_count % self.autopack_every == 0:
-                self._pack_repository(final=False)
-        self.repo.start_write_group()
-
-    def commit_handler(self, cmd):
-        """Process a CommitCommand."""
-        if self.skip_total and self._revision_count < self.skip_total:
-            self.cache_mgr.track_heads(cmd)
-            # Check that we really do know about this commit-id
-            if not self.cache_mgr.revision_ids.has_key(cmd.id):
-                raise plugin_errors.BadRestart(cmd.id)
-            # Consume the file commands and free any non-sticky blobs
-            for fc in cmd.file_iter():
-                pass
-            self.cache_mgr._blobs = {}
-            self._revision_count += 1
-            if cmd.ref.startswith('refs/tags/'):
-                tag_name = cmd.ref[len('refs/tags/'):]
-                self._set_tag(tag_name, cmd.id)
-            return
-        if self.first_incremental_commit:
-            self.first_incremental_commit = None
-            parents = self.cache_mgr.track_heads(cmd)
-
-        # 'Commit' the revision and report progress
-        handler = self.commit_handler_factory(cmd, self.cache_mgr,
-            self.rev_store, verbose=self.verbose,
-            prune_empty_dirs=self.prune_empty_dirs)
-        try:
-            handler.process()
-        except:
-            print "ABORT: exception occurred processing commit %s" % (cmd.id)
-            raise
-        self.cache_mgr.revision_ids[cmd.id] = handler.revision_id
-        self._revision_count += 1
-        self.report_progress("(%s)" % cmd.id)
-
-        if cmd.ref.startswith('refs/tags/'):
-            tag_name = cmd.ref[len('refs/tags/'):]
-            self._set_tag(tag_name, cmd.id)
-
-        # Check if we should finish up or automatically checkpoint
-        if (self.max_commits is not None and
-            self._revision_count >= self.max_commits):
-            self.note("Stopping after reaching requested count of commits")
-            self.finished = True
-        elif self._revision_count % self.checkpoint_every == 0:
-            self.note("%d commits - automatic checkpoint triggered",
-                self._revision_count)
-            self.checkpoint_handler(None)
-
-    def report_progress(self, details=''):
-        if self._revision_count % self.progress_every == 0:
-            if self.total_commits is not None:
-                counts = "%d/%d" % (self._revision_count, self.total_commits)
-            else:
-                counts = "%d" % (self._revision_count,)
-            minutes = (time.time() - self._start_time) / 60
-            revisions_added = self._revision_count - self.skip_total
-            rate = revisions_added * 1.0 / minutes
-            if rate > 10:
-                rate_str = "at %.0f/minute " % rate
-            else:
-                rate_str = "at %.1f/minute " % rate
-            self.note("%s commits processed %s%s" % (counts, rate_str, details))
-
-    def progress_handler(self, cmd):
-        """Process a ProgressCommand."""
-        # Most progress messages embedded in streams are annoying.
-        # Ignore them unless in verbose mode.
-        if self.verbose:
-            self.note("progress %s" % (cmd.message,))
-
-    def reset_handler(self, cmd):
-        """Process a ResetCommand."""
-        if cmd.ref.startswith('refs/tags/'):
-            tag_name = cmd.ref[len('refs/tags/'):]
-            if cmd.from_ is not None:
-                self._set_tag(tag_name, cmd.from_)
-            elif self.verbose:
-                self.warning("ignoring reset refs/tags/%s - no from clause"
-                    % tag_name)
-            return
-
-        if cmd.from_ is not None:
-            self.cache_mgr.track_heads_for_ref(cmd.ref, cmd.from_)
-
-    def tag_handler(self, cmd):
-        """Process a TagCommand."""
-        if cmd.from_ is not None:
-            self._set_tag(cmd.id, cmd.from_)
-        else:
-            self.warning("ignoring tag %s - no from clause" % cmd.id)
-
-    def _set_tag(self, name, from_):
-        """Define a tag given a name and import 'from' reference."""
-        bzr_tag_name = name.decode('utf-8', 'replace')
-        bzr_rev_id = self.cache_mgr.revision_ids[from_]
-        self.tags[bzr_tag_name] = bzr_rev_id
-
-    def feature_handler(self, cmd):
-        """Process a FeatureCommand."""
-        feature = cmd.feature_name
-        if feature not in commands.FEATURE_NAMES:
-            raise plugin_errors.UnknownFeature(feature)
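
The class docstring above describes the restart mechanism in terms of a simple file format: 'fastimport-id-map' holds one "commit-id revision-id" pair per line, which is why commit-ids cannot contain spaces. The sketch below is a minimal, self-contained illustration of reading and writing that format; read_id_map and write_id_map are hypothetical stand-ins, not the plugin's idmapfile module (whose load_id_map, as the deleted _init_id_map shows, also returns the number of entries already known).

# Illustrative sketch only: stand-alone helpers for the id-map format
# described in the docstring ("commit-id revision-id", one pair per line).
# These names are hypothetical; the deleted code uses the plugin's
# idmapfile module instead.

def read_id_map(path):
    """Return a {commit_id: revision_id} dict read from an id-map file."""
    revision_ids = {}
    f = open(path, 'r')
    try:
        for line in f:
            line = line.rstrip('\n')
            if not line:
                continue
            # Commit-ids cannot include spaces, so split on the first space.
            commit_id, revision_id = line.split(' ', 1)
            revision_ids[commit_id] = revision_id
    finally:
        f.close()
    return revision_ids


def write_id_map(path, revision_ids):
    """Write the whole map out again, one pair per line."""
    f = open(path, 'w')
    try:
        for commit_id, revision_id in sorted(revision_ids.items()):
            f.write('%s %s\n' % (commit_id, revision_id))
    finally:
        f.close()

A map like this is what lets pre_process report "Found N commits already loaded" on a restart and lets commit_handler skip commits whose ids are already present.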
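The checkpoint and autopack parameters compose: an automatic checkpoint fires every 'checkpoint' commits, and every 'autopack'-th automatic checkpoint also packs the repository, so the defaults (10000 and 4) pack roughly every 40000 commits. Below is a small sketch of that cadence with hypothetical names, assuming only automatic checkpoints occur (checkpoint_handler does not count stream-requested checkpoints towards autopacking).

# Illustrative sketch of the automatic checkpoint/pack cadence; names are
# hypothetical and only automatic checkpoints are modelled.

def auto_actions(commit_number, checkpoint_every=10000, autopack_every=4):
    """Return (do_checkpoint, do_pack) for the commit_number-th commit."""
    do_checkpoint = commit_number % checkpoint_every == 0
    checkpoints_so_far = commit_number // checkpoint_every
    do_pack = do_checkpoint and checkpoints_so_far % autopack_every == 0
    return do_checkpoint, do_pack

# With the defaults: auto_actions(10000) -> (True, False),
# auto_actions(40000) -> (True, True).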