summaryrefslogtreecommitdiff
path: root/bzrlib/tests/test_transform.py
diff options
context:
space:
mode:
Diffstat (limited to 'bzrlib/tests/test_transform.py')
-rw-r--r--bzrlib/tests/test_transform.py3729
1 files changed, 3729 insertions, 0 deletions
diff --git a/bzrlib/tests/test_transform.py b/bzrlib/tests/test_transform.py
new file mode 100644
index 0000000..d1a0b33
--- /dev/null
+++ b/bzrlib/tests/test_transform.py
@@ -0,0 +1,3729 @@
+# Copyright (C) 2006-2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import errno
+import os
+from StringIO import StringIO
+import sys
+import time
+
+from bzrlib import (
+ bencode,
+ errors,
+ filters,
+ generate_ids,
+ osutils,
+ revision as _mod_revision,
+ rules,
+ symbol_versioning,
+ tests,
+ trace,
+ transform,
+ urlutils,
+ )
+from bzrlib.conflicts import (
+ DeletingParent,
+ DuplicateEntry,
+ DuplicateID,
+ MissingParent,
+ NonDirectoryParent,
+ ParentLoop,
+ UnversionedParent,
+)
+from bzrlib.controldir import ControlDir
+from bzrlib.diff import show_diff_trees
+from bzrlib.errors import (
+ DuplicateKey,
+ ExistingLimbo,
+ ExistingPendingDeletion,
+ ImmortalLimbo,
+ ImmortalPendingDeletion,
+ LockError,
+ MalformedTransform,
+ ReusingTransform,
+)
+from bzrlib.osutils import (
+ file_kind,
+ pathjoin,
+)
+from bzrlib.merge import Merge3Merger, Merger
+from bzrlib.mutabletree import MutableTree
+from bzrlib.tests import (
+ features,
+ TestCaseInTempDir,
+ TestSkipped,
+ )
+from bzrlib.tests.features import (
+ HardlinkFeature,
+ SymlinkFeature,
+ )
+from bzrlib.transform import (
+ build_tree,
+ create_from_tree,
+ cook_conflicts,
+ _FileMover,
+ FinalPaths,
+ resolve_conflicts,
+ resolve_checkout,
+ ROOT_PARENT,
+ TransformPreview,
+ TreeTransform,
+)
+
+
+class TestTreeTransform(tests.TestCaseWithTransport):
+
+ def setUp(self):
+ super(TestTreeTransform, self).setUp()
+ self.wt = self.make_branch_and_tree('.', format='development-subtree')
+ os.chdir('..')
+
+ def get_transform(self):
+ transform = TreeTransform(self.wt)
+ self.addCleanup(transform.finalize)
+ return transform, transform.root
+
+ def get_transform_for_sha1_test(self):
+ trans, root = self.get_transform()
+ self.wt.lock_tree_write()
+ self.addCleanup(self.wt.unlock)
+ contents = ['just some content\n']
+ sha1 = osutils.sha_strings(contents)
+ # Roll back the clock
+ trans._creation_mtime = time.time() - 20.0
+ return trans, root, contents, sha1
+
+ def test_existing_limbo(self):
+ transform, root = self.get_transform()
+ limbo_name = transform._limbodir
+ deletion_path = transform._deletiondir
+ os.mkdir(pathjoin(limbo_name, 'hehe'))
+ self.assertRaises(ImmortalLimbo, transform.apply)
+ self.assertRaises(LockError, self.wt.unlock)
+ self.assertRaises(ExistingLimbo, self.get_transform)
+ self.assertRaises(LockError, self.wt.unlock)
+ os.rmdir(pathjoin(limbo_name, 'hehe'))
+ os.rmdir(limbo_name)
+ os.rmdir(deletion_path)
+ transform, root = self.get_transform()
+ transform.apply()
+
+ def test_existing_pending_deletion(self):
+ transform, root = self.get_transform()
+ deletion_path = self._limbodir = urlutils.local_path_from_url(
+ transform._tree._transport.abspath('pending-deletion'))
+ os.mkdir(pathjoin(deletion_path, 'blocking-directory'))
+ self.assertRaises(ImmortalPendingDeletion, transform.apply)
+ self.assertRaises(LockError, self.wt.unlock)
+ self.assertRaises(ExistingPendingDeletion, self.get_transform)
+
+ def test_build(self):
+ transform, root = self.get_transform()
+ self.wt.lock_tree_write()
+ self.addCleanup(self.wt.unlock)
+ self.assertIs(transform.get_tree_parent(root), ROOT_PARENT)
+ imaginary_id = transform.trans_id_tree_path('imaginary')
+ imaginary_id2 = transform.trans_id_tree_path('imaginary/')
+ self.assertEqual(imaginary_id, imaginary_id2)
+ self.assertEqual(root, transform.get_tree_parent(imaginary_id))
+ self.assertEqual('directory', transform.final_kind(root))
+ self.assertEqual(self.wt.get_root_id(), transform.final_file_id(root))
+ trans_id = transform.create_path('name', root)
+ self.assertIs(transform.final_file_id(trans_id), None)
+ self.assertIs(None, transform.final_kind(trans_id))
+ transform.create_file('contents', trans_id)
+ transform.set_executability(True, trans_id)
+ transform.version_file('my_pretties', trans_id)
+ self.assertRaises(DuplicateKey, transform.version_file,
+ 'my_pretties', trans_id)
+ self.assertEqual(transform.final_file_id(trans_id), 'my_pretties')
+ self.assertEqual(transform.final_parent(trans_id), root)
+ self.assertIs(transform.final_parent(root), ROOT_PARENT)
+ self.assertIs(transform.get_tree_parent(root), ROOT_PARENT)
+ oz_id = transform.create_path('oz', root)
+ transform.create_directory(oz_id)
+ transform.version_file('ozzie', oz_id)
+ trans_id2 = transform.create_path('name2', root)
+ transform.create_file('contents', trans_id2)
+ transform.set_executability(False, trans_id2)
+ transform.version_file('my_pretties2', trans_id2)
+ modified_paths = transform.apply().modified_paths
+ self.assertEqual('contents', self.wt.get_file_byname('name').read())
+ self.assertEqual(self.wt.path2id('name'), 'my_pretties')
+ self.assertIs(self.wt.is_executable('my_pretties'), True)
+ self.assertIs(self.wt.is_executable('my_pretties2'), False)
+ self.assertEqual('directory', file_kind(self.wt.abspath('oz')))
+ self.assertEqual(len(modified_paths), 3)
+ tree_mod_paths = [self.wt.id2abspath(f) for f in
+ ('ozzie', 'my_pretties', 'my_pretties2')]
+ self.assertSubset(tree_mod_paths, modified_paths)
+ # is it safe to finalize repeatedly?
+ transform.finalize()
+ transform.finalize()
+
+ def test_apply_informs_tree_of_observed_sha1(self):
+ trans, root, contents, sha1 = self.get_transform_for_sha1_test()
+ trans_id = trans.new_file('file1', root, contents, file_id='file1-id',
+ sha1=sha1)
+ calls = []
+ orig = self.wt._observed_sha1
+ def _observed_sha1(*args):
+ calls.append(args)
+ orig(*args)
+ self.wt._observed_sha1 = _observed_sha1
+ trans.apply()
+ self.assertEqual([(None, 'file1', trans._observed_sha1s[trans_id])],
+ calls)
+
+ def test_create_file_caches_sha1(self):
+ trans, root, contents, sha1 = self.get_transform_for_sha1_test()
+ trans_id = trans.create_path('file1', root)
+ trans.create_file(contents, trans_id, sha1=sha1)
+ st_val = osutils.lstat(trans._limbo_name(trans_id))
+ o_sha1, o_st_val = trans._observed_sha1s[trans_id]
+ self.assertEqual(o_sha1, sha1)
+ self.assertEqualStat(o_st_val, st_val)
+
+ def test__apply_insertions_updates_sha1(self):
+ trans, root, contents, sha1 = self.get_transform_for_sha1_test()
+ trans_id = trans.create_path('file1', root)
+ trans.create_file(contents, trans_id, sha1=sha1)
+ st_val = osutils.lstat(trans._limbo_name(trans_id))
+ o_sha1, o_st_val = trans._observed_sha1s[trans_id]
+ self.assertEqual(o_sha1, sha1)
+ self.assertEqualStat(o_st_val, st_val)
+ creation_mtime = trans._creation_mtime + 10.0
+ # We fake a time difference from when the file was created until now it
+ # is being renamed by using os.utime. Note that the change we actually
+ # want to see is the real ctime change from 'os.rename()', but as long
+ # as we observe a new stat value, we should be fine.
+ os.utime(trans._limbo_name(trans_id), (creation_mtime, creation_mtime))
+ trans.apply()
+ new_st_val = osutils.lstat(self.wt.abspath('file1'))
+ o_sha1, o_st_val = trans._observed_sha1s[trans_id]
+ self.assertEqual(o_sha1, sha1)
+ self.assertEqualStat(o_st_val, new_st_val)
+ self.assertNotEqual(st_val.st_mtime, new_st_val.st_mtime)
+
+ def test_new_file_caches_sha1(self):
+ trans, root, contents, sha1 = self.get_transform_for_sha1_test()
+ trans_id = trans.new_file('file1', root, contents, file_id='file1-id',
+ sha1=sha1)
+ st_val = osutils.lstat(trans._limbo_name(trans_id))
+ o_sha1, o_st_val = trans._observed_sha1s[trans_id]
+ self.assertEqual(o_sha1, sha1)
+ self.assertEqualStat(o_st_val, st_val)
+
+ def test_cancel_creation_removes_observed_sha1(self):
+ trans, root, contents, sha1 = self.get_transform_for_sha1_test()
+ trans_id = trans.new_file('file1', root, contents, file_id='file1-id',
+ sha1=sha1)
+ self.assertTrue(trans_id in trans._observed_sha1s)
+ trans.cancel_creation(trans_id)
+ self.assertFalse(trans_id in trans._observed_sha1s)
+
+ def test_create_files_same_timestamp(self):
+ transform, root = self.get_transform()
+ self.wt.lock_tree_write()
+ self.addCleanup(self.wt.unlock)
+ # Roll back the clock, so that we know everything is being set to the
+ # exact time
+ transform._creation_mtime = creation_mtime = time.time() - 20.0
+ transform.create_file('content-one',
+ transform.create_path('one', root))
+ time.sleep(1) # *ugly*
+ transform.create_file('content-two',
+ transform.create_path('two', root))
+ transform.apply()
+ fo, st1 = self.wt.get_file_with_stat(None, path='one', filtered=False)
+ fo.close()
+ fo, st2 = self.wt.get_file_with_stat(None, path='two', filtered=False)
+ fo.close()
+ # We only guarantee 2s resolution
+ self.assertTrue(abs(creation_mtime - st1.st_mtime) < 2.0,
+ "%s != %s within 2 seconds" % (creation_mtime, st1.st_mtime))
+ # But if we have more than that, all files should get the same result
+ self.assertEqual(st1.st_mtime, st2.st_mtime)
+
+ def test_change_root_id(self):
+ transform, root = self.get_transform()
+ self.assertNotEqual('new-root-id', self.wt.get_root_id())
+ transform.new_directory('', ROOT_PARENT, 'new-root-id')
+ transform.delete_contents(root)
+ transform.unversion_file(root)
+ transform.fixup_new_roots()
+ transform.apply()
+ self.assertEqual('new-root-id', self.wt.get_root_id())
+
+ def test_change_root_id_add_files(self):
+ transform, root = self.get_transform()
+ self.assertNotEqual('new-root-id', self.wt.get_root_id())
+ new_trans_id = transform.new_directory('', ROOT_PARENT, 'new-root-id')
+ transform.new_file('file', new_trans_id, ['new-contents\n'],
+ 'new-file-id')
+ transform.delete_contents(root)
+ transform.unversion_file(root)
+ transform.fixup_new_roots()
+ transform.apply()
+ self.assertEqual('new-root-id', self.wt.get_root_id())
+ self.assertEqual('new-file-id', self.wt.path2id('file'))
+ self.assertFileEqual('new-contents\n', self.wt.abspath('file'))
+
+ def test_add_two_roots(self):
+ transform, root = self.get_transform()
+ new_trans_id = transform.new_directory('', ROOT_PARENT, 'new-root-id')
+ new_trans_id = transform.new_directory('', ROOT_PARENT, 'alt-root-id')
+ self.assertRaises(ValueError, transform.fixup_new_roots)
+
+ def test_retain_existing_root(self):
+ tt, root = self.get_transform()
+ with tt:
+ tt.new_directory('', ROOT_PARENT, 'new-root-id')
+ tt.fixup_new_roots()
+ self.assertNotEqual('new-root-id', tt.final_file_id(tt.root))
+
+ def test_retain_existing_root_added_file(self):
+ tt, root = self.get_transform()
+ new_trans_id = tt.new_directory('', ROOT_PARENT, 'new-root-id')
+ child = tt.new_directory('child', new_trans_id, 'child-id')
+ tt.fixup_new_roots()
+ self.assertEqual(tt.root, tt.final_parent(child))
+
+ def test_add_unversioned_root(self):
+ transform, root = self.get_transform()
+ new_trans_id = transform.new_directory('', ROOT_PARENT, None)
+ transform.delete_contents(transform.root)
+ transform.fixup_new_roots()
+ self.assertNotIn(transform.root, transform._new_id)
+
+ def test_remove_root_fixup(self):
+ transform, root = self.get_transform()
+ old_root_id = self.wt.get_root_id()
+ self.assertNotEqual('new-root-id', old_root_id)
+ transform.delete_contents(root)
+ transform.unversion_file(root)
+ transform.fixup_new_roots()
+ transform.apply()
+ self.assertEqual(old_root_id, self.wt.get_root_id())
+
+ transform, root = self.get_transform()
+ new_trans_id = transform.new_directory('', ROOT_PARENT, 'new-root-id')
+ new_trans_id = transform.new_directory('', ROOT_PARENT, 'alt-root-id')
+ self.assertRaises(ValueError, transform.fixup_new_roots)
+
+ def test_fixup_new_roots_permits_empty_tree(self):
+ transform, root = self.get_transform()
+ transform.delete_contents(root)
+ transform.unversion_file(root)
+ transform.fixup_new_roots()
+ self.assertIs(None, transform.final_kind(root))
+ self.assertIs(None, transform.final_file_id(root))
+
+ def test_apply_retains_root_directory(self):
+ # Do not attempt to delete the physical root directory, because that
+ # is impossible.
+ transform, root = self.get_transform()
+ with transform:
+ transform.delete_contents(root)
+ e = self.assertRaises(AssertionError, self.assertRaises,
+ errors.TransformRenameFailed,
+ transform.apply)
+ self.assertContainsRe('TransformRenameFailed not raised', str(e))
+
+ def test_apply_retains_file_id(self):
+ transform, root = self.get_transform()
+ old_root_id = transform.tree_file_id(root)
+ transform.unversion_file(root)
+ transform.apply()
+ self.assertEqual(old_root_id, self.wt.get_root_id())
+
+ def test_hardlink(self):
+ self.requireFeature(HardlinkFeature)
+ transform, root = self.get_transform()
+ transform.new_file('file1', root, 'contents')
+ transform.apply()
+ target = self.make_branch_and_tree('target')
+ target_transform = TreeTransform(target)
+ trans_id = target_transform.create_path('file1', target_transform.root)
+ target_transform.create_hardlink(self.wt.abspath('file1'), trans_id)
+ target_transform.apply()
+ self.assertPathExists('target/file1')
+ source_stat = os.stat(self.wt.abspath('file1'))
+ target_stat = os.stat('target/file1')
+ self.assertEqual(source_stat, target_stat)
+
+ def test_convenience(self):
+ transform, root = self.get_transform()
+ self.wt.lock_tree_write()
+ self.addCleanup(self.wt.unlock)
+ trans_id = transform.new_file('name', root, 'contents',
+ 'my_pretties', True)
+ oz = transform.new_directory('oz', root, 'oz-id')
+ dorothy = transform.new_directory('dorothy', oz, 'dorothy-id')
+ toto = transform.new_file('toto', dorothy, 'toto-contents',
+ 'toto-id', False)
+
+ self.assertEqual(len(transform.find_conflicts()), 0)
+ transform.apply()
+ self.assertRaises(ReusingTransform, transform.find_conflicts)
+ self.assertEqual('contents', file(self.wt.abspath('name')).read())
+ self.assertEqual(self.wt.path2id('name'), 'my_pretties')
+ self.assertIs(self.wt.is_executable('my_pretties'), True)
+ self.assertEqual(self.wt.path2id('oz'), 'oz-id')
+ self.assertEqual(self.wt.path2id('oz/dorothy'), 'dorothy-id')
+ self.assertEqual(self.wt.path2id('oz/dorothy/toto'), 'toto-id')
+
+ self.assertEqual('toto-contents',
+ self.wt.get_file_byname('oz/dorothy/toto').read())
+ self.assertIs(self.wt.is_executable('toto-id'), False)
+
+ def test_tree_reference(self):
+ transform, root = self.get_transform()
+ tree = transform._tree
+ trans_id = transform.new_directory('reference', root, 'subtree-id')
+ transform.set_tree_reference('subtree-revision', trans_id)
+ transform.apply()
+ tree.lock_read()
+ self.addCleanup(tree.unlock)
+ self.assertEqual('subtree-revision',
+ tree.root_inventory['subtree-id'].reference_revision)
+
+ def test_conflicts(self):
+ transform, root = self.get_transform()
+ trans_id = transform.new_file('name', root, 'contents',
+ 'my_pretties')
+ self.assertEqual(len(transform.find_conflicts()), 0)
+ trans_id2 = transform.new_file('name', root, 'Crontents', 'toto')
+ self.assertEqual(transform.find_conflicts(),
+ [('duplicate', trans_id, trans_id2, 'name')])
+ self.assertRaises(MalformedTransform, transform.apply)
+ transform.adjust_path('name', trans_id, trans_id2)
+ self.assertEqual(transform.find_conflicts(),
+ [('non-directory parent', trans_id)])
+ tinman_id = transform.trans_id_tree_path('tinman')
+ transform.adjust_path('name', tinman_id, trans_id2)
+ self.assertEqual(transform.find_conflicts(),
+ [('unversioned parent', tinman_id),
+ ('missing parent', tinman_id)])
+ lion_id = transform.create_path('lion', root)
+ self.assertEqual(transform.find_conflicts(),
+ [('unversioned parent', tinman_id),
+ ('missing parent', tinman_id)])
+ transform.adjust_path('name', lion_id, trans_id2)
+ self.assertEqual(transform.find_conflicts(),
+ [('unversioned parent', lion_id),
+ ('missing parent', lion_id)])
+ transform.version_file("Courage", lion_id)
+ self.assertEqual(transform.find_conflicts(),
+ [('missing parent', lion_id),
+ ('versioning no contents', lion_id)])
+ transform.adjust_path('name2', root, trans_id2)
+ self.assertEqual(transform.find_conflicts(),
+ [('versioning no contents', lion_id)])
+ transform.create_file('Contents, okay?', lion_id)
+ transform.adjust_path('name2', trans_id2, trans_id2)
+ self.assertEqual(transform.find_conflicts(),
+ [('parent loop', trans_id2),
+ ('non-directory parent', trans_id2)])
+ transform.adjust_path('name2', root, trans_id2)
+ oz_id = transform.new_directory('oz', root)
+ transform.set_executability(True, oz_id)
+ self.assertEqual(transform.find_conflicts(),
+ [('unversioned executability', oz_id)])
+ transform.version_file('oz-id', oz_id)
+ self.assertEqual(transform.find_conflicts(),
+ [('non-file executability', oz_id)])
+ transform.set_executability(None, oz_id)
+ tip_id = transform.new_file('tip', oz_id, 'ozma', 'tip-id')
+ transform.apply()
+ self.assertEqual(self.wt.path2id('name'), 'my_pretties')
+ self.assertEqual('contents', file(self.wt.abspath('name')).read())
+ transform2, root = self.get_transform()
+ oz_id = transform2.trans_id_tree_file_id('oz-id')
+ newtip = transform2.new_file('tip', oz_id, 'other', 'tip-id')
+ result = transform2.find_conflicts()
+ fp = FinalPaths(transform2)
+ self.assert_('oz/tip' in transform2._tree_path_ids)
+ self.assertEqual(fp.get_path(newtip), pathjoin('oz', 'tip'))
+ self.assertEqual(len(result), 2)
+ self.assertEqual((result[0][0], result[0][1]),
+ ('duplicate', newtip))
+ self.assertEqual((result[1][0], result[1][2]),
+ ('duplicate id', newtip))
+ transform2.finalize()
+ transform3 = TreeTransform(self.wt)
+ self.addCleanup(transform3.finalize)
+ oz_id = transform3.trans_id_tree_file_id('oz-id')
+ transform3.delete_contents(oz_id)
+ self.assertEqual(transform3.find_conflicts(),
+ [('missing parent', oz_id)])
+ root_id = transform3.root
+ tip_id = transform3.trans_id_tree_file_id('tip-id')
+ transform3.adjust_path('tip', root_id, tip_id)
+ transform3.apply()
+
+ def test_conflict_on_case_insensitive(self):
+ tree = self.make_branch_and_tree('tree')
+ # Don't try this at home, kids!
+ # Force the tree to report that it is case sensitive, for conflict
+ # resolution tests
+ tree.case_sensitive = True
+ transform = TreeTransform(tree)
+ self.addCleanup(transform.finalize)
+ transform.new_file('file', transform.root, 'content')
+ transform.new_file('FiLe', transform.root, 'content')
+ result = transform.find_conflicts()
+ self.assertEqual([], result)
+ transform.finalize()
+ # Force the tree to report that it is case insensitive, for conflict
+ # generation tests
+ tree.case_sensitive = False
+ transform = TreeTransform(tree)
+ self.addCleanup(transform.finalize)
+ transform.new_file('file', transform.root, 'content')
+ transform.new_file('FiLe', transform.root, 'content')
+ result = transform.find_conflicts()
+ self.assertEqual([('duplicate', 'new-1', 'new-2', 'file')], result)
+
+ def test_conflict_on_case_insensitive_existing(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/FiLe'])
+ # Don't try this at home, kids!
+ # Force the tree to report that it is case sensitive, for conflict
+ # resolution tests
+ tree.case_sensitive = True
+ transform = TreeTransform(tree)
+ self.addCleanup(transform.finalize)
+ transform.new_file('file', transform.root, 'content')
+ result = transform.find_conflicts()
+ self.assertEqual([], result)
+ transform.finalize()
+ # Force the tree to report that it is case insensitive, for conflict
+ # generation tests
+ tree.case_sensitive = False
+ transform = TreeTransform(tree)
+ self.addCleanup(transform.finalize)
+ transform.new_file('file', transform.root, 'content')
+ result = transform.find_conflicts()
+ self.assertEqual([('duplicate', 'new-1', 'new-2', 'file')], result)
+
+ def test_resolve_case_insensitive_conflict(self):
+ tree = self.make_branch_and_tree('tree')
+ # Don't try this at home, kids!
+ # Force the tree to report that it is case insensitive, for conflict
+ # resolution tests
+ tree.case_sensitive = False
+ transform = TreeTransform(tree)
+ self.addCleanup(transform.finalize)
+ transform.new_file('file', transform.root, 'content')
+ transform.new_file('FiLe', transform.root, 'content')
+ resolve_conflicts(transform)
+ transform.apply()
+ self.assertPathExists('tree/file')
+ self.assertPathExists('tree/FiLe.moved')
+
+ def test_resolve_checkout_case_conflict(self):
+ tree = self.make_branch_and_tree('tree')
+ # Don't try this at home, kids!
+ # Force the tree to report that it is case insensitive, for conflict
+ # resolution tests
+ tree.case_sensitive = False
+ transform = TreeTransform(tree)
+ self.addCleanup(transform.finalize)
+ transform.new_file('file', transform.root, 'content')
+ transform.new_file('FiLe', transform.root, 'content')
+ resolve_conflicts(transform,
+ pass_func=lambda t, c: resolve_checkout(t, c, []))
+ transform.apply()
+ self.assertPathExists('tree/file')
+ self.assertPathExists('tree/FiLe.moved')
+
+ def test_apply_case_conflict(self):
+ """Ensure that a transform with case conflicts can always be applied"""
+ tree = self.make_branch_and_tree('tree')
+ transform = TreeTransform(tree)
+ self.addCleanup(transform.finalize)
+ transform.new_file('file', transform.root, 'content')
+ transform.new_file('FiLe', transform.root, 'content')
+ dir = transform.new_directory('dir', transform.root)
+ transform.new_file('dirfile', dir, 'content')
+ transform.new_file('dirFiLe', dir, 'content')
+ resolve_conflicts(transform)
+ transform.apply()
+ self.assertPathExists('tree/file')
+ if not os.path.exists('tree/FiLe.moved'):
+ self.assertPathExists('tree/FiLe')
+ self.assertPathExists('tree/dir/dirfile')
+ if not os.path.exists('tree/dir/dirFiLe.moved'):
+ self.assertPathExists('tree/dir/dirFiLe')
+
+ def test_case_insensitive_limbo(self):
+ tree = self.make_branch_and_tree('tree')
+ # Don't try this at home, kids!
+ # Force the tree to report that it is case insensitive
+ tree.case_sensitive = False
+ transform = TreeTransform(tree)
+ self.addCleanup(transform.finalize)
+ dir = transform.new_directory('dir', transform.root)
+ first = transform.new_file('file', dir, 'content')
+ second = transform.new_file('FiLe', dir, 'content')
+ self.assertContainsRe(transform._limbo_name(first), 'new-1/file')
+ self.assertNotContainsRe(transform._limbo_name(second), 'new-1/FiLe')
+
+ def test_adjust_path_updates_child_limbo_names(self):
+ tree = self.make_branch_and_tree('tree')
+ transform = TreeTransform(tree)
+ self.addCleanup(transform.finalize)
+ foo_id = transform.new_directory('foo', transform.root)
+ bar_id = transform.new_directory('bar', foo_id)
+ baz_id = transform.new_directory('baz', bar_id)
+ qux_id = transform.new_directory('qux', baz_id)
+ transform.adjust_path('quxx', foo_id, bar_id)
+ self.assertStartsWith(transform._limbo_name(qux_id),
+ transform._limbo_name(bar_id))
+
+ def test_add_del(self):
+ start, root = self.get_transform()
+ start.new_directory('a', root, 'a')
+ start.apply()
+ transform, root = self.get_transform()
+ transform.delete_versioned(transform.trans_id_tree_file_id('a'))
+ transform.new_directory('a', root, 'a')
+ transform.apply()
+
+ def test_unversioning(self):
+ create_tree, root = self.get_transform()
+ parent_id = create_tree.new_directory('parent', root, 'parent-id')
+ create_tree.new_file('child', parent_id, 'child', 'child-id')
+ create_tree.apply()
+ unversion = TreeTransform(self.wt)
+ self.addCleanup(unversion.finalize)
+ parent = unversion.trans_id_tree_path('parent')
+ unversion.unversion_file(parent)
+ self.assertEqual(unversion.find_conflicts(),
+ [('unversioned parent', parent_id)])
+ file_id = unversion.trans_id_tree_file_id('child-id')
+ unversion.unversion_file(file_id)
+ unversion.apply()
+
+ def test_name_invariants(self):
+ create_tree, root = self.get_transform()
+ # prepare tree
+ root = create_tree.root
+ create_tree.new_file('name1', root, 'hello1', 'name1')
+ create_tree.new_file('name2', root, 'hello2', 'name2')
+ ddir = create_tree.new_directory('dying_directory', root, 'ddir')
+ create_tree.new_file('dying_file', ddir, 'goodbye1', 'dfile')
+ create_tree.new_file('moving_file', ddir, 'later1', 'mfile')
+ create_tree.new_file('moving_file2', root, 'later2', 'mfile2')
+ create_tree.apply()
+
+ mangle_tree,root = self.get_transform()
+ root = mangle_tree.root
+ #swap names
+ name1 = mangle_tree.trans_id_tree_file_id('name1')
+ name2 = mangle_tree.trans_id_tree_file_id('name2')
+ mangle_tree.adjust_path('name2', root, name1)
+ mangle_tree.adjust_path('name1', root, name2)
+
+ #tests for deleting parent directories
+ ddir = mangle_tree.trans_id_tree_file_id('ddir')
+ mangle_tree.delete_contents(ddir)
+ dfile = mangle_tree.trans_id_tree_file_id('dfile')
+ mangle_tree.delete_versioned(dfile)
+ mangle_tree.unversion_file(dfile)
+ mfile = mangle_tree.trans_id_tree_file_id('mfile')
+ mangle_tree.adjust_path('mfile', root, mfile)
+
+ #tests for adding parent directories
+ newdir = mangle_tree.new_directory('new_directory', root, 'newdir')
+ mfile2 = mangle_tree.trans_id_tree_file_id('mfile2')
+ mangle_tree.adjust_path('mfile2', newdir, mfile2)
+ mangle_tree.new_file('newfile', newdir, 'hello3', 'dfile')
+ self.assertEqual(mangle_tree.final_file_id(mfile2), 'mfile2')
+ self.assertEqual(mangle_tree.final_parent(mfile2), newdir)
+ self.assertEqual(mangle_tree.final_file_id(mfile2), 'mfile2')
+ mangle_tree.apply()
+ self.assertEqual(file(self.wt.abspath('name1')).read(), 'hello2')
+ self.assertEqual(file(self.wt.abspath('name2')).read(), 'hello1')
+ mfile2_path = self.wt.abspath(pathjoin('new_directory','mfile2'))
+ self.assertEqual(mangle_tree.final_parent(mfile2), newdir)
+ self.assertEqual(file(mfile2_path).read(), 'later2')
+ self.assertEqual(self.wt.id2path('mfile2'), 'new_directory/mfile2')
+ self.assertEqual(self.wt.path2id('new_directory/mfile2'), 'mfile2')
+ newfile_path = self.wt.abspath(pathjoin('new_directory','newfile'))
+ self.assertEqual(file(newfile_path).read(), 'hello3')
+ self.assertEqual(self.wt.path2id('dying_directory'), 'ddir')
+ self.assertIs(self.wt.path2id('dying_directory/dying_file'), None)
+ mfile2_path = self.wt.abspath(pathjoin('new_directory','mfile2'))
+
+ def test_both_rename(self):
+ create_tree,root = self.get_transform()
+ newdir = create_tree.new_directory('selftest', root, 'selftest-id')
+ create_tree.new_file('blackbox.py', newdir, 'hello1', 'blackbox-id')
+ create_tree.apply()
+ mangle_tree,root = self.get_transform()
+ selftest = mangle_tree.trans_id_tree_file_id('selftest-id')
+ blackbox = mangle_tree.trans_id_tree_file_id('blackbox-id')
+ mangle_tree.adjust_path('test', root, selftest)
+ mangle_tree.adjust_path('test_too_much', root, selftest)
+ mangle_tree.set_executability(True, blackbox)
+ mangle_tree.apply()
+
+ def test_both_rename2(self):
+ create_tree,root = self.get_transform()
+ bzrlib = create_tree.new_directory('bzrlib', root, 'bzrlib-id')
+ tests = create_tree.new_directory('tests', bzrlib, 'tests-id')
+ blackbox = create_tree.new_directory('blackbox', tests, 'blackbox-id')
+ create_tree.new_file('test_too_much.py', blackbox, 'hello1',
+ 'test_too_much-id')
+ create_tree.apply()
+ mangle_tree,root = self.get_transform()
+ bzrlib = mangle_tree.trans_id_tree_file_id('bzrlib-id')
+ tests = mangle_tree.trans_id_tree_file_id('tests-id')
+ test_too_much = mangle_tree.trans_id_tree_file_id('test_too_much-id')
+ mangle_tree.adjust_path('selftest', bzrlib, tests)
+ mangle_tree.adjust_path('blackbox.py', tests, test_too_much)
+ mangle_tree.set_executability(True, test_too_much)
+ mangle_tree.apply()
+
+ def test_both_rename3(self):
+ create_tree,root = self.get_transform()
+ tests = create_tree.new_directory('tests', root, 'tests-id')
+ create_tree.new_file('test_too_much.py', tests, 'hello1',
+ 'test_too_much-id')
+ create_tree.apply()
+ mangle_tree,root = self.get_transform()
+ tests = mangle_tree.trans_id_tree_file_id('tests-id')
+ test_too_much = mangle_tree.trans_id_tree_file_id('test_too_much-id')
+ mangle_tree.adjust_path('selftest', root, tests)
+ mangle_tree.adjust_path('blackbox.py', tests, test_too_much)
+ mangle_tree.set_executability(True, test_too_much)
+ mangle_tree.apply()
+
+ def test_move_dangling_ie(self):
+ create_tree, root = self.get_transform()
+ # prepare tree
+ root = create_tree.root
+ create_tree.new_file('name1', root, 'hello1', 'name1')
+ create_tree.apply()
+ delete_contents, root = self.get_transform()
+ file = delete_contents.trans_id_tree_file_id('name1')
+ delete_contents.delete_contents(file)
+ delete_contents.apply()
+ move_id, root = self.get_transform()
+ name1 = move_id.trans_id_tree_file_id('name1')
+ newdir = move_id.new_directory('dir', root, 'newdir')
+ move_id.adjust_path('name2', newdir, name1)
+ move_id.apply()
+
+ def test_replace_dangling_ie(self):
+ create_tree, root = self.get_transform()
+ # prepare tree
+ root = create_tree.root
+ create_tree.new_file('name1', root, 'hello1', 'name1')
+ create_tree.apply()
+ delete_contents = TreeTransform(self.wt)
+ self.addCleanup(delete_contents.finalize)
+ file = delete_contents.trans_id_tree_file_id('name1')
+ delete_contents.delete_contents(file)
+ delete_contents.apply()
+ delete_contents.finalize()
+ replace = TreeTransform(self.wt)
+ self.addCleanup(replace.finalize)
+ name2 = replace.new_file('name2', root, 'hello2', 'name1')
+ conflicts = replace.find_conflicts()
+ name1 = replace.trans_id_tree_file_id('name1')
+ self.assertEqual(conflicts, [('duplicate id', name1, name2)])
+ resolve_conflicts(replace)
+ replace.apply()
+
+ def _test_symlinks(self, link_name1,link_target1,
+ link_name2, link_target2):
+
+ def ozpath(p): return 'oz/' + p
+
+ self.requireFeature(SymlinkFeature)
+ transform, root = self.get_transform()
+ oz_id = transform.new_directory('oz', root, 'oz-id')
+ wizard = transform.new_symlink(link_name1, oz_id, link_target1,
+ 'wizard-id')
+ wiz_id = transform.create_path(link_name2, oz_id)
+ transform.create_symlink(link_target2, wiz_id)
+ transform.version_file('wiz-id2', wiz_id)
+ transform.set_executability(True, wiz_id)
+ self.assertEqual(transform.find_conflicts(),
+ [('non-file executability', wiz_id)])
+ transform.set_executability(None, wiz_id)
+ transform.apply()
+ self.assertEqual(self.wt.path2id(ozpath(link_name1)), 'wizard-id')
+ self.assertEqual('symlink',
+ file_kind(self.wt.abspath(ozpath(link_name1))))
+ self.assertEqual(link_target2,
+ osutils.readlink(self.wt.abspath(ozpath(link_name2))))
+ self.assertEqual(link_target1,
+ osutils.readlink(self.wt.abspath(ozpath(link_name1))))
+
+ def test_symlinks(self):
+ self._test_symlinks('wizard', 'wizard-target',
+ 'wizard2', 'behind_curtain')
+
+ def test_symlinks_unicode(self):
+ self.requireFeature(features.UnicodeFilenameFeature)
+ self._test_symlinks(u'\N{Euro Sign}wizard',
+ u'wizard-targ\N{Euro Sign}t',
+ u'\N{Euro Sign}wizard2',
+ u'b\N{Euro Sign}hind_curtain')
+
+ def test_unable_create_symlink(self):
+ def tt_helper():
+ wt = self.make_branch_and_tree('.')
+ tt = TreeTransform(wt) # TreeTransform obtains write lock
+ try:
+ tt.new_symlink('foo', tt.root, 'bar')
+ tt.apply()
+ finally:
+ wt.unlock()
+ os_symlink = getattr(os, 'symlink', None)
+ os.symlink = None
+ try:
+ err = self.assertRaises(errors.UnableCreateSymlink, tt_helper)
+ self.assertEquals(
+ "Unable to create symlink 'foo' on this platform",
+ str(err))
+ finally:
+ if os_symlink:
+ os.symlink = os_symlink
+
+ def get_conflicted(self):
+ create,root = self.get_transform()
+ create.new_file('dorothy', root, 'dorothy', 'dorothy-id')
+ oz = create.new_directory('oz', root, 'oz-id')
+ create.new_directory('emeraldcity', oz, 'emerald-id')
+ create.apply()
+ conflicts,root = self.get_transform()
+ # set up duplicate entry, duplicate id
+ new_dorothy = conflicts.new_file('dorothy', root, 'dorothy',
+ 'dorothy-id')
+ old_dorothy = conflicts.trans_id_tree_file_id('dorothy-id')
+ oz = conflicts.trans_id_tree_file_id('oz-id')
+ # set up DeletedParent parent conflict
+ conflicts.delete_versioned(oz)
+ emerald = conflicts.trans_id_tree_file_id('emerald-id')
+ # set up MissingParent conflict
+ munchkincity = conflicts.trans_id_file_id('munchkincity-id')
+ conflicts.adjust_path('munchkincity', root, munchkincity)
+ conflicts.new_directory('auntem', munchkincity, 'auntem-id')
+ # set up parent loop
+ conflicts.adjust_path('emeraldcity', emerald, emerald)
+ return conflicts, emerald, oz, old_dorothy, new_dorothy
+
+ def test_conflict_resolution(self):
+ conflicts, emerald, oz, old_dorothy, new_dorothy =\
+ self.get_conflicted()
+ resolve_conflicts(conflicts)
+ self.assertEqual(conflicts.final_name(old_dorothy), 'dorothy.moved')
+ self.assertIs(conflicts.final_file_id(old_dorothy), None)
+ self.assertEqual(conflicts.final_name(new_dorothy), 'dorothy')
+ self.assertEqual(conflicts.final_file_id(new_dorothy), 'dorothy-id')
+ self.assertEqual(conflicts.final_parent(emerald), oz)
+ conflicts.apply()
+
+ def test_cook_conflicts(self):
+ tt, emerald, oz, old_dorothy, new_dorothy = self.get_conflicted()
+ raw_conflicts = resolve_conflicts(tt)
+ cooked_conflicts = cook_conflicts(raw_conflicts, tt)
+ duplicate = DuplicateEntry('Moved existing file to', 'dorothy.moved',
+ 'dorothy', None, 'dorothy-id')
+ self.assertEqual(cooked_conflicts[0], duplicate)
+ duplicate_id = DuplicateID('Unversioned existing file',
+ 'dorothy.moved', 'dorothy', None,
+ 'dorothy-id')
+ self.assertEqual(cooked_conflicts[1], duplicate_id)
+ missing_parent = MissingParent('Created directory', 'munchkincity',
+ 'munchkincity-id')
+ deleted_parent = DeletingParent('Not deleting', 'oz', 'oz-id')
+ self.assertEqual(cooked_conflicts[2], missing_parent)
+ unversioned_parent = UnversionedParent('Versioned directory',
+ 'munchkincity',
+ 'munchkincity-id')
+ unversioned_parent2 = UnversionedParent('Versioned directory', 'oz',
+ 'oz-id')
+ self.assertEqual(cooked_conflicts[3], unversioned_parent)
+ parent_loop = ParentLoop('Cancelled move', 'oz/emeraldcity',
+ 'oz/emeraldcity', 'emerald-id', 'emerald-id')
+ self.assertEqual(cooked_conflicts[4], deleted_parent)
+ self.assertEqual(cooked_conflicts[5], unversioned_parent2)
+ self.assertEqual(cooked_conflicts[6], parent_loop)
+ self.assertEqual(len(cooked_conflicts), 7)
+ tt.finalize()
+
+ def test_string_conflicts(self):
+ tt, emerald, oz, old_dorothy, new_dorothy = self.get_conflicted()
+ raw_conflicts = resolve_conflicts(tt)
+ cooked_conflicts = cook_conflicts(raw_conflicts, tt)
+ tt.finalize()
+ conflicts_s = [unicode(c) for c in cooked_conflicts]
+ self.assertEqual(len(cooked_conflicts), len(conflicts_s))
+ self.assertEqual(conflicts_s[0], 'Conflict adding file dorothy. '
+ 'Moved existing file to '
+ 'dorothy.moved.')
+ self.assertEqual(conflicts_s[1], 'Conflict adding id to dorothy. '
+ 'Unversioned existing file '
+ 'dorothy.moved.')
+ self.assertEqual(conflicts_s[2], 'Conflict adding files to'
+ ' munchkincity. Created directory.')
+ self.assertEqual(conflicts_s[3], 'Conflict because munchkincity is not'
+ ' versioned, but has versioned'
+ ' children. Versioned directory.')
+ self.assertEqualDiff(conflicts_s[4], "Conflict: can't delete oz because it"
+ " is not empty. Not deleting.")
+ self.assertEqual(conflicts_s[5], 'Conflict because oz is not'
+ ' versioned, but has versioned'
+ ' children. Versioned directory.')
+ self.assertEqual(conflicts_s[6], 'Conflict moving oz/emeraldcity into'
+ ' oz/emeraldcity. Cancelled move.')
+
+ def prepare_wrong_parent_kind(self):
+ tt, root = self.get_transform()
+ tt.new_file('parent', root, 'contents', 'parent-id')
+ tt.apply()
+ tt, root = self.get_transform()
+ parent_id = tt.trans_id_file_id('parent-id')
+ tt.new_file('child,', parent_id, 'contents2', 'file-id')
+ return tt
+
+ def test_find_conflicts_wrong_parent_kind(self):
+ tt = self.prepare_wrong_parent_kind()
+ tt.find_conflicts()
+
+ def test_resolve_conflicts_wrong_existing_parent_kind(self):
+ tt = self.prepare_wrong_parent_kind()
+ raw_conflicts = resolve_conflicts(tt)
+ self.assertEqual(set([('non-directory parent', 'Created directory',
+ 'new-3')]), raw_conflicts)
+ cooked_conflicts = cook_conflicts(raw_conflicts, tt)
+ self.assertEqual([NonDirectoryParent('Created directory', 'parent.new',
+ 'parent-id')], cooked_conflicts)
+ tt.apply()
+ self.assertEqual(None, self.wt.path2id('parent'))
+ self.assertEqual('parent-id', self.wt.path2id('parent.new'))
+
+ def test_resolve_conflicts_wrong_new_parent_kind(self):
+ tt, root = self.get_transform()
+ parent_id = tt.new_directory('parent', root, 'parent-id')
+ tt.new_file('child,', parent_id, 'contents2', 'file-id')
+ tt.apply()
+ tt, root = self.get_transform()
+ parent_id = tt.trans_id_file_id('parent-id')
+ tt.delete_contents(parent_id)
+ tt.create_file('contents', parent_id)
+ raw_conflicts = resolve_conflicts(tt)
+ self.assertEqual(set([('non-directory parent', 'Created directory',
+ 'new-3')]), raw_conflicts)
+ tt.apply()
+ self.assertEqual(None, self.wt.path2id('parent'))
+ self.assertEqual('parent-id', self.wt.path2id('parent.new'))
+
+ def test_resolve_conflicts_wrong_parent_kind_unversioned(self):
+ tt, root = self.get_transform()
+ parent_id = tt.new_directory('parent', root)
+ tt.new_file('child,', parent_id, 'contents2')
+ tt.apply()
+ tt, root = self.get_transform()
+ parent_id = tt.trans_id_tree_path('parent')
+ tt.delete_contents(parent_id)
+ tt.create_file('contents', parent_id)
+ resolve_conflicts(tt)
+ tt.apply()
+ self.assertIs(None, self.wt.path2id('parent'))
+ self.assertIs(None, self.wt.path2id('parent.new'))
+
+ def test_resolve_conflicts_missing_parent(self):
+ wt = self.make_branch_and_tree('.')
+ tt = TreeTransform(wt)
+ self.addCleanup(tt.finalize)
+ parent = tt.trans_id_file_id('parent-id')
+ tt.new_file('file', parent, 'Contents')
+ raw_conflicts = resolve_conflicts(tt)
+ # Since the directory doesn't exist it's seen as 'missing'. So
+ # 'resolve_conflicts' create a conflict asking for it to be created.
+ self.assertLength(1, raw_conflicts)
+ self.assertEqual(('missing parent', 'Created directory', 'new-1'),
+ raw_conflicts.pop())
+ # apply fail since the missing directory doesn't exist
+ self.assertRaises(errors.NoFinalPath, tt.apply)
+
+ def test_moving_versioned_directories(self):
+ create, root = self.get_transform()
+ kansas = create.new_directory('kansas', root, 'kansas-id')
+ create.new_directory('house', kansas, 'house-id')
+ create.new_directory('oz', root, 'oz-id')
+ create.apply()
+ cyclone, root = self.get_transform()
+ oz = cyclone.trans_id_tree_file_id('oz-id')
+ house = cyclone.trans_id_tree_file_id('house-id')
+ cyclone.adjust_path('house', oz, house)
+ cyclone.apply()
+
+ def test_moving_root(self):
+ create, root = self.get_transform()
+ fun = create.new_directory('fun', root, 'fun-id')
+ create.new_directory('sun', root, 'sun-id')
+ create.new_directory('moon', root, 'moon')
+ create.apply()
+ transform, root = self.get_transform()
+ transform.adjust_root_path('oldroot', fun)
+ new_root = transform.trans_id_tree_path('')
+ transform.version_file('new-root', new_root)
+ transform.apply()
+
+ def test_renames(self):
+ create, root = self.get_transform()
+ old = create.new_directory('old-parent', root, 'old-id')
+ intermediate = create.new_directory('intermediate', old, 'im-id')
+ myfile = create.new_file('myfile', intermediate, 'myfile-text',
+ 'myfile-id')
+ create.apply()
+ rename, root = self.get_transform()
+ old = rename.trans_id_file_id('old-id')
+ rename.adjust_path('new', root, old)
+ myfile = rename.trans_id_file_id('myfile-id')
+ rename.set_executability(True, myfile)
+ rename.apply()
+
+ def test_rename_fails(self):
+ self.requireFeature(features.not_running_as_root)
+ # see https://bugs.launchpad.net/bzr/+bug/491763
+ create, root_id = self.get_transform()
+ first_dir = create.new_directory('first-dir', root_id, 'first-id')
+ myfile = create.new_file('myfile', root_id, 'myfile-text',
+ 'myfile-id')
+ create.apply()
+ if os.name == "posix" and sys.platform != "cygwin":
+ # posix filesystems fail on renaming if the readonly bit is set
+ osutils.make_readonly(self.wt.abspath('first-dir'))
+ elif os.name == "nt":
+ # windows filesystems fail on renaming open files
+ self.addCleanup(file(self.wt.abspath('myfile')).close)
+ else:
+ self.skip("Don't know how to force a permissions error on rename")
+ # now transform to rename
+ rename_transform, root_id = self.get_transform()
+ file_trans_id = rename_transform.trans_id_file_id('myfile-id')
+ dir_id = rename_transform.trans_id_file_id('first-id')
+ rename_transform.adjust_path('newname', dir_id, file_trans_id)
+ e = self.assertRaises(errors.TransformRenameFailed,
+ rename_transform.apply)
+ # On nix looks like:
+ # "Failed to rename .../work/.bzr/checkout/limbo/new-1
+ # to .../first-dir/newname: [Errno 13] Permission denied"
+ # On windows looks like:
+ # "Failed to rename .../work/myfile to
+ # .../work/.bzr/checkout/limbo/new-1: [Errno 13] Permission denied"
+ # This test isn't concerned with exactly what the error looks like,
+ # and the strerror will vary across OS and locales, but the assert
+ # that the exeception attributes are what we expect
+ self.assertEqual(e.errno, errno.EACCES)
+ if os.name == "posix":
+ self.assertEndsWith(e.to_path, "/first-dir/newname")
+ else:
+ self.assertEqual(os.path.basename(e.from_path), "myfile")
+
+ def test_set_executability_order(self):
+ """Ensure that executability behaves the same, no matter what order.
+
+ - create file and set executability simultaneously
+ - create file and set executability afterward
+ - unsetting the executability of a file whose executability has not been
+ declared should throw an exception (this may happen when a
+ merge attempts to create a file with a duplicate ID)
+ """
+ transform, root = self.get_transform()
+ wt = transform._tree
+ wt.lock_read()
+ self.addCleanup(wt.unlock)
+ transform.new_file('set_on_creation', root, 'Set on creation', 'soc',
+ True)
+ sac = transform.new_file('set_after_creation', root,
+ 'Set after creation', 'sac')
+ transform.set_executability(True, sac)
+ uws = transform.new_file('unset_without_set', root, 'Unset badly',
+ 'uws')
+ self.assertRaises(KeyError, transform.set_executability, None, uws)
+ transform.apply()
+ self.assertTrue(wt.is_executable('soc'))
+ self.assertTrue(wt.is_executable('sac'))
+
+ def test_preserve_mode(self):
+ """File mode is preserved when replacing content"""
+ if sys.platform == 'win32':
+ raise TestSkipped('chmod has no effect on win32')
+ transform, root = self.get_transform()
+ transform.new_file('file1', root, 'contents', 'file1-id', True)
+ transform.apply()
+ self.wt.lock_write()
+ self.addCleanup(self.wt.unlock)
+ self.assertTrue(self.wt.is_executable('file1-id'))
+ transform, root = self.get_transform()
+ file1_id = transform.trans_id_tree_file_id('file1-id')
+ transform.delete_contents(file1_id)
+ transform.create_file('contents2', file1_id)
+ transform.apply()
+ self.assertTrue(self.wt.is_executable('file1-id'))
+
+ def test__set_mode_stats_correctly(self):
+ """_set_mode stats to determine file mode."""
+ if sys.platform == 'win32':
+ raise TestSkipped('chmod has no effect on win32')
+
+ stat_paths = []
+ real_stat = os.stat
+ def instrumented_stat(path):
+ stat_paths.append(path)
+ return real_stat(path)
+
+ transform, root = self.get_transform()
+
+ bar1_id = transform.new_file('bar', root, 'bar contents 1\n',
+ file_id='bar-id-1', executable=False)
+ transform.apply()
+
+ transform, root = self.get_transform()
+ bar1_id = transform.trans_id_tree_path('bar')
+ bar2_id = transform.trans_id_tree_path('bar2')
+ try:
+ os.stat = instrumented_stat
+ transform.create_file('bar2 contents\n', bar2_id, mode_id=bar1_id)
+ finally:
+ os.stat = real_stat
+ transform.finalize()
+
+ bar1_abspath = self.wt.abspath('bar')
+ self.assertEqual([bar1_abspath], stat_paths)
+
+ def test_iter_changes(self):
+ self.wt.set_root_id('eert_toor')
+ transform, root = self.get_transform()
+ transform.new_file('old', root, 'blah', 'id-1', True)
+ transform.apply()
+ transform, root = self.get_transform()
+ try:
+ self.assertEqual([], list(transform.iter_changes()))
+ old = transform.trans_id_tree_file_id('id-1')
+ transform.unversion_file(old)
+ self.assertEqual([('id-1', ('old', None), False, (True, False),
+ ('eert_toor', 'eert_toor'), ('old', 'old'), ('file', 'file'),
+ (True, True))], list(transform.iter_changes()))
+ transform.new_directory('new', root, 'id-1')
+ self.assertEqual([('id-1', ('old', 'new'), True, (True, True),
+ ('eert_toor', 'eert_toor'), ('old', 'new'),
+ ('file', 'directory'),
+ (True, False))], list(transform.iter_changes()))
+ finally:
+ transform.finalize()
+
+ def test_iter_changes_new(self):
+ self.wt.set_root_id('eert_toor')
+ transform, root = self.get_transform()
+ transform.new_file('old', root, 'blah')
+ transform.apply()
+ transform, root = self.get_transform()
+ try:
+ old = transform.trans_id_tree_path('old')
+ transform.version_file('id-1', old)
+ self.assertEqual([('id-1', (None, 'old'), False, (False, True),
+ ('eert_toor', 'eert_toor'), ('old', 'old'), ('file', 'file'),
+ (False, False))], list(transform.iter_changes()))
+ finally:
+ transform.finalize()
+
+ def test_iter_changes_modifications(self):
+ self.wt.set_root_id('eert_toor')
+ transform, root = self.get_transform()
+ transform.new_file('old', root, 'blah', 'id-1')
+ transform.new_file('new', root, 'blah')
+ transform.new_directory('subdir', root, 'subdir-id')
+ transform.apply()
+ transform, root = self.get_transform()
+ try:
+ old = transform.trans_id_tree_path('old')
+ subdir = transform.trans_id_tree_file_id('subdir-id')
+ new = transform.trans_id_tree_path('new')
+ self.assertEqual([], list(transform.iter_changes()))
+
+ #content deletion
+ transform.delete_contents(old)
+ self.assertEqual([('id-1', ('old', 'old'), True, (True, True),
+ ('eert_toor', 'eert_toor'), ('old', 'old'), ('file', None),
+ (False, False))], list(transform.iter_changes()))
+
+ #content change
+ transform.create_file('blah', old)
+ self.assertEqual([('id-1', ('old', 'old'), True, (True, True),
+ ('eert_toor', 'eert_toor'), ('old', 'old'), ('file', 'file'),
+ (False, False))], list(transform.iter_changes()))
+ transform.cancel_deletion(old)
+ self.assertEqual([('id-1', ('old', 'old'), True, (True, True),
+ ('eert_toor', 'eert_toor'), ('old', 'old'), ('file', 'file'),
+ (False, False))], list(transform.iter_changes()))
+ transform.cancel_creation(old)
+
+ # move file_id to a different file
+ self.assertEqual([], list(transform.iter_changes()))
+ transform.unversion_file(old)
+ transform.version_file('id-1', new)
+ transform.adjust_path('old', root, new)
+ self.assertEqual([('id-1', ('old', 'old'), True, (True, True),
+ ('eert_toor', 'eert_toor'), ('old', 'old'), ('file', 'file'),
+ (False, False))], list(transform.iter_changes()))
+ transform.cancel_versioning(new)
+ transform._removed_id = set()
+
+ #execute bit
+ self.assertEqual([], list(transform.iter_changes()))
+ transform.set_executability(True, old)
+ self.assertEqual([('id-1', ('old', 'old'), False, (True, True),
+ ('eert_toor', 'eert_toor'), ('old', 'old'), ('file', 'file'),
+ (False, True))], list(transform.iter_changes()))
+ transform.set_executability(None, old)
+
+ # filename
+ self.assertEqual([], list(transform.iter_changes()))
+ transform.adjust_path('new', root, old)
+ transform._new_parent = {}
+ self.assertEqual([('id-1', ('old', 'new'), False, (True, True),
+ ('eert_toor', 'eert_toor'), ('old', 'new'), ('file', 'file'),
+ (False, False))], list(transform.iter_changes()))
+ transform._new_name = {}
+
+ # parent directory
+ self.assertEqual([], list(transform.iter_changes()))
+ transform.adjust_path('new', subdir, old)
+ transform._new_name = {}
+ self.assertEqual([('id-1', ('old', 'subdir/old'), False,
+ (True, True), ('eert_toor', 'subdir-id'), ('old', 'old'),
+ ('file', 'file'), (False, False))],
+ list(transform.iter_changes()))
+ transform._new_path = {}
+
+ finally:
+ transform.finalize()
+
+ def test_iter_changes_modified_bleed(self):
+ self.wt.set_root_id('eert_toor')
+ """Modified flag should not bleed from one change to another"""
+ # unfortunately, we have no guarantee that file1 (which is modified)
+ # will be applied before file2. And if it's applied after file2, it
+ # obviously can't bleed into file2's change output. But for now, it
+ # works.
+ transform, root = self.get_transform()
+ transform.new_file('file1', root, 'blah', 'id-1')
+ transform.new_file('file2', root, 'blah', 'id-2')
+ transform.apply()
+ transform, root = self.get_transform()
+ try:
+ transform.delete_contents(transform.trans_id_file_id('id-1'))
+ transform.set_executability(True,
+ transform.trans_id_file_id('id-2'))
+ self.assertEqual([('id-1', (u'file1', u'file1'), True, (True, True),
+ ('eert_toor', 'eert_toor'), ('file1', u'file1'),
+ ('file', None), (False, False)),
+ ('id-2', (u'file2', u'file2'), False, (True, True),
+ ('eert_toor', 'eert_toor'), ('file2', u'file2'),
+ ('file', 'file'), (False, True))],
+ list(transform.iter_changes()))
+ finally:
+ transform.finalize()
+
+ def test_iter_changes_move_missing(self):
+ """Test moving ids with no files around"""
+ self.wt.set_root_id('toor_eert')
+ # Need two steps because versioning a non-existant file is a conflict.
+ transform, root = self.get_transform()
+ transform.new_directory('floater', root, 'floater-id')
+ transform.apply()
+ transform, root = self.get_transform()
+ transform.delete_contents(transform.trans_id_tree_path('floater'))
+ transform.apply()
+ transform, root = self.get_transform()
+ floater = transform.trans_id_tree_path('floater')
+ try:
+ transform.adjust_path('flitter', root, floater)
+ self.assertEqual([('floater-id', ('floater', 'flitter'), False,
+ (True, True), ('toor_eert', 'toor_eert'), ('floater', 'flitter'),
+ (None, None), (False, False))], list(transform.iter_changes()))
+ finally:
+ transform.finalize()
+
+ def test_iter_changes_pointless(self):
+ """Ensure that no-ops are not treated as modifications"""
+ self.wt.set_root_id('eert_toor')
+ transform, root = self.get_transform()
+ transform.new_file('old', root, 'blah', 'id-1')
+ transform.new_directory('subdir', root, 'subdir-id')
+ transform.apply()
+ transform, root = self.get_transform()
+ try:
+ old = transform.trans_id_tree_path('old')
+ subdir = transform.trans_id_tree_file_id('subdir-id')
+ self.assertEqual([], list(transform.iter_changes()))
+ transform.delete_contents(subdir)
+ transform.create_directory(subdir)
+ transform.set_executability(False, old)
+ transform.unversion_file(old)
+ transform.version_file('id-1', old)
+ transform.adjust_path('old', root, old)
+ self.assertEqual([], list(transform.iter_changes()))
+ finally:
+ transform.finalize()
+
+ def test_rename_count(self):
+ transform, root = self.get_transform()
+ transform.new_file('name1', root, 'contents')
+ self.assertEqual(transform.rename_count, 0)
+ transform.apply()
+ self.assertEqual(transform.rename_count, 1)
+ transform2, root = self.get_transform()
+ transform2.adjust_path('name2', root,
+ transform2.trans_id_tree_path('name1'))
+ self.assertEqual(transform2.rename_count, 0)
+ transform2.apply()
+ self.assertEqual(transform2.rename_count, 2)
+
+ def test_change_parent(self):
+ """Ensure that after we change a parent, the results are still right.
+
+ Renames and parent changes on pending transforms can happen as part
+ of conflict resolution, and are explicitly permitted by the
+ TreeTransform API.
+
+ This test ensures they work correctly with the rename-avoidance
+ optimization.
+ """
+ transform, root = self.get_transform()
+ parent1 = transform.new_directory('parent1', root)
+ child1 = transform.new_file('child1', parent1, 'contents')
+ parent2 = transform.new_directory('parent2', root)
+ transform.adjust_path('child1', parent2, child1)
+ transform.apply()
+ self.assertPathDoesNotExist(self.wt.abspath('parent1/child1'))
+ self.assertPathExists(self.wt.abspath('parent2/child1'))
+ # rename limbo/new-1 => parent1, rename limbo/new-3 => parent2
+ # no rename for child1 (counting only renames during apply)
+ self.assertEqual(2, transform.rename_count)
+
+ def test_cancel_parent(self):
+ """Cancelling a parent doesn't cause deletion of a non-empty directory
+
+ This is like the test_change_parent, except that we cancel the parent
+ before adjusting the path. The transform must detect that the
+ directory is non-empty, and move children to safe locations.
+ """
+ transform, root = self.get_transform()
+ parent1 = transform.new_directory('parent1', root)
+ child1 = transform.new_file('child1', parent1, 'contents')
+ child2 = transform.new_file('child2', parent1, 'contents')
+ try:
+ transform.cancel_creation(parent1)
+ except OSError:
+ self.fail('Failed to move child1 before deleting parent1')
+ transform.cancel_creation(child2)
+ transform.create_directory(parent1)
+ try:
+ transform.cancel_creation(parent1)
+ # If the transform incorrectly believes that child2 is still in
+ # parent1's limbo directory, it will try to rename it and fail
+ # because was already moved by the first cancel_creation.
+ except OSError:
+ self.fail('Transform still thinks child2 is a child of parent1')
+ parent2 = transform.new_directory('parent2', root)
+ transform.adjust_path('child1', parent2, child1)
+ transform.apply()
+ self.assertPathDoesNotExist(self.wt.abspath('parent1'))
+ self.assertPathExists(self.wt.abspath('parent2/child1'))
+ # rename limbo/new-3 => parent2, rename limbo/new-2 => child1
+ self.assertEqual(2, transform.rename_count)
+
+ def test_adjust_and_cancel(self):
+ """Make sure adjust_path keeps track of limbo children properly"""
+ transform, root = self.get_transform()
+ parent1 = transform.new_directory('parent1', root)
+ child1 = transform.new_file('child1', parent1, 'contents')
+ parent2 = transform.new_directory('parent2', root)
+ transform.adjust_path('child1', parent2, child1)
+ transform.cancel_creation(child1)
+ try:
+ transform.cancel_creation(parent1)
+ # if the transform thinks child1 is still in parent1's limbo
+ # directory, it will attempt to move it and fail.
+ except OSError:
+ self.fail('Transform still thinks child1 is a child of parent1')
+ transform.finalize()
+
+ def test_noname_contents(self):
+ """TreeTransform should permit deferring naming files."""
+ transform, root = self.get_transform()
+ parent = transform.trans_id_file_id('parent-id')
+ try:
+ transform.create_directory(parent)
+ except KeyError:
+ self.fail("Can't handle contents with no name")
+ transform.finalize()
+
+ def test_noname_contents_nested(self):
+ """TreeTransform should permit deferring naming files."""
+ transform, root = self.get_transform()
+ parent = transform.trans_id_file_id('parent-id')
+ try:
+ transform.create_directory(parent)
+ except KeyError:
+ self.fail("Can't handle contents with no name")
+ child = transform.new_directory('child', parent)
+ transform.adjust_path('parent', root, parent)
+ transform.apply()
+ self.assertPathExists(self.wt.abspath('parent/child'))
+ self.assertEqual(1, transform.rename_count)
+
+ def test_reuse_name(self):
+ """Avoid reusing the same limbo name for different files"""
+ transform, root = self.get_transform()
+ parent = transform.new_directory('parent', root)
+ child1 = transform.new_directory('child', parent)
+ try:
+ child2 = transform.new_directory('child', parent)
+ except OSError:
+ self.fail('Tranform tried to use the same limbo name twice')
+ transform.adjust_path('child2', parent, child2)
+ transform.apply()
+ # limbo/new-1 => parent, limbo/new-3 => parent/child2
+ # child2 is put into top-level limbo because child1 has already
+ # claimed the direct limbo path when child2 is created. There is no
+ # advantage in renaming files once they're in top-level limbo, except
+ # as part of apply.
+ self.assertEqual(2, transform.rename_count)
+
+ def test_reuse_when_first_moved(self):
+ """Don't avoid direct paths when it is safe to use them"""
+ transform, root = self.get_transform()
+ parent = transform.new_directory('parent', root)
+ child1 = transform.new_directory('child', parent)
+ transform.adjust_path('child1', parent, child1)
+ child2 = transform.new_directory('child', parent)
+ transform.apply()
+ # limbo/new-1 => parent
+ self.assertEqual(1, transform.rename_count)
+
+ def test_reuse_after_cancel(self):
+ """Don't avoid direct paths when it is safe to use them"""
+ transform, root = self.get_transform()
+ parent2 = transform.new_directory('parent2', root)
+ child1 = transform.new_directory('child1', parent2)
+ transform.cancel_creation(parent2)
+ transform.create_directory(parent2)
+ child2 = transform.new_directory('child1', parent2)
+ transform.adjust_path('child2', parent2, child1)
+ transform.apply()
+ # limbo/new-1 => parent2, limbo/new-2 => parent2/child1
+ self.assertEqual(2, transform.rename_count)
+
+ def test_finalize_order(self):
+ """Finalize must be done in child-to-parent order"""
+ transform, root = self.get_transform()
+ parent = transform.new_directory('parent', root)
+ child = transform.new_directory('child', parent)
+ try:
+ transform.finalize()
+ except OSError:
+ self.fail('Tried to remove parent before child1')
+
+ def test_cancel_with_cancelled_child_should_succeed(self):
+ transform, root = self.get_transform()
+ parent = transform.new_directory('parent', root)
+ child = transform.new_directory('child', parent)
+ transform.cancel_creation(child)
+ transform.cancel_creation(parent)
+ transform.finalize()
+
+ def test_rollback_on_directory_clash(self):
+ def tt_helper():
+ wt = self.make_branch_and_tree('.')
+ tt = TreeTransform(wt) # TreeTransform obtains write lock
+ try:
+ foo = tt.new_directory('foo', tt.root)
+ tt.new_file('bar', foo, 'foobar')
+ baz = tt.new_directory('baz', tt.root)
+ tt.new_file('qux', baz, 'quux')
+ # Ask for a rename 'foo' -> 'baz'
+ tt.adjust_path('baz', tt.root, foo)
+ # Lie to tt that we've already resolved all conflicts.
+ tt.apply(no_conflicts=True)
+ except:
+ wt.unlock()
+ raise
+ # The rename will fail because the target directory is not empty (but
+ # raises FileExists anyway).
+ err = self.assertRaises(errors.FileExists, tt_helper)
+ self.assertEndsWith(err.path, "/baz")
+
+ def test_two_directories_clash(self):
+ def tt_helper():
+ wt = self.make_branch_and_tree('.')
+ tt = TreeTransform(wt) # TreeTransform obtains write lock
+ try:
+ foo_1 = tt.new_directory('foo', tt.root)
+ tt.new_directory('bar', foo_1)
+ # Adding the same directory with a different content
+ foo_2 = tt.new_directory('foo', tt.root)
+ tt.new_directory('baz', foo_2)
+ # Lie to tt that we've already resolved all conflicts.
+ tt.apply(no_conflicts=True)
+ except:
+ wt.unlock()
+ raise
+ err = self.assertRaises(errors.FileExists, tt_helper)
+ self.assertEndsWith(err.path, "/foo")
+
+ def test_two_directories_clash_finalize(self):
+ def tt_helper():
+ wt = self.make_branch_and_tree('.')
+ tt = TreeTransform(wt) # TreeTransform obtains write lock
+ try:
+ foo_1 = tt.new_directory('foo', tt.root)
+ tt.new_directory('bar', foo_1)
+ # Adding the same directory with a different content
+ foo_2 = tt.new_directory('foo', tt.root)
+ tt.new_directory('baz', foo_2)
+ # Lie to tt that we've already resolved all conflicts.
+ tt.apply(no_conflicts=True)
+ except:
+ tt.finalize()
+ raise
+ err = self.assertRaises(errors.FileExists, tt_helper)
+ self.assertEndsWith(err.path, "/foo")
+
+ def test_file_to_directory(self):
+ wt = self.make_branch_and_tree('.')
+ self.build_tree(['foo'])
+ wt.add(['foo'])
+ wt.commit("one")
+ tt = TreeTransform(wt)
+ self.addCleanup(tt.finalize)
+ foo_trans_id = tt.trans_id_tree_path("foo")
+ tt.delete_contents(foo_trans_id)
+ tt.create_directory(foo_trans_id)
+ bar_trans_id = tt.trans_id_tree_path("foo/bar")
+ tt.create_file(["aa\n"], bar_trans_id)
+ tt.version_file("bar-1", bar_trans_id)
+ tt.apply()
+ self.assertPathExists("foo/bar")
+ wt.lock_read()
+ try:
+ self.assertEqual(wt.kind(wt.path2id("foo")), "directory")
+ finally:
+ wt.unlock()
+ wt.commit("two")
+ changes = wt.changes_from(wt.basis_tree())
+ self.assertFalse(changes.has_changed(), changes)
+
+ def test_file_to_symlink(self):
+ self.requireFeature(SymlinkFeature)
+ wt = self.make_branch_and_tree('.')
+ self.build_tree(['foo'])
+ wt.add(['foo'])
+ wt.commit("one")
+ tt = TreeTransform(wt)
+ self.addCleanup(tt.finalize)
+ foo_trans_id = tt.trans_id_tree_path("foo")
+ tt.delete_contents(foo_trans_id)
+ tt.create_symlink("bar", foo_trans_id)
+ tt.apply()
+ self.assertPathExists("foo")
+ wt.lock_read()
+ self.addCleanup(wt.unlock)
+ self.assertEqual(wt.kind(wt.path2id("foo")), "symlink")
+
+ def test_dir_to_file(self):
+ wt = self.make_branch_and_tree('.')
+ self.build_tree(['foo/', 'foo/bar'])
+ wt.add(['foo', 'foo/bar'])
+ wt.commit("one")
+ tt = TreeTransform(wt)
+ self.addCleanup(tt.finalize)
+ foo_trans_id = tt.trans_id_tree_path("foo")
+ bar_trans_id = tt.trans_id_tree_path("foo/bar")
+ tt.delete_contents(foo_trans_id)
+ tt.delete_versioned(bar_trans_id)
+ tt.create_file(["aa\n"], foo_trans_id)
+ tt.apply()
+ self.assertPathExists("foo")
+ wt.lock_read()
+ self.addCleanup(wt.unlock)
+ self.assertEqual(wt.kind(wt.path2id("foo")), "file")
+
+ def test_dir_to_hardlink(self):
+ self.requireFeature(HardlinkFeature)
+ wt = self.make_branch_and_tree('.')
+ self.build_tree(['foo/', 'foo/bar'])
+ wt.add(['foo', 'foo/bar'])
+ wt.commit("one")
+ tt = TreeTransform(wt)
+ self.addCleanup(tt.finalize)
+ foo_trans_id = tt.trans_id_tree_path("foo")
+ bar_trans_id = tt.trans_id_tree_path("foo/bar")
+ tt.delete_contents(foo_trans_id)
+ tt.delete_versioned(bar_trans_id)
+ self.build_tree(['baz'])
+ tt.create_hardlink("baz", foo_trans_id)
+ tt.apply()
+ self.assertPathExists("foo")
+ self.assertPathExists("baz")
+ wt.lock_read()
+ self.addCleanup(wt.unlock)
+ self.assertEqual(wt.kind(wt.path2id("foo")), "file")
+
+ def test_no_final_path(self):
+ transform, root = self.get_transform()
+ trans_id = transform.trans_id_file_id('foo')
+ transform.create_file('bar', trans_id)
+ transform.cancel_creation(trans_id)
+ transform.apply()
+
+ def test_create_from_tree(self):
+ tree1 = self.make_branch_and_tree('tree1')
+ self.build_tree_contents([('tree1/foo/',), ('tree1/bar', 'baz')])
+ tree1.add(['foo', 'bar'], ['foo-id', 'bar-id'])
+ tree2 = self.make_branch_and_tree('tree2')
+ tt = TreeTransform(tree2)
+ foo_trans_id = tt.create_path('foo', tt.root)
+ create_from_tree(tt, foo_trans_id, tree1, 'foo-id')
+ bar_trans_id = tt.create_path('bar', tt.root)
+ create_from_tree(tt, bar_trans_id, tree1, 'bar-id')
+ tt.apply()
+ self.assertEqual('directory', osutils.file_kind('tree2/foo'))
+ self.assertFileEqual('baz', 'tree2/bar')
+
+ def test_create_from_tree_bytes(self):
+ """Provided lines are used instead of tree content."""
+ tree1 = self.make_branch_and_tree('tree1')
+ self.build_tree_contents([('tree1/foo', 'bar'),])
+ tree1.add('foo', 'foo-id')
+ tree2 = self.make_branch_and_tree('tree2')
+ tt = TreeTransform(tree2)
+ foo_trans_id = tt.create_path('foo', tt.root)
+ create_from_tree(tt, foo_trans_id, tree1, 'foo-id', bytes='qux')
+ tt.apply()
+ self.assertFileEqual('qux', 'tree2/foo')
+
+ def test_create_from_tree_symlink(self):
+ self.requireFeature(SymlinkFeature)
+ tree1 = self.make_branch_and_tree('tree1')
+ os.symlink('bar', 'tree1/foo')
+ tree1.add('foo', 'foo-id')
+ tt = TreeTransform(self.make_branch_and_tree('tree2'))
+ foo_trans_id = tt.create_path('foo', tt.root)
+ create_from_tree(tt, foo_trans_id, tree1, 'foo-id')
+ tt.apply()
+ self.assertEqual('bar', os.readlink('tree2/foo'))
+
+
+class TransformGroup(object):
+
+ def __init__(self, dirname, root_id):
+ self.name = dirname
+ os.mkdir(dirname)
+ self.wt = ControlDir.create_standalone_workingtree(dirname)
+ self.wt.set_root_id(root_id)
+ self.b = self.wt.branch
+ self.tt = TreeTransform(self.wt)
+ self.root = self.tt.trans_id_tree_file_id(self.wt.get_root_id())
+
+
+def conflict_text(tree, merge):
+ template = '%s TREE\n%s%s\n%s%s MERGE-SOURCE\n'
+ return template % ('<' * 7, tree, '=' * 7, merge, '>' * 7)
+
+
+class TestInventoryAltered(tests.TestCaseWithTransport):
+
+ def test_inventory_altered_unchanged(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/foo'])
+ tree.add('foo', 'foo-id')
+ with TransformPreview(tree) as tt:
+ self.assertEqual([], tt._inventory_altered())
+
+ def test_inventory_altered_changed_parent_id(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/foo'])
+ tree.add('foo', 'foo-id')
+ with TransformPreview(tree) as tt:
+ tt.unversion_file(tt.root)
+ tt.version_file('new-id', tt.root)
+ foo_trans_id = tt.trans_id_tree_file_id('foo-id')
+ foo_tuple = ('foo', foo_trans_id)
+ root_tuple = ('', tt.root)
+ self.assertEqual([root_tuple, foo_tuple], tt._inventory_altered())
+
+ def test_inventory_altered_noop_changed_parent_id(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/foo'])
+ tree.add('foo', 'foo-id')
+ with TransformPreview(tree) as tt:
+ tt.unversion_file(tt.root)
+ tt.version_file(tree.get_root_id(), tt.root)
+ foo_trans_id = tt.trans_id_tree_file_id('foo-id')
+ self.assertEqual([], tt._inventory_altered())
+
+
+class TestTransformMerge(TestCaseInTempDir):
+
+ def test_text_merge(self):
+ root_id = generate_ids.gen_root_id()
+ base = TransformGroup("base", root_id)
+ base.tt.new_file('a', base.root, 'a\nb\nc\nd\be\n', 'a')
+ base.tt.new_file('b', base.root, 'b1', 'b')
+ base.tt.new_file('c', base.root, 'c', 'c')
+ base.tt.new_file('d', base.root, 'd', 'd')
+ base.tt.new_file('e', base.root, 'e', 'e')
+ base.tt.new_file('f', base.root, 'f', 'f')
+ base.tt.new_directory('g', base.root, 'g')
+ base.tt.new_directory('h', base.root, 'h')
+ base.tt.apply()
+ other = TransformGroup("other", root_id)
+ other.tt.new_file('a', other.root, 'y\nb\nc\nd\be\n', 'a')
+ other.tt.new_file('b', other.root, 'b2', 'b')
+ other.tt.new_file('c', other.root, 'c2', 'c')
+ other.tt.new_file('d', other.root, 'd', 'd')
+ other.tt.new_file('e', other.root, 'e2', 'e')
+ other.tt.new_file('f', other.root, 'f', 'f')
+ other.tt.new_file('g', other.root, 'g', 'g')
+ other.tt.new_file('h', other.root, 'h\ni\nj\nk\n', 'h')
+ other.tt.new_file('i', other.root, 'h\ni\nj\nk\n', 'i')
+ other.tt.apply()
+ this = TransformGroup("this", root_id)
+ this.tt.new_file('a', this.root, 'a\nb\nc\nd\bz\n', 'a')
+ this.tt.new_file('b', this.root, 'b', 'b')
+ this.tt.new_file('c', this.root, 'c', 'c')
+ this.tt.new_file('d', this.root, 'd2', 'd')
+ this.tt.new_file('e', this.root, 'e2', 'e')
+ this.tt.new_file('f', this.root, 'f', 'f')
+ this.tt.new_file('g', this.root, 'g', 'g')
+ this.tt.new_file('h', this.root, '1\n2\n3\n4\n', 'h')
+ this.tt.new_file('i', this.root, '1\n2\n3\n4\n', 'i')
+ this.tt.apply()
+ Merge3Merger(this.wt, this.wt, base.wt, other.wt)
+
+ # textual merge
+ self.assertEqual(this.wt.get_file('a').read(), 'y\nb\nc\nd\bz\n')
+ # three-way text conflict
+ self.assertEqual(this.wt.get_file('b').read(),
+ conflict_text('b', 'b2'))
+ # OTHER wins
+ self.assertEqual(this.wt.get_file('c').read(), 'c2')
+ # THIS wins
+ self.assertEqual(this.wt.get_file('d').read(), 'd2')
+ # Ambigious clean merge
+ self.assertEqual(this.wt.get_file('e').read(), 'e2')
+ # No change
+ self.assertEqual(this.wt.get_file('f').read(), 'f')
+ # Correct correct results when THIS == OTHER
+ self.assertEqual(this.wt.get_file('g').read(), 'g')
+ # Text conflict when THIS & OTHER are text and BASE is dir
+ self.assertEqual(this.wt.get_file('h').read(),
+ conflict_text('1\n2\n3\n4\n', 'h\ni\nj\nk\n'))
+ self.assertEqual(this.wt.get_file_byname('h.THIS').read(),
+ '1\n2\n3\n4\n')
+ self.assertEqual(this.wt.get_file_byname('h.OTHER').read(),
+ 'h\ni\nj\nk\n')
+ self.assertEqual(file_kind(this.wt.abspath('h.BASE')), 'directory')
+ self.assertEqual(this.wt.get_file('i').read(),
+ conflict_text('1\n2\n3\n4\n', 'h\ni\nj\nk\n'))
+ self.assertEqual(this.wt.get_file_byname('i.THIS').read(),
+ '1\n2\n3\n4\n')
+ self.assertEqual(this.wt.get_file_byname('i.OTHER').read(),
+ 'h\ni\nj\nk\n')
+ self.assertEqual(os.path.exists(this.wt.abspath('i.BASE')), False)
+ modified = ['a', 'b', 'c', 'h', 'i']
+ merge_modified = this.wt.merge_modified()
+ self.assertSubset(merge_modified, modified)
+ self.assertEqual(len(merge_modified), len(modified))
+ with file(this.wt.id2abspath('a'), 'wb') as f: f.write('booga')
+ modified.pop(0)
+ merge_modified = this.wt.merge_modified()
+ self.assertSubset(merge_modified, modified)
+ self.assertEqual(len(merge_modified), len(modified))
+ this.wt.remove('b')
+ this.wt.revert()
+
+ def test_file_merge(self):
+ self.requireFeature(SymlinkFeature)
+ root_id = generate_ids.gen_root_id()
+ base = TransformGroup("BASE", root_id)
+ this = TransformGroup("THIS", root_id)
+ other = TransformGroup("OTHER", root_id)
+ for tg in this, base, other:
+ tg.tt.new_directory('a', tg.root, 'a')
+ tg.tt.new_symlink('b', tg.root, 'b', 'b')
+ tg.tt.new_file('c', tg.root, 'c', 'c')
+ tg.tt.new_symlink('d', tg.root, tg.name, 'd')
+ targets = ((base, 'base-e', 'base-f', None, None),
+ (this, 'other-e', 'this-f', 'other-g', 'this-h'),
+ (other, 'other-e', None, 'other-g', 'other-h'))
+ for tg, e_target, f_target, g_target, h_target in targets:
+ for link, target in (('e', e_target), ('f', f_target),
+ ('g', g_target), ('h', h_target)):
+ if target is not None:
+ tg.tt.new_symlink(link, tg.root, target, link)
+
+ for tg in this, base, other:
+ tg.tt.apply()
+ Merge3Merger(this.wt, this.wt, base.wt, other.wt)
+ self.assertIs(os.path.isdir(this.wt.abspath('a')), True)
+ self.assertIs(os.path.islink(this.wt.abspath('b')), True)
+ self.assertIs(os.path.isfile(this.wt.abspath('c')), True)
+ for suffix in ('THIS', 'BASE', 'OTHER'):
+ self.assertEqual(os.readlink(this.wt.abspath('d.'+suffix)), suffix)
+ self.assertIs(os.path.lexists(this.wt.abspath('d')), False)
+ self.assertEqual(this.wt.id2path('d'), 'd.OTHER')
+ self.assertEqual(this.wt.id2path('f'), 'f.THIS')
+ self.assertEqual(os.readlink(this.wt.abspath('e')), 'other-e')
+ self.assertIs(os.path.lexists(this.wt.abspath('e.THIS')), False)
+ self.assertIs(os.path.lexists(this.wt.abspath('e.OTHER')), False)
+ self.assertIs(os.path.lexists(this.wt.abspath('e.BASE')), False)
+ self.assertIs(os.path.lexists(this.wt.abspath('g')), True)
+ self.assertIs(os.path.lexists(this.wt.abspath('g.BASE')), False)
+ self.assertIs(os.path.lexists(this.wt.abspath('h')), False)
+ self.assertIs(os.path.lexists(this.wt.abspath('h.BASE')), False)
+ self.assertIs(os.path.lexists(this.wt.abspath('h.THIS')), True)
+ self.assertIs(os.path.lexists(this.wt.abspath('h.OTHER')), True)
+
+ def test_filename_merge(self):
+ root_id = generate_ids.gen_root_id()
+ base = TransformGroup("BASE", root_id)
+ this = TransformGroup("THIS", root_id)
+ other = TransformGroup("OTHER", root_id)
+ base_a, this_a, other_a = [t.tt.new_directory('a', t.root, 'a')
+ for t in [base, this, other]]
+ base_b, this_b, other_b = [t.tt.new_directory('b', t.root, 'b')
+ for t in [base, this, other]]
+ base.tt.new_directory('c', base_a, 'c')
+ this.tt.new_directory('c1', this_a, 'c')
+ other.tt.new_directory('c', other_b, 'c')
+
+ base.tt.new_directory('d', base_a, 'd')
+ this.tt.new_directory('d1', this_b, 'd')
+ other.tt.new_directory('d', other_a, 'd')
+
+ base.tt.new_directory('e', base_a, 'e')
+ this.tt.new_directory('e', this_a, 'e')
+ other.tt.new_directory('e1', other_b, 'e')
+
+ base.tt.new_directory('f', base_a, 'f')
+ this.tt.new_directory('f1', this_b, 'f')
+ other.tt.new_directory('f1', other_b, 'f')
+
+ for tg in [this, base, other]:
+ tg.tt.apply()
+ Merge3Merger(this.wt, this.wt, base.wt, other.wt)
+ self.assertEqual(this.wt.id2path('c'), pathjoin('b/c1'))
+ self.assertEqual(this.wt.id2path('d'), pathjoin('b/d1'))
+ self.assertEqual(this.wt.id2path('e'), pathjoin('b/e1'))
+ self.assertEqual(this.wt.id2path('f'), pathjoin('b/f1'))
+
+ def test_filename_merge_conflicts(self):
+ root_id = generate_ids.gen_root_id()
+ base = TransformGroup("BASE", root_id)
+ this = TransformGroup("THIS", root_id)
+ other = TransformGroup("OTHER", root_id)
+ base_a, this_a, other_a = [t.tt.new_directory('a', t.root, 'a')
+ for t in [base, this, other]]
+ base_b, this_b, other_b = [t.tt.new_directory('b', t.root, 'b')
+ for t in [base, this, other]]
+
+ base.tt.new_file('g', base_a, 'g', 'g')
+ other.tt.new_file('g1', other_b, 'g1', 'g')
+
+ base.tt.new_file('h', base_a, 'h', 'h')
+ this.tt.new_file('h1', this_b, 'h1', 'h')
+
+ base.tt.new_file('i', base.root, 'i', 'i')
+ other.tt.new_directory('i1', this_b, 'i')
+
+ for tg in [this, base, other]:
+ tg.tt.apply()
+ Merge3Merger(this.wt, this.wt, base.wt, other.wt)
+
+ self.assertEqual(this.wt.id2path('g'), pathjoin('b/g1.OTHER'))
+ self.assertIs(os.path.lexists(this.wt.abspath('b/g1.BASE')), True)
+ self.assertIs(os.path.lexists(this.wt.abspath('b/g1.THIS')), False)
+ self.assertEqual(this.wt.id2path('h'), pathjoin('b/h1.THIS'))
+ self.assertIs(os.path.lexists(this.wt.abspath('b/h1.BASE')), True)
+ self.assertIs(os.path.lexists(this.wt.abspath('b/h1.OTHER')), False)
+ self.assertEqual(this.wt.id2path('i'), pathjoin('b/i1.OTHER'))
+
+
+class TestBuildTree(tests.TestCaseWithTransport):
+
+ def test_build_tree_with_symlinks(self):
+ self.requireFeature(SymlinkFeature)
+ os.mkdir('a')
+ a = ControlDir.create_standalone_workingtree('a')
+ os.mkdir('a/foo')
+ with file('a/foo/bar', 'wb') as f: f.write('contents')
+ os.symlink('a/foo/bar', 'a/foo/baz')
+ a.add(['foo', 'foo/bar', 'foo/baz'])
+ a.commit('initial commit')
+ b = ControlDir.create_standalone_workingtree('b')
+ basis = a.basis_tree()
+ basis.lock_read()
+ self.addCleanup(basis.unlock)
+ build_tree(basis, b)
+ self.assertIs(os.path.isdir('b/foo'), True)
+ self.assertEqual(file('b/foo/bar', 'rb').read(), "contents")
+ self.assertEqual(os.readlink('b/foo/baz'), 'a/foo/bar')
+
+ def test_build_with_references(self):
+ tree = self.make_branch_and_tree('source',
+ format='development-subtree')
+ subtree = self.make_branch_and_tree('source/subtree',
+ format='development-subtree')
+ tree.add_reference(subtree)
+ tree.commit('a revision')
+ tree.branch.create_checkout('target')
+ self.assertPathExists('target')
+ self.assertPathExists('target/subtree')
+
+ def test_file_conflict_handling(self):
+ """Ensure that when building trees, conflict handling is done"""
+ source = self.make_branch_and_tree('source')
+ target = self.make_branch_and_tree('target')
+ self.build_tree(['source/file', 'target/file'])
+ source.add('file', 'new-file')
+ source.commit('added file')
+ build_tree(source.basis_tree(), target)
+ self.assertEqual([DuplicateEntry('Moved existing file to',
+ 'file.moved', 'file', None, 'new-file')],
+ target.conflicts())
+ target2 = self.make_branch_and_tree('target2')
+ target_file = file('target2/file', 'wb')
+ try:
+ source_file = file('source/file', 'rb')
+ try:
+ target_file.write(source_file.read())
+ finally:
+ source_file.close()
+ finally:
+ target_file.close()
+ build_tree(source.basis_tree(), target2)
+ self.assertEqual([], target2.conflicts())
+
+ def test_symlink_conflict_handling(self):
+ """Ensure that when building trees, conflict handling is done"""
+ self.requireFeature(SymlinkFeature)
+ source = self.make_branch_and_tree('source')
+ os.symlink('foo', 'source/symlink')
+ source.add('symlink', 'new-symlink')
+ source.commit('added file')
+ target = self.make_branch_and_tree('target')
+ os.symlink('bar', 'target/symlink')
+ build_tree(source.basis_tree(), target)
+ self.assertEqual([DuplicateEntry('Moved existing file to',
+ 'symlink.moved', 'symlink', None, 'new-symlink')],
+ target.conflicts())
+ target = self.make_branch_and_tree('target2')
+ os.symlink('foo', 'target2/symlink')
+ build_tree(source.basis_tree(), target)
+ self.assertEqual([], target.conflicts())
+
+ def test_directory_conflict_handling(self):
+ """Ensure that when building trees, conflict handling is done"""
+ source = self.make_branch_and_tree('source')
+ target = self.make_branch_and_tree('target')
+ self.build_tree(['source/dir1/', 'source/dir1/file', 'target/dir1/'])
+ source.add(['dir1', 'dir1/file'], ['new-dir1', 'new-file'])
+ source.commit('added file')
+ build_tree(source.basis_tree(), target)
+ self.assertEqual([], target.conflicts())
+ self.assertPathExists('target/dir1/file')
+
+ # Ensure contents are merged
+ target = self.make_branch_and_tree('target2')
+ self.build_tree(['target2/dir1/', 'target2/dir1/file2'])
+ build_tree(source.basis_tree(), target)
+ self.assertEqual([], target.conflicts())
+ self.assertPathExists('target2/dir1/file2')
+ self.assertPathExists('target2/dir1/file')
+
+ # Ensure new contents are suppressed for existing branches
+ target = self.make_branch_and_tree('target3')
+ self.make_branch('target3/dir1')
+ self.build_tree(['target3/dir1/file2'])
+ build_tree(source.basis_tree(), target)
+ self.assertPathDoesNotExist('target3/dir1/file')
+ self.assertPathExists('target3/dir1/file2')
+ self.assertPathExists('target3/dir1.diverted/file')
+ self.assertEqual([DuplicateEntry('Diverted to',
+ 'dir1.diverted', 'dir1', 'new-dir1', None)],
+ target.conflicts())
+
+ target = self.make_branch_and_tree('target4')
+ self.build_tree(['target4/dir1/'])
+ self.make_branch('target4/dir1/file')
+ build_tree(source.basis_tree(), target)
+ self.assertPathExists('target4/dir1/file')
+ self.assertEqual('directory', file_kind('target4/dir1/file'))
+ self.assertPathExists('target4/dir1/file.diverted')
+ self.assertEqual([DuplicateEntry('Diverted to',
+ 'dir1/file.diverted', 'dir1/file', 'new-file', None)],
+ target.conflicts())
+
+ def test_mixed_conflict_handling(self):
+ """Ensure that when building trees, conflict handling is done"""
+ source = self.make_branch_and_tree('source')
+ target = self.make_branch_and_tree('target')
+ self.build_tree(['source/name', 'target/name/'])
+ source.add('name', 'new-name')
+ source.commit('added file')
+ build_tree(source.basis_tree(), target)
+ self.assertEqual([DuplicateEntry('Moved existing file to',
+ 'name.moved', 'name', None, 'new-name')], target.conflicts())
+
+ def test_raises_in_populated(self):
+ source = self.make_branch_and_tree('source')
+ self.build_tree(['source/name'])
+ source.add('name')
+ source.commit('added name')
+ target = self.make_branch_and_tree('target')
+ self.build_tree(['target/name'])
+ target.add('name')
+ self.assertRaises(errors.WorkingTreeAlreadyPopulated,
+ build_tree, source.basis_tree(), target)
+
+ def test_build_tree_rename_count(self):
+ source = self.make_branch_and_tree('source')
+ self.build_tree(['source/file1', 'source/dir1/'])
+ source.add(['file1', 'dir1'])
+ source.commit('add1')
+ target1 = self.make_branch_and_tree('target1')
+ transform_result = build_tree(source.basis_tree(), target1)
+ self.assertEqual(2, transform_result.rename_count)
+
+ self.build_tree(['source/dir1/file2'])
+ source.add(['dir1/file2'])
+ source.commit('add3')
+ target2 = self.make_branch_and_tree('target2')
+ transform_result = build_tree(source.basis_tree(), target2)
+ # children of non-root directories should not be renamed
+ self.assertEqual(2, transform_result.rename_count)
+
+ def create_ab_tree(self):
+ """Create a committed test tree with two files"""
+ source = self.make_branch_and_tree('source')
+ self.build_tree_contents([('source/file1', 'A')])
+ self.build_tree_contents([('source/file2', 'B')])
+ source.add(['file1', 'file2'], ['file1-id', 'file2-id'])
+ source.commit('commit files')
+ source.lock_write()
+ self.addCleanup(source.unlock)
+ return source
+
+ def test_build_tree_accelerator_tree(self):
+ source = self.create_ab_tree()
+ self.build_tree_contents([('source/file2', 'C')])
+ calls = []
+ real_source_get_file = source.get_file
+ def get_file(file_id, path=None):
+ calls.append(file_id)
+ return real_source_get_file(file_id, path)
+ source.get_file = get_file
+ target = self.make_branch_and_tree('target')
+ revision_tree = source.basis_tree()
+ revision_tree.lock_read()
+ self.addCleanup(revision_tree.unlock)
+ build_tree(revision_tree, target, source)
+ self.assertEqual(['file1-id'], calls)
+ target.lock_read()
+ self.addCleanup(target.unlock)
+ self.assertEqual([], list(target.iter_changes(revision_tree)))
+
+ def test_build_tree_accelerator_tree_observes_sha1(self):
+ source = self.create_ab_tree()
+ sha1 = osutils.sha_string('A')
+ target = self.make_branch_and_tree('target')
+ target.lock_write()
+ self.addCleanup(target.unlock)
+ state = target.current_dirstate()
+ state._cutoff_time = time.time() + 60
+ build_tree(source.basis_tree(), target, source)
+ entry = state._get_entry(0, path_utf8='file1')
+ self.assertEqual(sha1, entry[1][0][1])
+
+ def test_build_tree_accelerator_tree_missing_file(self):
+ source = self.create_ab_tree()
+ os.unlink('source/file1')
+ source.remove(['file2'])
+ target = self.make_branch_and_tree('target')
+ revision_tree = source.basis_tree()
+ revision_tree.lock_read()
+ self.addCleanup(revision_tree.unlock)
+ build_tree(revision_tree, target, source)
+ target.lock_read()
+ self.addCleanup(target.unlock)
+ self.assertEqual([], list(target.iter_changes(revision_tree)))
+
+ def test_build_tree_accelerator_wrong_kind(self):
+ self.requireFeature(SymlinkFeature)
+ source = self.make_branch_and_tree('source')
+ self.build_tree_contents([('source/file1', '')])
+ self.build_tree_contents([('source/file2', '')])
+ source.add(['file1', 'file2'], ['file1-id', 'file2-id'])
+ source.commit('commit files')
+ os.unlink('source/file2')
+ self.build_tree_contents([('source/file2/', 'C')])
+ os.unlink('source/file1')
+ os.symlink('file2', 'source/file1')
+ calls = []
+ real_source_get_file = source.get_file
+ def get_file(file_id, path=None):
+ calls.append(file_id)
+ return real_source_get_file(file_id, path)
+ source.get_file = get_file
+ target = self.make_branch_and_tree('target')
+ revision_tree = source.basis_tree()
+ revision_tree.lock_read()
+ self.addCleanup(revision_tree.unlock)
+ build_tree(revision_tree, target, source)
+ self.assertEqual([], calls)
+ target.lock_read()
+ self.addCleanup(target.unlock)
+ self.assertEqual([], list(target.iter_changes(revision_tree)))
+
+ def test_build_tree_hardlink(self):
+ self.requireFeature(HardlinkFeature)
+ source = self.create_ab_tree()
+ target = self.make_branch_and_tree('target')
+ revision_tree = source.basis_tree()
+ revision_tree.lock_read()
+ self.addCleanup(revision_tree.unlock)
+ build_tree(revision_tree, target, source, hardlink=True)
+ target.lock_read()
+ self.addCleanup(target.unlock)
+ self.assertEqual([], list(target.iter_changes(revision_tree)))
+ source_stat = os.stat('source/file1')
+ target_stat = os.stat('target/file1')
+ self.assertEqual(source_stat, target_stat)
+
+ # Explicitly disallowing hardlinks should prevent them.
+ target2 = self.make_branch_and_tree('target2')
+ build_tree(revision_tree, target2, source, hardlink=False)
+ target2.lock_read()
+ self.addCleanup(target2.unlock)
+ self.assertEqual([], list(target2.iter_changes(revision_tree)))
+ source_stat = os.stat('source/file1')
+ target2_stat = os.stat('target2/file1')
+ self.assertNotEqual(source_stat, target2_stat)
+
+ def test_build_tree_accelerator_tree_moved(self):
+ source = self.make_branch_and_tree('source')
+ self.build_tree_contents([('source/file1', 'A')])
+ source.add(['file1'], ['file1-id'])
+ source.commit('commit files')
+ source.rename_one('file1', 'file2')
+ source.lock_read()
+ self.addCleanup(source.unlock)
+ target = self.make_branch_and_tree('target')
+ revision_tree = source.basis_tree()
+ revision_tree.lock_read()
+ self.addCleanup(revision_tree.unlock)
+ build_tree(revision_tree, target, source)
+ target.lock_read()
+ self.addCleanup(target.unlock)
+ self.assertEqual([], list(target.iter_changes(revision_tree)))
+
+ def test_build_tree_hardlinks_preserve_execute(self):
+ self.requireFeature(HardlinkFeature)
+ source = self.create_ab_tree()
+ tt = TreeTransform(source)
+ trans_id = tt.trans_id_tree_file_id('file1-id')
+ tt.set_executability(True, trans_id)
+ tt.apply()
+ self.assertTrue(source.is_executable('file1-id'))
+ target = self.make_branch_and_tree('target')
+ revision_tree = source.basis_tree()
+ revision_tree.lock_read()
+ self.addCleanup(revision_tree.unlock)
+ build_tree(revision_tree, target, source, hardlink=True)
+ target.lock_read()
+ self.addCleanup(target.unlock)
+ self.assertEqual([], list(target.iter_changes(revision_tree)))
+ self.assertTrue(source.is_executable('file1-id'))
+
+ def install_rot13_content_filter(self, pattern):
+ # We could use
+ # self.addCleanup(filters._reset_registry, filters._reset_registry())
+ # below, but that looks a bit... hard to read even if it's exactly
+ # the same thing.
+ original_registry = filters._reset_registry()
+ def restore_registry():
+ filters._reset_registry(original_registry)
+ self.addCleanup(restore_registry)
+ def rot13(chunks, context=None):
+ return [''.join(chunks).encode('rot13')]
+ rot13filter = filters.ContentFilter(rot13, rot13)
+ filters.filter_stacks_registry.register(
+ 'rot13', {'yes': [rot13filter]}.get)
+ os.mkdir(self.test_home_dir + '/.bazaar')
+ rules_filename = self.test_home_dir + '/.bazaar/rules'
+ f = open(rules_filename, 'wb')
+ f.write('[name %s]\nrot13=yes\n' % (pattern,))
+ f.close()
+ def uninstall_rules():
+ os.remove(rules_filename)
+ rules.reset_rules()
+ self.addCleanup(uninstall_rules)
+ rules.reset_rules()
+
+ def test_build_tree_content_filtered_files_are_not_hardlinked(self):
+ """build_tree will not hardlink files that have content filtering rules
+ applied to them (but will still hardlink other files from the same tree
+ if it can).
+ """
+ self.requireFeature(HardlinkFeature)
+ self.install_rot13_content_filter('file1')
+ source = self.create_ab_tree()
+ target = self.make_branch_and_tree('target')
+ revision_tree = source.basis_tree()
+ revision_tree.lock_read()
+ self.addCleanup(revision_tree.unlock)
+ build_tree(revision_tree, target, source, hardlink=True)
+ target.lock_read()
+ self.addCleanup(target.unlock)
+ self.assertEqual([], list(target.iter_changes(revision_tree)))
+ source_stat = os.stat('source/file1')
+ target_stat = os.stat('target/file1')
+ self.assertNotEqual(source_stat, target_stat)
+ source_stat = os.stat('source/file2')
+ target_stat = os.stat('target/file2')
+ self.assertEqualStat(source_stat, target_stat)
+
+ def test_case_insensitive_build_tree_inventory(self):
+ if (features.CaseInsensitiveFilesystemFeature.available()
+ or features.CaseInsCasePresFilenameFeature.available()):
+ raise tests.UnavailableFeature('Fully case sensitive filesystem')
+ source = self.make_branch_and_tree('source')
+ self.build_tree(['source/file', 'source/FILE'])
+ source.add(['file', 'FILE'], ['lower-id', 'upper-id'])
+ source.commit('added files')
+ # Don't try this at home, kids!
+ # Force the tree to report that it is case insensitive
+ target = self.make_branch_and_tree('target')
+ target.case_sensitive = False
+ build_tree(source.basis_tree(), target, source, delta_from_tree=True)
+ self.assertEqual('file.moved', target.id2path('lower-id'))
+ self.assertEqual('FILE', target.id2path('upper-id'))
+
+ def test_build_tree_observes_sha(self):
+ source = self.make_branch_and_tree('source')
+ self.build_tree(['source/file1', 'source/dir/', 'source/dir/file2'])
+ source.add(['file1', 'dir', 'dir/file2'],
+ ['file1-id', 'dir-id', 'file2-id'])
+ source.commit('new files')
+ target = self.make_branch_and_tree('target')
+ target.lock_write()
+ self.addCleanup(target.unlock)
+ # We make use of the fact that DirState caches its cutoff time. So we
+ # set the 'safe' time to one minute in the future.
+ state = target.current_dirstate()
+ state._cutoff_time = time.time() + 60
+ build_tree(source.basis_tree(), target)
+ entry1_sha = osutils.sha_file_by_name('source/file1')
+ entry2_sha = osutils.sha_file_by_name('source/dir/file2')
+ # entry[1] is the state information, entry[1][0] is the state of the
+ # working tree, entry[1][0][1] is the sha value for the current working
+ # tree
+ entry1 = state._get_entry(0, path_utf8='file1')
+ self.assertEqual(entry1_sha, entry1[1][0][1])
+ # The 'size' field must also be set.
+ self.assertEqual(25, entry1[1][0][2])
+ entry1_state = entry1[1][0]
+ entry2 = state._get_entry(0, path_utf8='dir/file2')
+ self.assertEqual(entry2_sha, entry2[1][0][1])
+ self.assertEqual(29, entry2[1][0][2])
+ entry2_state = entry2[1][0]
+ # Now, make sure that we don't have to re-read the content. The
+ # packed_stat should match exactly.
+ self.assertEqual(entry1_sha, target.get_file_sha1('file1-id', 'file1'))
+ self.assertEqual(entry2_sha,
+ target.get_file_sha1('file2-id', 'dir/file2'))
+ self.assertEqual(entry1_state, entry1[1][0])
+ self.assertEqual(entry2_state, entry2[1][0])
+
+
+class TestCommitTransform(tests.TestCaseWithTransport):
+
+ def get_branch(self):
+ tree = self.make_branch_and_tree('tree')
+ tree.lock_write()
+ self.addCleanup(tree.unlock)
+ tree.commit('empty commit')
+ return tree.branch
+
+ def get_branch_and_transform(self):
+ branch = self.get_branch()
+ tt = TransformPreview(branch.basis_tree())
+ self.addCleanup(tt.finalize)
+ return branch, tt
+
+ def test_commit_wrong_basis(self):
+ branch = self.get_branch()
+ basis = branch.repository.revision_tree(
+ _mod_revision.NULL_REVISION)
+ tt = TransformPreview(basis)
+ self.addCleanup(tt.finalize)
+ e = self.assertRaises(ValueError, tt.commit, branch, '')
+ self.assertEqual('TreeTransform not based on branch basis: null:',
+ str(e))
+
+ def test_empy_commit(self):
+ branch, tt = self.get_branch_and_transform()
+ rev = tt.commit(branch, 'my message')
+ self.assertEqual(2, branch.revno())
+ repo = branch.repository
+ self.assertEqual('my message', repo.get_revision(rev).message)
+
+ def test_merge_parents(self):
+ branch, tt = self.get_branch_and_transform()
+ rev = tt.commit(branch, 'my message', ['rev1b', 'rev1c'])
+ self.assertEqual(['rev1b', 'rev1c'],
+ branch.basis_tree().get_parent_ids()[1:])
+
+ def test_first_commit(self):
+ branch = self.make_branch('branch')
+ branch.lock_write()
+ self.addCleanup(branch.unlock)
+ tt = TransformPreview(branch.basis_tree())
+ self.addCleanup(tt.finalize)
+ tt.new_directory('', ROOT_PARENT, 'TREE_ROOT')
+ rev = tt.commit(branch, 'my message')
+ self.assertEqual([], branch.basis_tree().get_parent_ids())
+ self.assertNotEqual(_mod_revision.NULL_REVISION,
+ branch.last_revision())
+
+ def test_first_commit_with_merge_parents(self):
+ branch = self.make_branch('branch')
+ branch.lock_write()
+ self.addCleanup(branch.unlock)
+ tt = TransformPreview(branch.basis_tree())
+ self.addCleanup(tt.finalize)
+ e = self.assertRaises(ValueError, tt.commit, branch,
+ 'my message', ['rev1b-id'])
+ self.assertEqual('Cannot supply merge parents for first commit.',
+ str(e))
+ self.assertEqual(_mod_revision.NULL_REVISION, branch.last_revision())
+
+ def test_add_files(self):
+ branch, tt = self.get_branch_and_transform()
+ tt.new_file('file', tt.root, 'contents', 'file-id')
+ trans_id = tt.new_directory('dir', tt.root, 'dir-id')
+ if SymlinkFeature.available():
+ tt.new_symlink('symlink', trans_id, 'target', 'symlink-id')
+ rev = tt.commit(branch, 'message')
+ tree = branch.basis_tree()
+ self.assertEqual('file', tree.id2path('file-id'))
+ self.assertEqual('contents', tree.get_file_text('file-id'))
+ self.assertEqual('dir', tree.id2path('dir-id'))
+ if SymlinkFeature.available():
+ self.assertEqual('dir/symlink', tree.id2path('symlink-id'))
+ self.assertEqual('target', tree.get_symlink_target('symlink-id'))
+
+ def test_add_unversioned(self):
+ branch, tt = self.get_branch_and_transform()
+ tt.new_file('file', tt.root, 'contents')
+ self.assertRaises(errors.StrictCommitFailed, tt.commit, branch,
+ 'message', strict=True)
+
+ def test_modify_strict(self):
+ branch, tt = self.get_branch_and_transform()
+ tt.new_file('file', tt.root, 'contents', 'file-id')
+ tt.commit(branch, 'message', strict=True)
+ tt = TransformPreview(branch.basis_tree())
+ self.addCleanup(tt.finalize)
+ trans_id = tt.trans_id_file_id('file-id')
+ tt.delete_contents(trans_id)
+ tt.create_file('contents', trans_id)
+ tt.commit(branch, 'message', strict=True)
+
+ def test_commit_malformed(self):
+ """Committing a malformed transform should raise an exception.
+
+ In this case, we are adding a file without adding its parent.
+ """
+ branch, tt = self.get_branch_and_transform()
+ parent_id = tt.trans_id_file_id('parent-id')
+ tt.new_file('file', parent_id, 'contents', 'file-id')
+ self.assertRaises(errors.MalformedTransform, tt.commit, branch,
+ 'message')
+
+ def test_commit_rich_revision_data(self):
+ branch, tt = self.get_branch_and_transform()
+ rev_id = tt.commit(branch, 'message', timestamp=1, timezone=43201,
+ committer='me <me@example.com>',
+ revprops={'foo': 'bar'}, revision_id='revid-1',
+ authors=['Author1 <author1@example.com>',
+ 'Author2 <author2@example.com>',
+ ])
+ self.assertEqual('revid-1', rev_id)
+ revision = branch.repository.get_revision(rev_id)
+ self.assertEqual(1, revision.timestamp)
+ self.assertEqual(43201, revision.timezone)
+ self.assertEqual('me <me@example.com>', revision.committer)
+ self.assertEqual(['Author1 <author1@example.com>',
+ 'Author2 <author2@example.com>'],
+ revision.get_apparent_authors())
+ del revision.properties['authors']
+ self.assertEqual({'foo': 'bar',
+ 'branch-nick': 'tree'},
+ revision.properties)
+
+ def test_no_explicit_revprops(self):
+ branch, tt = self.get_branch_and_transform()
+ rev_id = tt.commit(branch, 'message', authors=[
+ 'Author1 <author1@example.com>',
+ 'Author2 <author2@example.com>', ])
+ revision = branch.repository.get_revision(rev_id)
+ self.assertEqual(['Author1 <author1@example.com>',
+ 'Author2 <author2@example.com>'],
+ revision.get_apparent_authors())
+ self.assertEqual('tree', revision.properties['branch-nick'])
+
+
+class TestFileMover(tests.TestCaseWithTransport):
+
+ def test_file_mover(self):
+ self.build_tree(['a/', 'a/b', 'c/', 'c/d'])
+ mover = _FileMover()
+ mover.rename('a', 'q')
+ self.assertPathExists('q')
+ self.assertPathDoesNotExist('a')
+ self.assertPathExists('q/b')
+ self.assertPathExists('c')
+ self.assertPathExists('c/d')
+
+ def test_pre_delete_rollback(self):
+ self.build_tree(['a/'])
+ mover = _FileMover()
+ mover.pre_delete('a', 'q')
+ self.assertPathExists('q')
+ self.assertPathDoesNotExist('a')
+ mover.rollback()
+ self.assertPathDoesNotExist('q')
+ self.assertPathExists('a')
+
+ def test_apply_deletions(self):
+ self.build_tree(['a/', 'b/'])
+ mover = _FileMover()
+ mover.pre_delete('a', 'q')
+ mover.pre_delete('b', 'r')
+ self.assertPathExists('q')
+ self.assertPathExists('r')
+ self.assertPathDoesNotExist('a')
+ self.assertPathDoesNotExist('b')
+ mover.apply_deletions()
+ self.assertPathDoesNotExist('q')
+ self.assertPathDoesNotExist('r')
+ self.assertPathDoesNotExist('a')
+ self.assertPathDoesNotExist('b')
+
+ def test_file_mover_rollback(self):
+ self.build_tree(['a/', 'a/b', 'c/', 'c/d/', 'c/e/'])
+ mover = _FileMover()
+ mover.rename('c/d', 'c/f')
+ mover.rename('c/e', 'c/d')
+ try:
+ mover.rename('a', 'c')
+ except errors.FileExists, e:
+ mover.rollback()
+ self.assertPathExists('a')
+ self.assertPathExists('c/d')
+
+
+class Bogus(Exception):
+ pass
+
+
+class TestTransformRollback(tests.TestCaseWithTransport):
+
+ class ExceptionFileMover(_FileMover):
+
+ def __init__(self, bad_source=None, bad_target=None):
+ _FileMover.__init__(self)
+ self.bad_source = bad_source
+ self.bad_target = bad_target
+
+ def rename(self, source, target):
+ if (self.bad_source is not None and
+ source.endswith(self.bad_source)):
+ raise Bogus
+ elif (self.bad_target is not None and
+ target.endswith(self.bad_target)):
+ raise Bogus
+ else:
+ _FileMover.rename(self, source, target)
+
+ def test_rollback_rename(self):
+ tree = self.make_branch_and_tree('.')
+ self.build_tree(['a/', 'a/b'])
+ tt = TreeTransform(tree)
+ self.addCleanup(tt.finalize)
+ a_id = tt.trans_id_tree_path('a')
+ tt.adjust_path('c', tt.root, a_id)
+ tt.adjust_path('d', a_id, tt.trans_id_tree_path('a/b'))
+ self.assertRaises(Bogus, tt.apply,
+ _mover=self.ExceptionFileMover(bad_source='a'))
+ self.assertPathExists('a')
+ self.assertPathExists('a/b')
+ tt.apply()
+ self.assertPathExists('c')
+ self.assertPathExists('c/d')
+
+ def test_rollback_rename_into_place(self):
+ tree = self.make_branch_and_tree('.')
+ self.build_tree(['a/', 'a/b'])
+ tt = TreeTransform(tree)
+ self.addCleanup(tt.finalize)
+ a_id = tt.trans_id_tree_path('a')
+ tt.adjust_path('c', tt.root, a_id)
+ tt.adjust_path('d', a_id, tt.trans_id_tree_path('a/b'))
+ self.assertRaises(Bogus, tt.apply,
+ _mover=self.ExceptionFileMover(bad_target='c/d'))
+ self.assertPathExists('a')
+ self.assertPathExists('a/b')
+ tt.apply()
+ self.assertPathExists('c')
+ self.assertPathExists('c/d')
+
+ def test_rollback_deletion(self):
+ tree = self.make_branch_and_tree('.')
+ self.build_tree(['a/', 'a/b'])
+ tt = TreeTransform(tree)
+ self.addCleanup(tt.finalize)
+ a_id = tt.trans_id_tree_path('a')
+ tt.delete_contents(a_id)
+ tt.adjust_path('d', tt.root, tt.trans_id_tree_path('a/b'))
+ self.assertRaises(Bogus, tt.apply,
+ _mover=self.ExceptionFileMover(bad_target='d'))
+ self.assertPathExists('a')
+ self.assertPathExists('a/b')
+
+
+class TestFinalizeRobustness(tests.TestCaseWithTransport):
+ """Ensure treetransform creation errors can be safely cleaned up after"""
+
+ def _override_globals_in_method(self, instance, method_name, globals):
+ """Replace method on instance with one with updated globals"""
+ import types
+ func = getattr(instance, method_name).im_func
+ new_globals = dict(func.func_globals)
+ new_globals.update(globals)
+ new_func = types.FunctionType(func.func_code, new_globals,
+ func.func_name, func.func_defaults)
+ setattr(instance, method_name,
+ types.MethodType(new_func, instance, instance.__class__))
+ self.addCleanup(delattr, instance, method_name)
+
+ @staticmethod
+ def _fake_open_raises_before(name, mode):
+ """Like open() but raises before doing anything"""
+ raise RuntimeError
+
+ @staticmethod
+ def _fake_open_raises_after(name, mode):
+ """Like open() but raises after creating file without returning"""
+ open(name, mode).close()
+ raise RuntimeError
+
+ def create_transform_and_root_trans_id(self):
+ """Setup a transform creating a file in limbo"""
+ tree = self.make_branch_and_tree('.')
+ tt = TreeTransform(tree)
+ return tt, tt.create_path("a", tt.root)
+
+ def create_transform_and_subdir_trans_id(self):
+ """Setup a transform creating a directory containing a file in limbo"""
+ tree = self.make_branch_and_tree('.')
+ tt = TreeTransform(tree)
+ d_trans_id = tt.create_path("d", tt.root)
+ tt.create_directory(d_trans_id)
+ f_trans_id = tt.create_path("a", d_trans_id)
+ tt.adjust_path("a", d_trans_id, f_trans_id)
+ return tt, f_trans_id
+
+ def test_root_create_file_open_raises_before_creation(self):
+ tt, trans_id = self.create_transform_and_root_trans_id()
+ self._override_globals_in_method(tt, "create_file",
+ {"open": self._fake_open_raises_before})
+ self.assertRaises(RuntimeError, tt.create_file, ["contents"], trans_id)
+ path = tt._limbo_name(trans_id)
+ self.assertPathDoesNotExist(path)
+ tt.finalize()
+ self.assertPathDoesNotExist(tt._limbodir)
+
+ def test_root_create_file_open_raises_after_creation(self):
+ tt, trans_id = self.create_transform_and_root_trans_id()
+ self._override_globals_in_method(tt, "create_file",
+ {"open": self._fake_open_raises_after})
+ self.assertRaises(RuntimeError, tt.create_file, ["contents"], trans_id)
+ path = tt._limbo_name(trans_id)
+ self.assertPathExists(path)
+ tt.finalize()
+ self.assertPathDoesNotExist(path)
+ self.assertPathDoesNotExist(tt._limbodir)
+
+ def test_subdir_create_file_open_raises_before_creation(self):
+ tt, trans_id = self.create_transform_and_subdir_trans_id()
+ self._override_globals_in_method(tt, "create_file",
+ {"open": self._fake_open_raises_before})
+ self.assertRaises(RuntimeError, tt.create_file, ["contents"], trans_id)
+ path = tt._limbo_name(trans_id)
+ self.assertPathDoesNotExist(path)
+ tt.finalize()
+ self.assertPathDoesNotExist(tt._limbodir)
+
+ def test_subdir_create_file_open_raises_after_creation(self):
+ tt, trans_id = self.create_transform_and_subdir_trans_id()
+ self._override_globals_in_method(tt, "create_file",
+ {"open": self._fake_open_raises_after})
+ self.assertRaises(RuntimeError, tt.create_file, ["contents"], trans_id)
+ path = tt._limbo_name(trans_id)
+ self.assertPathExists(path)
+ tt.finalize()
+ self.assertPathDoesNotExist(path)
+ self.assertPathDoesNotExist(tt._limbodir)
+
+ def test_rename_in_limbo_rename_raises_after_rename(self):
+ tt, trans_id = self.create_transform_and_root_trans_id()
+ parent1 = tt.new_directory('parent1', tt.root)
+ child1 = tt.new_file('child1', parent1, 'contents')
+ parent2 = tt.new_directory('parent2', tt.root)
+
+ class FakeOSModule(object):
+ def rename(self, old, new):
+ os.rename(old, new)
+ raise RuntimeError
+ self._override_globals_in_method(tt, "_rename_in_limbo",
+ {"os": FakeOSModule()})
+ self.assertRaises(
+ RuntimeError, tt.adjust_path, "child1", parent2, child1)
+ path = osutils.pathjoin(tt._limbo_name(parent2), "child1")
+ self.assertPathExists(path)
+ tt.finalize()
+ self.assertPathDoesNotExist(path)
+ self.assertPathDoesNotExist(tt._limbodir)
+
+ def test_rename_in_limbo_rename_raises_before_rename(self):
+ tt, trans_id = self.create_transform_and_root_trans_id()
+ parent1 = tt.new_directory('parent1', tt.root)
+ child1 = tt.new_file('child1', parent1, 'contents')
+ parent2 = tt.new_directory('parent2', tt.root)
+
+ class FakeOSModule(object):
+ def rename(self, old, new):
+ raise RuntimeError
+ self._override_globals_in_method(tt, "_rename_in_limbo",
+ {"os": FakeOSModule()})
+ self.assertRaises(
+ RuntimeError, tt.adjust_path, "child1", parent2, child1)
+ path = osutils.pathjoin(tt._limbo_name(parent1), "child1")
+ self.assertPathExists(path)
+ tt.finalize()
+ self.assertPathDoesNotExist(path)
+ self.assertPathDoesNotExist(tt._limbodir)
+
+
+class TestTransformMissingParent(tests.TestCaseWithTransport):
+
+ def make_tt_with_versioned_dir(self):
+ wt = self.make_branch_and_tree('.')
+ self.build_tree(['dir/',])
+ wt.add(['dir'], ['dir-id'])
+ wt.commit('Create dir')
+ tt = TreeTransform(wt)
+ self.addCleanup(tt.finalize)
+ return wt, tt
+
+ def test_resolve_create_parent_for_versioned_file(self):
+ wt, tt = self.make_tt_with_versioned_dir()
+ dir_tid = tt.trans_id_tree_file_id('dir-id')
+ file_tid = tt.new_file('file', dir_tid, 'Contents', file_id='file-id')
+ tt.delete_contents(dir_tid)
+ tt.unversion_file(dir_tid)
+ conflicts = resolve_conflicts(tt)
+ # one conflict for the missing directory, one for the unversioned
+ # parent
+ self.assertLength(2, conflicts)
+
+ def test_non_versioned_file_create_conflict(self):
+ wt, tt = self.make_tt_with_versioned_dir()
+ dir_tid = tt.trans_id_tree_file_id('dir-id')
+ tt.new_file('file', dir_tid, 'Contents')
+ tt.delete_contents(dir_tid)
+ tt.unversion_file(dir_tid)
+ conflicts = resolve_conflicts(tt)
+ # no conflicts or rather: orphaning 'file' resolve the 'dir' conflict
+ self.assertLength(1, conflicts)
+ self.assertEqual(('deleting parent', 'Not deleting', 'new-1'),
+ conflicts.pop())
+
+
+A_ENTRY = ('a-id', ('a', 'a'), True, (True, True),
+ ('TREE_ROOT', 'TREE_ROOT'), ('a', 'a'), ('file', 'file'),
+ (False, False))
+ROOT_ENTRY = ('TREE_ROOT', ('', ''), False, (True, True), (None, None),
+ ('', ''), ('directory', 'directory'), (False, False))
+
+
+class TestTransformPreview(tests.TestCaseWithTransport):
+
+ def create_tree(self):
+ tree = self.make_branch_and_tree('.')
+ self.build_tree_contents([('a', 'content 1')])
+ tree.set_root_id('TREE_ROOT')
+ tree.add('a', 'a-id')
+ tree.commit('rev1', rev_id='rev1')
+ return tree.branch.repository.revision_tree('rev1')
+
+ def get_empty_preview(self):
+ repository = self.make_repository('repo')
+ tree = repository.revision_tree(_mod_revision.NULL_REVISION)
+ preview = TransformPreview(tree)
+ self.addCleanup(preview.finalize)
+ return preview
+
+ def test_transform_preview(self):
+ revision_tree = self.create_tree()
+ preview = TransformPreview(revision_tree)
+ self.addCleanup(preview.finalize)
+
+ def test_transform_preview_tree(self):
+ revision_tree = self.create_tree()
+ preview = TransformPreview(revision_tree)
+ self.addCleanup(preview.finalize)
+ preview.get_preview_tree()
+
+ def test_transform_new_file(self):
+ revision_tree = self.create_tree()
+ preview = TransformPreview(revision_tree)
+ self.addCleanup(preview.finalize)
+ preview.new_file('file2', preview.root, 'content B\n', 'file2-id')
+ preview_tree = preview.get_preview_tree()
+ self.assertEqual(preview_tree.kind('file2-id'), 'file')
+ self.assertEqual(
+ preview_tree.get_file('file2-id').read(), 'content B\n')
+
+ def test_diff_preview_tree(self):
+ revision_tree = self.create_tree()
+ preview = TransformPreview(revision_tree)
+ self.addCleanup(preview.finalize)
+ preview.new_file('file2', preview.root, 'content B\n', 'file2-id')
+ preview_tree = preview.get_preview_tree()
+ out = StringIO()
+ show_diff_trees(revision_tree, preview_tree, out)
+ lines = out.getvalue().splitlines()
+ self.assertEqual(lines[0], "=== added file 'file2'")
+ # 3 lines of diff administrivia
+ self.assertEqual(lines[4], "+content B")
+
+ def test_transform_conflicts(self):
+ revision_tree = self.create_tree()
+ preview = TransformPreview(revision_tree)
+ self.addCleanup(preview.finalize)
+ preview.new_file('a', preview.root, 'content 2')
+ resolve_conflicts(preview)
+ trans_id = preview.trans_id_file_id('a-id')
+ self.assertEqual('a.moved', preview.final_name(trans_id))
+
+ def get_tree_and_preview_tree(self):
+ revision_tree = self.create_tree()
+ preview = TransformPreview(revision_tree)
+ self.addCleanup(preview.finalize)
+ a_trans_id = preview.trans_id_file_id('a-id')
+ preview.delete_contents(a_trans_id)
+ preview.create_file('b content', a_trans_id)
+ preview_tree = preview.get_preview_tree()
+ return revision_tree, preview_tree
+
+ def test_iter_changes(self):
+ revision_tree, preview_tree = self.get_tree_and_preview_tree()
+ root = revision_tree.get_root_id()
+ self.assertEqual([('a-id', ('a', 'a'), True, (True, True),
+ (root, root), ('a', 'a'), ('file', 'file'),
+ (False, False))],
+ list(preview_tree.iter_changes(revision_tree)))
+
+ def test_include_unchanged_succeeds(self):
+ revision_tree, preview_tree = self.get_tree_and_preview_tree()
+ changes = preview_tree.iter_changes(revision_tree,
+ include_unchanged=True)
+ root = revision_tree.get_root_id()
+
+ self.assertEqual([ROOT_ENTRY, A_ENTRY], list(changes))
+
+ def test_specific_files(self):
+ revision_tree, preview_tree = self.get_tree_and_preview_tree()
+ changes = preview_tree.iter_changes(revision_tree,
+ specific_files=[''])
+ self.assertEqual([A_ENTRY], list(changes))
+
+ def test_want_unversioned(self):
+ revision_tree, preview_tree = self.get_tree_and_preview_tree()
+ changes = preview_tree.iter_changes(revision_tree,
+ want_unversioned=True)
+ self.assertEqual([A_ENTRY], list(changes))
+
+ def test_ignore_extra_trees_no_specific_files(self):
+ # extra_trees is harmless without specific_files, so we'll silently
+ # accept it, even though we won't use it.
+ revision_tree, preview_tree = self.get_tree_and_preview_tree()
+ preview_tree.iter_changes(revision_tree, extra_trees=[preview_tree])
+
+ def test_ignore_require_versioned_no_specific_files(self):
+ # require_versioned is meaningless without specific_files.
+ revision_tree, preview_tree = self.get_tree_and_preview_tree()
+ preview_tree.iter_changes(revision_tree, require_versioned=False)
+
+ def test_ignore_pb(self):
+ # pb could be supported, but TT.iter_changes doesn't support it.
+ revision_tree, preview_tree = self.get_tree_and_preview_tree()
+ preview_tree.iter_changes(revision_tree)
+
+ def test_kind(self):
+ revision_tree = self.create_tree()
+ preview = TransformPreview(revision_tree)
+ self.addCleanup(preview.finalize)
+ preview.new_file('file', preview.root, 'contents', 'file-id')
+ preview.new_directory('directory', preview.root, 'dir-id')
+ preview_tree = preview.get_preview_tree()
+ self.assertEqual('file', preview_tree.kind('file-id'))
+ self.assertEqual('directory', preview_tree.kind('dir-id'))
+
+ def test_get_file_mtime(self):
+ preview = self.get_empty_preview()
+ file_trans_id = preview.new_file('file', preview.root, 'contents',
+ 'file-id')
+ limbo_path = preview._limbo_name(file_trans_id)
+ preview_tree = preview.get_preview_tree()
+ self.assertEqual(os.stat(limbo_path).st_mtime,
+ preview_tree.get_file_mtime('file-id'))
+
+ def test_get_file_mtime_renamed(self):
+ work_tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/file'])
+ work_tree.add('file', 'file-id')
+ preview = TransformPreview(work_tree)
+ self.addCleanup(preview.finalize)
+ file_trans_id = preview.trans_id_tree_file_id('file-id')
+ preview.adjust_path('renamed', preview.root, file_trans_id)
+ preview_tree = preview.get_preview_tree()
+ preview_mtime = preview_tree.get_file_mtime('file-id', 'renamed')
+ work_mtime = work_tree.get_file_mtime('file-id', 'file')
+
+ def test_get_file_size(self):
+ work_tree = self.make_branch_and_tree('tree')
+ self.build_tree_contents([('tree/old', 'old')])
+ work_tree.add('old', 'old-id')
+ preview = TransformPreview(work_tree)
+ self.addCleanup(preview.finalize)
+ new_id = preview.new_file('name', preview.root, 'contents', 'new-id',
+ 'executable')
+ tree = preview.get_preview_tree()
+ self.assertEqual(len('old'), tree.get_file_size('old-id'))
+ self.assertEqual(len('contents'), tree.get_file_size('new-id'))
+
+ def test_get_file(self):
+ preview = self.get_empty_preview()
+ preview.new_file('file', preview.root, 'contents', 'file-id')
+ preview_tree = preview.get_preview_tree()
+ tree_file = preview_tree.get_file('file-id')
+ try:
+ self.assertEqual('contents', tree_file.read())
+ finally:
+ tree_file.close()
+
+ def test_get_symlink_target(self):
+ self.requireFeature(SymlinkFeature)
+ preview = self.get_empty_preview()
+ preview.new_symlink('symlink', preview.root, 'target', 'symlink-id')
+ preview_tree = preview.get_preview_tree()
+ self.assertEqual('target',
+ preview_tree.get_symlink_target('symlink-id'))
+
+ def test_all_file_ids(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/a', 'tree/b', 'tree/c'])
+ tree.add(['a', 'b', 'c'], ['a-id', 'b-id', 'c-id'])
+ preview = TransformPreview(tree)
+ self.addCleanup(preview.finalize)
+ preview.unversion_file(preview.trans_id_file_id('b-id'))
+ c_trans_id = preview.trans_id_file_id('c-id')
+ preview.unversion_file(c_trans_id)
+ preview.version_file('c-id', c_trans_id)
+ preview_tree = preview.get_preview_tree()
+ self.assertEqual(set(['a-id', 'c-id', tree.get_root_id()]),
+ preview_tree.all_file_ids())
+
+ def test_path2id_deleted_unchanged(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/unchanged', 'tree/deleted'])
+ tree.add(['unchanged', 'deleted'], ['unchanged-id', 'deleted-id'])
+ preview = TransformPreview(tree)
+ self.addCleanup(preview.finalize)
+ preview.unversion_file(preview.trans_id_file_id('deleted-id'))
+ preview_tree = preview.get_preview_tree()
+ self.assertEqual('unchanged-id', preview_tree.path2id('unchanged'))
+ self.assertIs(None, preview_tree.path2id('deleted'))
+
+ def test_path2id_created(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/unchanged'])
+ tree.add(['unchanged'], ['unchanged-id'])
+ preview = TransformPreview(tree)
+ self.addCleanup(preview.finalize)
+ preview.new_file('new', preview.trans_id_file_id('unchanged-id'),
+ 'contents', 'new-id')
+ preview_tree = preview.get_preview_tree()
+ self.assertEqual('new-id', preview_tree.path2id('unchanged/new'))
+
+ def test_path2id_moved(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/old_parent/', 'tree/old_parent/child'])
+ tree.add(['old_parent', 'old_parent/child'],
+ ['old_parent-id', 'child-id'])
+ preview = TransformPreview(tree)
+ self.addCleanup(preview.finalize)
+ new_parent = preview.new_directory('new_parent', preview.root,
+ 'new_parent-id')
+ preview.adjust_path('child', new_parent,
+ preview.trans_id_file_id('child-id'))
+ preview_tree = preview.get_preview_tree()
+ self.assertIs(None, preview_tree.path2id('old_parent/child'))
+ self.assertEqual('child-id', preview_tree.path2id('new_parent/child'))
+
+ def test_path2id_renamed_parent(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/old_name/', 'tree/old_name/child'])
+ tree.add(['old_name', 'old_name/child'],
+ ['parent-id', 'child-id'])
+ preview = TransformPreview(tree)
+ self.addCleanup(preview.finalize)
+ preview.adjust_path('new_name', preview.root,
+ preview.trans_id_file_id('parent-id'))
+ preview_tree = preview.get_preview_tree()
+ self.assertIs(None, preview_tree.path2id('old_name/child'))
+ self.assertEqual('child-id', preview_tree.path2id('new_name/child'))
+
+ def assertMatchingIterEntries(self, tt, specific_file_ids=None):
+ preview_tree = tt.get_preview_tree()
+ preview_result = list(preview_tree.iter_entries_by_dir(
+ specific_file_ids))
+ tree = tt._tree
+ tt.apply()
+ actual_result = list(tree.iter_entries_by_dir(specific_file_ids))
+ self.assertEqual(actual_result, preview_result)
+
+ def test_iter_entries_by_dir_new(self):
+ tree = self.make_branch_and_tree('tree')
+ tt = TreeTransform(tree)
+ tt.new_file('new', tt.root, 'contents', 'new-id')
+ self.assertMatchingIterEntries(tt)
+
+ def test_iter_entries_by_dir_deleted(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/deleted'])
+ tree.add('deleted', 'deleted-id')
+ tt = TreeTransform(tree)
+ tt.delete_contents(tt.trans_id_file_id('deleted-id'))
+ self.assertMatchingIterEntries(tt)
+
+ def test_iter_entries_by_dir_unversioned(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/removed'])
+ tree.add('removed', 'removed-id')
+ tt = TreeTransform(tree)
+ tt.unversion_file(tt.trans_id_file_id('removed-id'))
+ self.assertMatchingIterEntries(tt)
+
+ def test_iter_entries_by_dir_moved(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/moved', 'tree/new_parent/'])
+ tree.add(['moved', 'new_parent'], ['moved-id', 'new_parent-id'])
+ tt = TreeTransform(tree)
+ tt.adjust_path('moved', tt.trans_id_file_id('new_parent-id'),
+ tt.trans_id_file_id('moved-id'))
+ self.assertMatchingIterEntries(tt)
+
+ def test_iter_entries_by_dir_specific_file_ids(self):
+ tree = self.make_branch_and_tree('tree')
+ tree.set_root_id('tree-root-id')
+ self.build_tree(['tree/parent/', 'tree/parent/child'])
+ tree.add(['parent', 'parent/child'], ['parent-id', 'child-id'])
+ tt = TreeTransform(tree)
+ self.assertMatchingIterEntries(tt, ['tree-root-id', 'child-id'])
+
+ def test_symlink_content_summary(self):
+ self.requireFeature(SymlinkFeature)
+ preview = self.get_empty_preview()
+ preview.new_symlink('path', preview.root, 'target', 'path-id')
+ summary = preview.get_preview_tree().path_content_summary('path')
+ self.assertEqual(('symlink', None, None, 'target'), summary)
+
+ def test_missing_content_summary(self):
+ preview = self.get_empty_preview()
+ summary = preview.get_preview_tree().path_content_summary('path')
+ self.assertEqual(('missing', None, None, None), summary)
+
+ def test_deleted_content_summary(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/path/'])
+ tree.add('path')
+ preview = TransformPreview(tree)
+ self.addCleanup(preview.finalize)
+ preview.delete_contents(preview.trans_id_tree_path('path'))
+ summary = preview.get_preview_tree().path_content_summary('path')
+ self.assertEqual(('missing', None, None, None), summary)
+
+ def test_file_content_summary_executable(self):
+ preview = self.get_empty_preview()
+ path_id = preview.new_file('path', preview.root, 'contents', 'path-id')
+ preview.set_executability(True, path_id)
+ summary = preview.get_preview_tree().path_content_summary('path')
+ self.assertEqual(4, len(summary))
+ self.assertEqual('file', summary[0])
+ # size must be known
+ self.assertEqual(len('contents'), summary[1])
+ # executable
+ self.assertEqual(True, summary[2])
+ # will not have hash (not cheap to determine)
+ self.assertIs(None, summary[3])
+
+ def test_change_executability(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/path'])
+ tree.add('path')
+ preview = TransformPreview(tree)
+ self.addCleanup(preview.finalize)
+ path_id = preview.trans_id_tree_path('path')
+ preview.set_executability(True, path_id)
+ summary = preview.get_preview_tree().path_content_summary('path')
+ self.assertEqual(True, summary[2])
+
+ def test_file_content_summary_non_exec(self):
+ preview = self.get_empty_preview()
+ preview.new_file('path', preview.root, 'contents', 'path-id')
+ summary = preview.get_preview_tree().path_content_summary('path')
+ self.assertEqual(4, len(summary))
+ self.assertEqual('file', summary[0])
+ # size must be known
+ self.assertEqual(len('contents'), summary[1])
+ # not executable
+ self.assertEqual(False, summary[2])
+ # will not have hash (not cheap to determine)
+ self.assertIs(None, summary[3])
+
+ def test_dir_content_summary(self):
+ preview = self.get_empty_preview()
+ preview.new_directory('path', preview.root, 'path-id')
+ summary = preview.get_preview_tree().path_content_summary('path')
+ self.assertEqual(('directory', None, None, None), summary)
+
+ def test_tree_content_summary(self):
+ preview = self.get_empty_preview()
+ path = preview.new_directory('path', preview.root, 'path-id')
+ preview.set_tree_reference('rev-1', path)
+ summary = preview.get_preview_tree().path_content_summary('path')
+ self.assertEqual(4, len(summary))
+ self.assertEqual('tree-reference', summary[0])
+
+ def test_annotate(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree_contents([('tree/file', 'a\n')])
+ tree.add('file', 'file-id')
+ tree.commit('a', rev_id='one')
+ self.build_tree_contents([('tree/file', 'a\nb\n')])
+ preview = TransformPreview(tree)
+ self.addCleanup(preview.finalize)
+ file_trans_id = preview.trans_id_file_id('file-id')
+ preview.delete_contents(file_trans_id)
+ preview.create_file('a\nb\nc\n', file_trans_id)
+ preview_tree = preview.get_preview_tree()
+ expected = [
+ ('one', 'a\n'),
+ ('me:', 'b\n'),
+ ('me:', 'c\n'),
+ ]
+ annotation = preview_tree.annotate_iter('file-id', 'me:')
+ self.assertEqual(expected, annotation)
+
+ def test_annotate_missing(self):
+ preview = self.get_empty_preview()
+ preview.new_file('file', preview.root, 'a\nb\nc\n', 'file-id')
+ preview_tree = preview.get_preview_tree()
+ expected = [
+ ('me:', 'a\n'),
+ ('me:', 'b\n'),
+ ('me:', 'c\n'),
+ ]
+ annotation = preview_tree.annotate_iter('file-id', 'me:')
+ self.assertEqual(expected, annotation)
+
+ def test_annotate_rename(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree_contents([('tree/file', 'a\n')])
+ tree.add('file', 'file-id')
+ tree.commit('a', rev_id='one')
+ preview = TransformPreview(tree)
+ self.addCleanup(preview.finalize)
+ file_trans_id = preview.trans_id_file_id('file-id')
+ preview.adjust_path('newname', preview.root, file_trans_id)
+ preview_tree = preview.get_preview_tree()
+ expected = [
+ ('one', 'a\n'),
+ ]
+ annotation = preview_tree.annotate_iter('file-id', 'me:')
+ self.assertEqual(expected, annotation)
+
+ def test_annotate_deleted(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree_contents([('tree/file', 'a\n')])
+ tree.add('file', 'file-id')
+ tree.commit('a', rev_id='one')
+ self.build_tree_contents([('tree/file', 'a\nb\n')])
+ preview = TransformPreview(tree)
+ self.addCleanup(preview.finalize)
+ file_trans_id = preview.trans_id_file_id('file-id')
+ preview.delete_contents(file_trans_id)
+ preview_tree = preview.get_preview_tree()
+ annotation = preview_tree.annotate_iter('file-id', 'me:')
+ self.assertIs(None, annotation)
+
+ def test_stored_kind(self):
+ preview = self.get_empty_preview()
+ preview.new_file('file', preview.root, 'a\nb\nc\n', 'file-id')
+ preview_tree = preview.get_preview_tree()
+ self.assertEqual('file', preview_tree.stored_kind('file-id'))
+
+ def test_is_executable(self):
+ preview = self.get_empty_preview()
+ preview.new_file('file', preview.root, 'a\nb\nc\n', 'file-id')
+ preview.set_executability(True, preview.trans_id_file_id('file-id'))
+ preview_tree = preview.get_preview_tree()
+ self.assertEqual(True, preview_tree.is_executable('file-id'))
+
+ def test_get_set_parent_ids(self):
+ revision_tree, preview_tree = self.get_tree_and_preview_tree()
+ self.assertEqual([], preview_tree.get_parent_ids())
+ preview_tree.set_parent_ids(['rev-1'])
+ self.assertEqual(['rev-1'], preview_tree.get_parent_ids())
+
+ def test_plan_file_merge(self):
+ work_a = self.make_branch_and_tree('wta')
+ self.build_tree_contents([('wta/file', 'a\nb\nc\nd\n')])
+ work_a.add('file', 'file-id')
+ base_id = work_a.commit('base version')
+ tree_b = work_a.bzrdir.sprout('wtb').open_workingtree()
+ preview = TransformPreview(work_a)
+ self.addCleanup(preview.finalize)
+ trans_id = preview.trans_id_file_id('file-id')
+ preview.delete_contents(trans_id)
+ preview.create_file('b\nc\nd\ne\n', trans_id)
+ self.build_tree_contents([('wtb/file', 'a\nc\nd\nf\n')])
+ tree_a = preview.get_preview_tree()
+ tree_a.set_parent_ids([base_id])
+ self.assertEqual([
+ ('killed-a', 'a\n'),
+ ('killed-b', 'b\n'),
+ ('unchanged', 'c\n'),
+ ('unchanged', 'd\n'),
+ ('new-a', 'e\n'),
+ ('new-b', 'f\n'),
+ ], list(tree_a.plan_file_merge('file-id', tree_b)))
+
+ def test_plan_file_merge_revision_tree(self):
+ work_a = self.make_branch_and_tree('wta')
+ self.build_tree_contents([('wta/file', 'a\nb\nc\nd\n')])
+ work_a.add('file', 'file-id')
+ base_id = work_a.commit('base version')
+ tree_b = work_a.bzrdir.sprout('wtb').open_workingtree()
+ preview = TransformPreview(work_a.basis_tree())
+ self.addCleanup(preview.finalize)
+ trans_id = preview.trans_id_file_id('file-id')
+ preview.delete_contents(trans_id)
+ preview.create_file('b\nc\nd\ne\n', trans_id)
+ self.build_tree_contents([('wtb/file', 'a\nc\nd\nf\n')])
+ tree_a = preview.get_preview_tree()
+ tree_a.set_parent_ids([base_id])
+ self.assertEqual([
+ ('killed-a', 'a\n'),
+ ('killed-b', 'b\n'),
+ ('unchanged', 'c\n'),
+ ('unchanged', 'd\n'),
+ ('new-a', 'e\n'),
+ ('new-b', 'f\n'),
+ ], list(tree_a.plan_file_merge('file-id', tree_b)))
+
+ def test_walkdirs(self):
+ preview = self.get_empty_preview()
+ root = preview.new_directory('', ROOT_PARENT, 'tree-root')
+ # FIXME: new_directory should mark root.
+ preview.fixup_new_roots()
+ preview_tree = preview.get_preview_tree()
+ file_trans_id = preview.new_file('a', preview.root, 'contents',
+ 'a-id')
+ expected = [(('', 'tree-root'),
+ [('a', 'a', 'file', None, 'a-id', 'file')])]
+ self.assertEqual(expected, list(preview_tree.walkdirs()))
+
+ def test_extras(self):
+ work_tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/removed-file', 'tree/existing-file',
+ 'tree/not-removed-file'])
+ work_tree.add(['removed-file', 'not-removed-file'])
+ preview = TransformPreview(work_tree)
+ self.addCleanup(preview.finalize)
+ preview.new_file('new-file', preview.root, 'contents')
+ preview.new_file('new-versioned-file', preview.root, 'contents',
+ 'new-versioned-id')
+ tree = preview.get_preview_tree()
+ preview.unversion_file(preview.trans_id_tree_path('removed-file'))
+ self.assertEqual(set(['new-file', 'removed-file', 'existing-file']),
+ set(tree.extras()))
+
+ def test_merge_into_preview(self):
+ work_tree = self.make_branch_and_tree('tree')
+ self.build_tree_contents([('tree/file','b\n')])
+ work_tree.add('file', 'file-id')
+ work_tree.commit('first commit')
+ child_tree = work_tree.bzrdir.sprout('child').open_workingtree()
+ self.build_tree_contents([('child/file','b\nc\n')])
+ child_tree.commit('child commit')
+ child_tree.lock_write()
+ self.addCleanup(child_tree.unlock)
+ work_tree.lock_write()
+ self.addCleanup(work_tree.unlock)
+ preview = TransformPreview(work_tree)
+ self.addCleanup(preview.finalize)
+ file_trans_id = preview.trans_id_file_id('file-id')
+ preview.delete_contents(file_trans_id)
+ preview.create_file('a\nb\n', file_trans_id)
+ preview_tree = preview.get_preview_tree()
+ merger = Merger.from_revision_ids(None, preview_tree,
+ child_tree.branch.last_revision(),
+ other_branch=child_tree.branch,
+ tree_branch=work_tree.branch)
+ merger.merge_type = Merge3Merger
+ tt = merger.make_merger().make_preview_transform()
+ self.addCleanup(tt.finalize)
+ final_tree = tt.get_preview_tree()
+ self.assertEqual('a\nb\nc\n', final_tree.get_file_text('file-id'))
+
+ def test_merge_preview_into_workingtree(self):
+ tree = self.make_branch_and_tree('tree')
+ tree.set_root_id('TREE_ROOT')
+ tt = TransformPreview(tree)
+ self.addCleanup(tt.finalize)
+ tt.new_file('name', tt.root, 'content', 'file-id')
+ tree2 = self.make_branch_and_tree('tree2')
+ tree2.set_root_id('TREE_ROOT')
+ merger = Merger.from_uncommitted(tree2, tt.get_preview_tree(),
+ None, tree.basis_tree())
+ merger.merge_type = Merge3Merger
+ merger.do_merge()
+
+ def test_merge_preview_into_workingtree_handles_conflicts(self):
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree_contents([('tree/foo', 'bar')])
+ tree.add('foo', 'foo-id')
+ tree.commit('foo')
+ tt = TransformPreview(tree)
+ self.addCleanup(tt.finalize)
+ trans_id = tt.trans_id_file_id('foo-id')
+ tt.delete_contents(trans_id)
+ tt.create_file('baz', trans_id)
+ tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
+ self.build_tree_contents([('tree2/foo', 'qux')])
+ pb = None
+ merger = Merger.from_uncommitted(tree2, tt.get_preview_tree(),
+ pb, tree.basis_tree())
+ merger.merge_type = Merge3Merger
+ merger.do_merge()
+
+ def test_has_filename(self):
+ wt = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/unmodified', 'tree/removed', 'tree/modified'])
+ tt = TransformPreview(wt)
+ removed_id = tt.trans_id_tree_path('removed')
+ tt.delete_contents(removed_id)
+ tt.new_file('new', tt.root, 'contents')
+ modified_id = tt.trans_id_tree_path('modified')
+ tt.delete_contents(modified_id)
+ tt.create_file('modified-contents', modified_id)
+ self.addCleanup(tt.finalize)
+ tree = tt.get_preview_tree()
+ self.assertTrue(tree.has_filename('unmodified'))
+ self.assertFalse(tree.has_filename('not-present'))
+ self.assertFalse(tree.has_filename('removed'))
+ self.assertTrue(tree.has_filename('new'))
+ self.assertTrue(tree.has_filename('modified'))
+
+ def test_is_executable(self):
+ tree = self.make_branch_and_tree('tree')
+ preview = TransformPreview(tree)
+ self.addCleanup(preview.finalize)
+ preview.new_file('foo', preview.root, 'bar', 'baz-id')
+ preview_tree = preview.get_preview_tree()
+ self.assertEqual(False, preview_tree.is_executable('baz-id',
+ 'tree/foo'))
+ self.assertEqual(False, preview_tree.is_executable('baz-id'))
+
+ def test_commit_preview_tree(self):
+ tree = self.make_branch_and_tree('tree')
+ rev_id = tree.commit('rev1')
+ tree.branch.lock_write()
+ self.addCleanup(tree.branch.unlock)
+ tt = TransformPreview(tree)
+ tt.new_file('file', tt.root, 'contents', 'file_id')
+ self.addCleanup(tt.finalize)
+ preview = tt.get_preview_tree()
+ preview.set_parent_ids([rev_id])
+ builder = tree.branch.get_commit_builder([rev_id])
+ list(builder.record_iter_changes(preview, rev_id, tt.iter_changes()))
+ builder.finish_inventory()
+ rev2_id = builder.commit('rev2')
+ rev2_tree = tree.branch.repository.revision_tree(rev2_id)
+ self.assertEqual('contents', rev2_tree.get_file_text('file_id'))
+
+ def test_ascii_limbo_paths(self):
+ self.requireFeature(features.UnicodeFilenameFeature)
+ branch = self.make_branch('any')
+ tree = branch.repository.revision_tree(_mod_revision.NULL_REVISION)
+ tt = TransformPreview(tree)
+ self.addCleanup(tt.finalize)
+ foo_id = tt.new_directory('', ROOT_PARENT)
+ bar_id = tt.new_file(u'\u1234bar', foo_id, 'contents')
+ limbo_path = tt._limbo_name(bar_id)
+ self.assertEqual(limbo_path.encode('ascii', 'replace'), limbo_path)
+
+
+class FakeSerializer(object):
+ """Serializer implementation that simply returns the input.
+
+ The input is returned in the order used by pack.ContainerPushParser.
+ """
+ @staticmethod
+ def bytes_record(bytes, names):
+ return names, bytes
+
+
+class TestSerializeTransform(tests.TestCaseWithTransport):
+
+ _test_needs_features = [features.UnicodeFilenameFeature]
+
+ def get_preview(self, tree=None):
+ if tree is None:
+ tree = self.make_branch_and_tree('tree')
+ tt = TransformPreview(tree)
+ self.addCleanup(tt.finalize)
+ return tt
+
+ def assertSerializesTo(self, expected, tt):
+ records = list(tt.serialize(FakeSerializer()))
+ self.assertEqual(expected, records)
+
+ @staticmethod
+ def default_attribs():
+ return {
+ '_id_number': 1,
+ '_new_name': {},
+ '_new_parent': {},
+ '_new_executability': {},
+ '_new_id': {},
+ '_tree_path_ids': {'': 'new-0'},
+ '_removed_id': [],
+ '_removed_contents': [],
+ '_non_present_ids': {},
+ }
+
+ def make_records(self, attribs, contents):
+ records = [
+ (((('attribs'),),), bencode.bencode(attribs))]
+ records.extend([(((n, k),), c) for n, k, c in contents])
+ return records
+
+ def creation_records(self):
+ attribs = self.default_attribs()
+ attribs['_id_number'] = 3
+ attribs['_new_name'] = {
+ 'new-1': u'foo\u1234'.encode('utf-8'), 'new-2': 'qux'}
+ attribs['_new_id'] = {'new-1': 'baz', 'new-2': 'quxx'}
+ attribs['_new_parent'] = {'new-1': 'new-0', 'new-2': 'new-0'}
+ attribs['_new_executability'] = {'new-1': 1}
+ contents = [
+ ('new-1', 'file', 'i 1\nbar\n'),
+ ('new-2', 'directory', ''),
+ ]
+ return self.make_records(attribs, contents)
+
+ def test_serialize_creation(self):
+ tt = self.get_preview()
+ tt.new_file(u'foo\u1234', tt.root, 'bar', 'baz', True)
+ tt.new_directory('qux', tt.root, 'quxx')
+ self.assertSerializesTo(self.creation_records(), tt)
+
+ def test_deserialize_creation(self):
+ tt = self.get_preview()
+ tt.deserialize(iter(self.creation_records()))
+ self.assertEqual(3, tt._id_number)
+ self.assertEqual({'new-1': u'foo\u1234',
+ 'new-2': 'qux'}, tt._new_name)
+ self.assertEqual({'new-1': 'baz', 'new-2': 'quxx'}, tt._new_id)
+ self.assertEqual({'new-1': tt.root, 'new-2': tt.root}, tt._new_parent)
+ self.assertEqual({'baz': 'new-1', 'quxx': 'new-2'}, tt._r_new_id)
+ self.assertEqual({'new-1': True}, tt._new_executability)
+ self.assertEqual({'new-1': 'file',
+ 'new-2': 'directory'}, tt._new_contents)
+ foo_limbo = open(tt._limbo_name('new-1'), 'rb')
+ try:
+ foo_content = foo_limbo.read()
+ finally:
+ foo_limbo.close()
+ self.assertEqual('bar', foo_content)
+
+ def symlink_creation_records(self):
+ attribs = self.default_attribs()
+ attribs['_id_number'] = 2
+ attribs['_new_name'] = {'new-1': u'foo\u1234'.encode('utf-8')}
+ attribs['_new_parent'] = {'new-1': 'new-0'}
+ contents = [('new-1', 'symlink', u'bar\u1234'.encode('utf-8'))]
+ return self.make_records(attribs, contents)
+
+ def test_serialize_symlink_creation(self):
+ self.requireFeature(features.SymlinkFeature)
+ tt = self.get_preview()
+ tt.new_symlink(u'foo\u1234', tt.root, u'bar\u1234')
+ self.assertSerializesTo(self.symlink_creation_records(), tt)
+
+ def test_deserialize_symlink_creation(self):
+ self.requireFeature(features.SymlinkFeature)
+ tt = self.get_preview()
+ tt.deserialize(iter(self.symlink_creation_records()))
+ abspath = tt._limbo_name('new-1')
+ foo_content = osutils.readlink(abspath)
+ self.assertEqual(u'bar\u1234', foo_content)
+
+ def make_destruction_preview(self):
+ tree = self.make_branch_and_tree('.')
+ self.build_tree([u'foo\u1234', 'bar'])
+ tree.add([u'foo\u1234', 'bar'], ['foo-id', 'bar-id'])
+ return self.get_preview(tree)
+
+ def destruction_records(self):
+ attribs = self.default_attribs()
+ attribs['_id_number'] = 3
+ attribs['_removed_id'] = ['new-1']
+ attribs['_removed_contents'] = ['new-2']
+ attribs['_tree_path_ids'] = {
+ '': 'new-0',
+ u'foo\u1234'.encode('utf-8'): 'new-1',
+ 'bar': 'new-2',
+ }
+ return self.make_records(attribs, [])
+
+ def test_serialize_destruction(self):
+ tt = self.make_destruction_preview()
+ foo_trans_id = tt.trans_id_tree_file_id('foo-id')
+ tt.unversion_file(foo_trans_id)
+ bar_trans_id = tt.trans_id_tree_file_id('bar-id')
+ tt.delete_contents(bar_trans_id)
+ self.assertSerializesTo(self.destruction_records(), tt)
+
+ def test_deserialize_destruction(self):
+ tt = self.make_destruction_preview()
+ tt.deserialize(iter(self.destruction_records()))
+ self.assertEqual({u'foo\u1234': 'new-1',
+ 'bar': 'new-2',
+ '': tt.root}, tt._tree_path_ids)
+ self.assertEqual({'new-1': u'foo\u1234',
+ 'new-2': 'bar',
+ tt.root: ''}, tt._tree_id_paths)
+ self.assertEqual(set(['new-1']), tt._removed_id)
+ self.assertEqual(set(['new-2']), tt._removed_contents)
+
+ def missing_records(self):
+ attribs = self.default_attribs()
+ attribs['_id_number'] = 2
+ attribs['_non_present_ids'] = {
+ 'boo': 'new-1',}
+ return self.make_records(attribs, [])
+
+ def test_serialize_missing(self):
+ tt = self.get_preview()
+ boo_trans_id = tt.trans_id_file_id('boo')
+ self.assertSerializesTo(self.missing_records(), tt)
+
+ def test_deserialize_missing(self):
+ tt = self.get_preview()
+ tt.deserialize(iter(self.missing_records()))
+ self.assertEqual({'boo': 'new-1'}, tt._non_present_ids)
+
+ def make_modification_preview(self):
+ LINES_ONE = 'aa\nbb\ncc\ndd\n'
+ LINES_TWO = 'z\nbb\nx\ndd\n'
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree_contents([('tree/file', LINES_ONE)])
+ tree.add('file', 'file-id')
+ return self.get_preview(tree), LINES_TWO
+
+ def modification_records(self):
+ attribs = self.default_attribs()
+ attribs['_id_number'] = 2
+ attribs['_tree_path_ids'] = {
+ 'file': 'new-1',
+ '': 'new-0',}
+ attribs['_removed_contents'] = ['new-1']
+ contents = [('new-1', 'file',
+ 'i 1\nz\n\nc 0 1 1 1\ni 1\nx\n\nc 0 3 3 1\n')]
+ return self.make_records(attribs, contents)
+
+ def test_serialize_modification(self):
+ tt, LINES = self.make_modification_preview()
+ trans_id = tt.trans_id_file_id('file-id')
+ tt.delete_contents(trans_id)
+ tt.create_file(LINES, trans_id)
+ self.assertSerializesTo(self.modification_records(), tt)
+
+ def test_deserialize_modification(self):
+ tt, LINES = self.make_modification_preview()
+ tt.deserialize(iter(self.modification_records()))
+ self.assertFileEqual(LINES, tt._limbo_name('new-1'))
+
+ def make_kind_change_preview(self):
+ LINES = 'a\nb\nc\nd\n'
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/foo/'])
+ tree.add('foo', 'foo-id')
+ return self.get_preview(tree), LINES
+
+ def kind_change_records(self):
+ attribs = self.default_attribs()
+ attribs['_id_number'] = 2
+ attribs['_tree_path_ids'] = {
+ 'foo': 'new-1',
+ '': 'new-0',}
+ attribs['_removed_contents'] = ['new-1']
+ contents = [('new-1', 'file',
+ 'i 4\na\nb\nc\nd\n\n')]
+ return self.make_records(attribs, contents)
+
+ def test_serialize_kind_change(self):
+ tt, LINES = self.make_kind_change_preview()
+ trans_id = tt.trans_id_file_id('foo-id')
+ tt.delete_contents(trans_id)
+ tt.create_file(LINES, trans_id)
+ self.assertSerializesTo(self.kind_change_records(), tt)
+
+ def test_deserialize_kind_change(self):
+ tt, LINES = self.make_kind_change_preview()
+ tt.deserialize(iter(self.kind_change_records()))
+ self.assertFileEqual(LINES, tt._limbo_name('new-1'))
+
+ def make_add_contents_preview(self):
+ LINES = 'a\nb\nc\nd\n'
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree(['tree/foo'])
+ tree.add('foo')
+ os.unlink('tree/foo')
+ return self.get_preview(tree), LINES
+
+ def add_contents_records(self):
+ attribs = self.default_attribs()
+ attribs['_id_number'] = 2
+ attribs['_tree_path_ids'] = {
+ 'foo': 'new-1',
+ '': 'new-0',}
+ contents = [('new-1', 'file',
+ 'i 4\na\nb\nc\nd\n\n')]
+ return self.make_records(attribs, contents)
+
+ def test_serialize_add_contents(self):
+ tt, LINES = self.make_add_contents_preview()
+ trans_id = tt.trans_id_tree_path('foo')
+ tt.create_file(LINES, trans_id)
+ self.assertSerializesTo(self.add_contents_records(), tt)
+
+ def test_deserialize_add_contents(self):
+ tt, LINES = self.make_add_contents_preview()
+ tt.deserialize(iter(self.add_contents_records()))
+ self.assertFileEqual(LINES, tt._limbo_name('new-1'))
+
+ def test_get_parents_lines(self):
+ LINES_ONE = 'aa\nbb\ncc\ndd\n'
+ LINES_TWO = 'z\nbb\nx\ndd\n'
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree_contents([('tree/file', LINES_ONE)])
+ tree.add('file', 'file-id')
+ tt = self.get_preview(tree)
+ trans_id = tt.trans_id_tree_path('file')
+ self.assertEqual((['aa\n', 'bb\n', 'cc\n', 'dd\n'],),
+ tt._get_parents_lines(trans_id))
+
+ def test_get_parents_texts(self):
+ LINES_ONE = 'aa\nbb\ncc\ndd\n'
+ LINES_TWO = 'z\nbb\nx\ndd\n'
+ tree = self.make_branch_and_tree('tree')
+ self.build_tree_contents([('tree/file', LINES_ONE)])
+ tree.add('file', 'file-id')
+ tt = self.get_preview(tree)
+ trans_id = tt.trans_id_tree_path('file')
+ self.assertEqual((LINES_ONE,),
+ tt._get_parents_texts(trans_id))
+
+
+class TestOrphan(tests.TestCaseWithTransport):
+
+ def test_no_orphan_for_transform_preview(self):
+ tree = self.make_branch_and_tree('tree')
+ tt = transform.TransformPreview(tree)
+ self.addCleanup(tt.finalize)
+ self.assertRaises(NotImplementedError, tt.new_orphan, 'foo', 'bar')
+
+ def _set_orphan_policy(self, wt, policy):
+ wt.branch.get_config_stack().set('bzr.transform.orphan_policy',
+ policy)
+
+ def _prepare_orphan(self, wt):
+ self.build_tree(['dir/', 'dir/file', 'dir/foo'])
+ wt.add(['dir', 'dir/file'], ['dir-id', 'file-id'])
+ wt.commit('add dir and file ignoring foo')
+ tt = transform.TreeTransform(wt)
+ self.addCleanup(tt.finalize)
+ # dir and bar are deleted
+ dir_tid = tt.trans_id_tree_path('dir')
+ file_tid = tt.trans_id_tree_path('dir/file')
+ orphan_tid = tt.trans_id_tree_path('dir/foo')
+ tt.delete_contents(file_tid)
+ tt.unversion_file(file_tid)
+ tt.delete_contents(dir_tid)
+ tt.unversion_file(dir_tid)
+ # There should be a conflict because dir still contain foo
+ raw_conflicts = tt.find_conflicts()
+ self.assertLength(1, raw_conflicts)
+ self.assertEqual(('missing parent', 'new-1'), raw_conflicts[0])
+ return tt, orphan_tid
+
+ def test_new_orphan_created(self):
+ wt = self.make_branch_and_tree('.')
+ self._set_orphan_policy(wt, 'move')
+ tt, orphan_tid = self._prepare_orphan(wt)
+ warnings = []
+ def warning(*args):
+ warnings.append(args[0] % args[1:])
+ self.overrideAttr(trace, 'warning', warning)
+ remaining_conflicts = resolve_conflicts(tt)
+ self.assertEquals(['dir/foo has been orphaned in bzr-orphans'],
+ warnings)
+ # Yeah for resolved conflicts !
+ self.assertLength(0, remaining_conflicts)
+ # We have a new orphan
+ self.assertEquals('foo.~1~', tt.final_name(orphan_tid))
+ self.assertEquals('bzr-orphans',
+ tt.final_name(tt.final_parent(orphan_tid)))
+
+ def test_never_orphan(self):
+ wt = self.make_branch_and_tree('.')
+ self._set_orphan_policy(wt, 'conflict')
+ tt, orphan_tid = self._prepare_orphan(wt)
+ remaining_conflicts = resolve_conflicts(tt)
+ self.assertLength(1, remaining_conflicts)
+ self.assertEqual(('deleting parent', 'Not deleting', 'new-1'),
+ remaining_conflicts.pop())
+
+ def test_orphan_error(self):
+ def bogus_orphan(tt, orphan_id, parent_id):
+ raise transform.OrphaningError(tt.final_name(orphan_id),
+ tt.final_name(parent_id))
+ transform.orphaning_registry.register('bogus', bogus_orphan,
+ 'Raise an error when orphaning')
+ wt = self.make_branch_and_tree('.')
+ self._set_orphan_policy(wt, 'bogus')
+ tt, orphan_tid = self._prepare_orphan(wt)
+ remaining_conflicts = resolve_conflicts(tt)
+ self.assertLength(1, remaining_conflicts)
+ self.assertEqual(('deleting parent', 'Not deleting', 'new-1'),
+ remaining_conflicts.pop())
+
+ def test_unknown_orphan_policy(self):
+ wt = self.make_branch_and_tree('.')
+ # Set a fictional policy nobody ever implemented
+ self._set_orphan_policy(wt, 'donttouchmypreciouuus')
+ tt, orphan_tid = self._prepare_orphan(wt)
+ warnings = []
+ def warning(*args):
+ warnings.append(args[0] % args[1:])
+ self.overrideAttr(trace, 'warning', warning)
+ remaining_conflicts = resolve_conflicts(tt)
+ # We fallback to the default policy which create a conflict
+ self.assertLength(1, remaining_conflicts)
+ self.assertEqual(('deleting parent', 'Not deleting', 'new-1'),
+ remaining_conflicts.pop())
+ self.assertLength(1, warnings)
+ self.assertStartsWith(warnings[0], 'Value "donttouchmypreciouuus" ')
+
+
+class TestTransformHooks(tests.TestCaseWithTransport):
+
+ def setUp(self):
+ super(TestTransformHooks, self).setUp()
+ self.wt = self.make_branch_and_tree('.')
+ os.chdir('..')
+
+ def get_transform(self):
+ transform = TreeTransform(self.wt)
+ self.addCleanup(transform.finalize)
+ return transform, transform.root
+
+ def test_pre_commit_hooks(self):
+ calls = []
+ def record_pre_transform(tree, tt):
+ calls.append((tree, tt))
+ MutableTree.hooks.install_named_hook('pre_transform',
+ record_pre_transform, "Pre transform")
+ transform, root = self.get_transform()
+ old_root_id = transform.tree_file_id(root)
+ transform.apply()
+ self.assertEqual(old_root_id, self.wt.get_root_id())
+ self.assertEquals([(self.wt, transform)], calls)
+
+ def test_post_commit_hooks(self):
+ calls = []
+ def record_post_transform(tree, tt):
+ calls.append((tree, tt))
+ MutableTree.hooks.install_named_hook('post_transform',
+ record_post_transform, "Post transform")
+ transform, root = self.get_transform()
+ old_root_id = transform.tree_file_id(root)
+ transform.apply()
+ self.assertEqual(old_root_id, self.wt.get_root_id())
+ self.assertEquals([(self.wt, transform)], calls)