summaryrefslogtreecommitdiff
path: root/bzrlib/tests/test_merge_core.py
diff options
context:
space:
mode:
Diffstat (limited to 'bzrlib/tests/test_merge_core.py')
-rw-r--r--bzrlib/tests/test_merge_core.py829
1 files changed, 829 insertions, 0 deletions
diff --git a/bzrlib/tests/test_merge_core.py b/bzrlib/tests/test_merge_core.py
new file mode 100644
index 0000000..c1fcb28
--- /dev/null
+++ b/bzrlib/tests/test_merge_core.py
@@ -0,0 +1,829 @@
+# Copyright (C) 2005-2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import os
+import sys
+
+import bzrlib
+from bzrlib import (
+ controldir,
+ errors,
+ generate_ids,
+ merge_directive,
+ osutils,
+ )
+from bzrlib.conflicts import (
+ ContentsConflict,
+ TextConflict,
+ PathConflict,
+ )
+from bzrlib.merge import (
+ Merge3Merger,
+ Diff3Merger,
+ WeaveMerger,
+ Merger,
+ )
+from bzrlib.osutils import getcwd, pathjoin
+from bzrlib.transform import TreeTransform
+from bzrlib.tests import TestCaseWithTransport, TestSkipped
+from bzrlib.workingtree import WorkingTree
+
+
+class MergeBuilder(object):
+
+ def __init__(self, dir=None):
+ self.dir = osutils.mkdtemp(prefix="merge-test", dir=dir)
+ self.tree_root = generate_ids.gen_root_id()
+ def wt(name):
+ path = pathjoin(self.dir, name)
+ os.mkdir(path)
+ wt = controldir.ControlDir.create_standalone_workingtree(path)
+ # the tests perform pulls, so need a branch that is writeable.
+ wt.lock_write()
+ wt.set_root_id(self.tree_root)
+ wt.flush()
+ tt = TreeTransform(wt)
+ return wt, tt
+ self.base, self.base_tt = wt('base')
+ self.this, self.this_tt = wt('this')
+ self.other, self.other_tt = wt('other')
+
+ def get_cset_path(self, parent, name):
+ if name is None:
+ if parent is not None:
+ raise AssertionError()
+ return None
+ return pathjoin(self.cset.entries[parent].path, name)
+
+ def add_file(self, id, parent, name, contents, executable, this=True,
+ base=True, other=True):
+ def new_file(tt):
+ parent_id = tt.trans_id_file_id(parent)
+ tt.new_file(name, parent_id, contents, id, executable)
+ for option, tt in self.selected_transforms(this, base, other):
+ if option is True:
+ new_file(tt)
+
+ def merge(self, merge_type=Merge3Merger, interesting_ids=None, **kwargs):
+ merger = self.make_merger(merge_type, interesting_ids, **kwargs)
+ merger.do_merge()
+ return merger.cooked_conflicts
+
+ def make_preview_transform(self):
+ merger = self.make_merger(Merge3Merger, None, this_revision_tree=True)
+ return merger.make_preview_transform()
+
+ def make_merger(self, merge_type, interesting_ids,
+ this_revision_tree=False, **kwargs):
+ self.base_tt.apply()
+ self.base.commit('base commit')
+ for tt, wt in ((self.this_tt, self.this), (self.other_tt, self.other)):
+ # why does this not do wt.pull() ?
+ wt.branch.pull(self.base.branch)
+ wt.set_parent_ids([wt.branch.last_revision()])
+ wt.flush()
+ # We maintain a write lock, so make sure changes are flushed to
+ # disk first
+ tt.apply()
+ wt.commit('branch commit')
+ wt.flush()
+ if wt.branch.last_revision_info()[0] != 2:
+ raise AssertionError()
+ self.this.branch.fetch(self.other.branch)
+ other_basis = self.other.branch.basis_tree()
+ if this_revision_tree:
+ self.this.commit('message')
+ this_tree = self.this.basis_tree()
+ else:
+ this_tree = self.this
+ merger = merge_type(this_tree, self.this, self.base, other_basis,
+ interesting_ids=interesting_ids, do_merge=False,
+ this_branch=self.this.branch, **kwargs)
+ return merger
+
+ def list_transforms(self):
+ return [self.this_tt, self.base_tt, self.other_tt]
+
+ def selected_transforms(self, this, base, other):
+ pairs = [(this, self.this_tt), (base, self.base_tt),
+ (other, self.other_tt)]
+ return [(v, tt) for (v, tt) in pairs if v is not None]
+
+ def add_symlink(self, id, parent, name, contents):
+ for tt in self.list_transforms():
+ parent_id = tt.trans_id_file_id(parent)
+ tt.new_symlink(name, parent_id, contents, id)
+
+ def remove_file(self, file_id, base=False, this=False, other=False):
+ for option, tt in self.selected_transforms(this, base, other):
+ if option is True:
+ trans_id = tt.trans_id_file_id(file_id)
+ tt.cancel_creation(trans_id)
+ tt.cancel_versioning(trans_id)
+ tt.set_executability(None, trans_id)
+
+ def add_dir(self, file_id, parent, name, this=True, base=True, other=True):
+ for option, tt in self.selected_transforms(this, base, other):
+ if option is True:
+ parent_id = tt.trans_id_file_id(parent)
+ tt.new_directory(name, parent_id, file_id)
+
+ def change_name(self, id, base=None, this=None, other=None):
+ for val, tt in ((base, self.base_tt), (this, self.this_tt),
+ (other, self.other_tt)):
+ if val is None:
+ continue
+ trans_id = tt.trans_id_file_id(id)
+ parent_id = tt.final_parent(trans_id)
+ tt.adjust_path(val, parent_id, trans_id)
+
+ def change_parent(self, file_id, base=None, this=None, other=None):
+ for parent, tt in self.selected_transforms(this, base, other):
+ trans_id = tt.trans_id_file_id(file_id)
+ parent_id = tt.trans_id_file_id(parent)
+ tt.adjust_path(tt.final_name(trans_id), parent_id, trans_id)
+
+ def change_contents(self, file_id, base=None, this=None, other=None):
+ for contents, tt in self.selected_transforms(this, base, other):
+ trans_id = tt.trans_id_file_id(file_id)
+ tt.cancel_creation(trans_id)
+ tt.create_file(contents, trans_id)
+
+ def change_target(self, id, base=None, this=None, other=None):
+ for target, tt in self.selected_transforms(this, base, other):
+ trans_id = tt.trans_id_file_id(id)
+ tt.cancel_creation(trans_id)
+ tt.create_symlink(target, trans_id)
+
+ def change_perms(self, id, base=None, this=None, other=None):
+ for executability, tt in self.selected_transforms(this, base, other):
+ trans_id = tt.trans_id_file_id(id)
+ tt.set_executability(None, trans_id)
+ tt.set_executability(executability, trans_id)
+
+ def change_perms_tree(self, id, tree, mode):
+ os.chmod(tree.full_path(id), mode)
+
+ def apply_inv_change(self, inventory_change, orig_inventory):
+ orig_inventory_by_path = {}
+ for file_id, path in orig_inventory.iteritems():
+ orig_inventory_by_path[path] = file_id
+
+ def parent_id(file_id):
+ try:
+ parent_dir = os.path.dirname(orig_inventory[file_id])
+ except:
+ print file_id
+ raise
+ if parent_dir == "":
+ return None
+ return orig_inventory_by_path[parent_dir]
+
+ def new_path(file_id):
+ if fild_id in inventory_change:
+ return inventory_change[file_id]
+ else:
+ parent = parent_id(file_id)
+ if parent is None:
+ return orig_inventory[file_id]
+ dirname = new_path(parent)
+ return pathjoin(dirname, os.path.basename(orig_inventory[file_id]))
+
+ new_inventory = {}
+ for file_id in orig_inventory.iterkeys():
+ path = new_path(file_id)
+ if path is None:
+ continue
+ new_inventory[file_id] = path
+
+ for file_id, path in inventory_change.iteritems():
+ if file_id in orig_inventory:
+ continue
+ new_inventory[file_id] = path
+ return new_inventory
+
+ def unlock(self):
+ self.base.unlock()
+ self.this.unlock()
+ self.other.unlock()
+
+ def cleanup(self):
+ self.unlock()
+ osutils.rmtree(self.dir)
+
+
+class MergeTest(TestCaseWithTransport):
+
+ def test_change_name(self):
+ """Test renames"""
+ builder = MergeBuilder(getcwd())
+ builder.add_file("1", builder.tree_root, "name1", "hello1", True)
+ builder.change_name("1", other="name2")
+ builder.add_file("2", builder.tree_root, "name3", "hello2", True)
+ builder.change_name("2", base="name4")
+ builder.add_file("3", builder.tree_root, "name5", "hello3", True)
+ builder.change_name("3", this="name6")
+ builder.merge()
+ builder.cleanup()
+ builder = MergeBuilder(getcwd())
+ builder.add_file("1", builder.tree_root, "name1", "hello1", False)
+ builder.change_name("1", other="name2", this="name3")
+ conflicts = builder.merge()
+ self.assertEqual(conflicts, [PathConflict('name3', 'name2', '1')])
+ builder.cleanup()
+
+ def test_merge_one(self):
+ builder = MergeBuilder(getcwd())
+ builder.add_file("1", builder.tree_root, "name1", "hello1", True)
+ builder.change_contents("1", other="text4")
+ builder.add_file("2", builder.tree_root, "name2", "hello1", True)
+ builder.change_contents("2", other="text4")
+ builder.merge(interesting_ids=["1"])
+ self.assertEqual(builder.this.get_file("1").read(), "text4" )
+ self.assertEqual(builder.this.get_file("2").read(), "hello1" )
+ builder.cleanup()
+
+ def test_file_moves(self):
+ """Test moves"""
+ builder = MergeBuilder(getcwd())
+ builder.add_dir("1", builder.tree_root, "dir1")
+ builder.add_dir("2", builder.tree_root, "dir2")
+ builder.add_file("3", "1", "file1", "hello1", True)
+ builder.add_file("4", "1", "file2", "hello2", True)
+ builder.add_file("5", "1", "file3", "hello3", True)
+ builder.change_parent("3", other="2")
+ builder.change_parent("4", this="2")
+ builder.change_parent("5", base="2")
+ builder.merge()
+ builder.cleanup()
+
+ builder = MergeBuilder(getcwd())
+ builder.add_dir("1", builder.tree_root, "dir1")
+ builder.add_dir("2", builder.tree_root, "dir2")
+ builder.add_dir("3", builder.tree_root, "dir3")
+ builder.add_file("4", "1", "file1", "hello1", False)
+ builder.change_parent("4", other="2", this="3")
+ conflicts = builder.merge()
+ path2 = pathjoin('dir2', 'file1')
+ path3 = pathjoin('dir3', 'file1')
+ self.assertEqual(conflicts, [PathConflict(path3, path2, '4')])
+ builder.cleanup()
+
+ def test_contents_merge(self):
+ """Test merge3 merging"""
+ self.do_contents_test(Merge3Merger)
+
+ def test_contents_merge2(self):
+ """Test diff3 merging"""
+ if sys.platform == 'win32':
+ raise TestSkipped("diff3 does not have --binary flag"
+ " and therefore always fails on win32")
+ try:
+ self.do_contents_test(Diff3Merger)
+ except errors.NoDiff3:
+ raise TestSkipped("diff3 not available")
+
+ def test_contents_merge3(self):
+ """Test diff3 merging"""
+ self.do_contents_test(WeaveMerger)
+
+ def test_reprocess_weave(self):
+ # Reprocess works on weaves, and behaves as expected
+ builder = MergeBuilder(getcwd())
+ builder.add_file('a', builder.tree_root, 'blah', 'a', False)
+ builder.change_contents('a', this='b\nc\nd\ne\n', other='z\nc\nd\ny\n')
+ builder.merge(WeaveMerger, reprocess=True)
+ expected = """<<<<<<< TREE
+b
+=======
+z
+>>>>>>> MERGE-SOURCE
+c
+d
+<<<<<<< TREE
+e
+=======
+y
+>>>>>>> MERGE-SOURCE
+"""
+ self.assertEqualDiff(builder.this.get_file("a").read(), expected)
+ builder.cleanup()
+
+ def do_contents_test(self, merge_factory):
+ """Test merging with specified ContentsChange factory"""
+ builder = self.contents_test_success(merge_factory)
+ builder.cleanup()
+ self.contents_test_conflicts(merge_factory)
+
+ def contents_test_success(self, merge_factory):
+ builder = MergeBuilder(getcwd())
+ builder.add_file("1", builder.tree_root, "name1", "text1", True)
+ builder.change_contents("1", other="text4")
+ builder.add_file("2", builder.tree_root, "name3", "text2", False)
+ builder.change_contents("2", base="text5")
+ builder.add_file("3", builder.tree_root, "name5", "text3", True)
+ builder.add_file("4", builder.tree_root, "name6", "text4", True)
+ builder.remove_file("4", base=True)
+ builder.add_file("5", builder.tree_root, "name7", "a\nb\nc\nd\ne\nf\n",
+ True)
+ builder.change_contents("5", other="a\nz\nc\nd\ne\nf\n",
+ this="a\nb\nc\nd\ne\nz\n")
+ conflicts = builder.merge(merge_factory)
+ try:
+ self.assertEqual([], conflicts)
+ self.assertEqual("text4", builder.this.get_file("1").read())
+ self.assertEqual("text2", builder.this.get_file("2").read())
+ self.assertEqual("a\nz\nc\nd\ne\nz\n",
+ builder.this.get_file("5").read())
+ self.assertTrue(builder.this.is_executable("1"))
+ self.assertFalse(builder.this.is_executable("2"))
+ self.assertTrue(builder.this.is_executable("3"))
+ except:
+ builder.unlock()
+ raise
+ return builder
+
+ def contents_test_conflicts(self, merge_factory):
+ builder = MergeBuilder(getcwd())
+ builder.add_file("1", builder.tree_root, "name1", "text1", True)
+ builder.change_contents("1", other="text4", this="text3")
+ builder.add_file("2", builder.tree_root, "name2", "text1", True)
+ builder.change_contents("2", other="\x00", this="text3")
+ builder.add_file("3", builder.tree_root, "name3", "text5", False)
+ builder.change_perms("3", this=True)
+ builder.change_contents('3', this='moretext')
+ builder.remove_file('3', other=True)
+ conflicts = builder.merge(merge_factory)
+ self.assertEqual(conflicts, [TextConflict('name1', file_id='1'),
+ ContentsConflict('name2', file_id='2'),
+ ContentsConflict('name3', file_id='3')])
+ self.assertEqual(builder.this.get_file('2').read(), '\x00')
+ builder.cleanup()
+
+ def test_symlink_conflicts(self):
+ if sys.platform != "win32":
+ builder = MergeBuilder(getcwd())
+ builder.add_symlink("2", builder.tree_root, "name2", "target1")
+ builder.change_target("2", other="target4", base="text3")
+ conflicts = builder.merge()
+ self.assertEqual(conflicts, [ContentsConflict('name2',
+ file_id='2')])
+ builder.cleanup()
+
+ def test_symlink_merge(self):
+ if sys.platform != "win32":
+ builder = MergeBuilder(getcwd())
+ builder.add_symlink("1", builder.tree_root, "name1", "target1")
+ builder.add_symlink("2", builder.tree_root, "name2", "target1")
+ builder.add_symlink("3", builder.tree_root, "name3", "target1")
+ builder.change_target("1", this="target2")
+ builder.change_target("2", base="target2")
+ builder.change_target("3", other="target2")
+ builder.merge()
+ self.assertEqual(builder.this.get_symlink_target("1"), "target2")
+ self.assertEqual(builder.this.get_symlink_target("2"), "target1")
+ self.assertEqual(builder.this.get_symlink_target("3"), "target2")
+ builder.cleanup()
+
+ def test_no_passive_add(self):
+ builder = MergeBuilder(getcwd())
+ builder.add_file("1", builder.tree_root, "name1", "text1", True)
+ builder.remove_file("1", this=True)
+ builder.merge()
+ builder.cleanup()
+
+ def test_perms_merge(self):
+ builder = MergeBuilder(getcwd())
+ builder.add_file("1", builder.tree_root, "name1", "text1", True)
+ builder.change_perms("1", other=False)
+ builder.add_file("2", builder.tree_root, "name2", "text2", True)
+ builder.change_perms("2", base=False)
+ builder.add_file("3", builder.tree_root, "name3", "text3", True)
+ builder.change_perms("3", this=False)
+ builder.add_file('4', builder.tree_root, 'name4', 'text4', False)
+ builder.change_perms('4', this=True)
+ builder.remove_file('4', base=True)
+ builder.merge()
+ self.assertIs(builder.this.is_executable("1"), False)
+ self.assertIs(builder.this.is_executable("2"), True)
+ self.assertIs(builder.this.is_executable("3"), False)
+ builder.cleanup();
+
+ def test_new_suffix(self):
+ builder = MergeBuilder(getcwd())
+ builder.add_file("1", builder.tree_root, "name1", "text1", True)
+ builder.change_contents("1", other="text3")
+ builder.add_file("2", builder.tree_root, "name1.new", "text2", True)
+ builder.merge()
+ os.lstat(builder.this.id2abspath("2"))
+ builder.cleanup()
+
+ def test_spurious_conflict(self):
+ builder = MergeBuilder(getcwd())
+ builder.add_file("1", builder.tree_root, "name1", "text1", False)
+ builder.remove_file("1", other=True)
+ builder.add_file("2", builder.tree_root, "name1", "text1", False,
+ this=False, base=False)
+ conflicts = builder.merge()
+ self.assertEqual(conflicts, [])
+ builder.cleanup()
+
+ def test_merge_one_renamed(self):
+ builder = MergeBuilder(getcwd())
+ builder.add_file('1', builder.tree_root, 'name1', 'text1a', False)
+ builder.change_name('1', this='name2')
+ builder.change_contents('1', other='text2')
+ builder.merge(interesting_files=['name2'])
+ self.assertEqual('text2', builder.this.get_file('1').read())
+ builder.cleanup()
+
+
+class FunctionalMergeTest(TestCaseWithTransport):
+
+ def test_trivial_star_merge(self):
+ """Test that merges in a star shape Just Work."""
+ # John starts a branch
+ self.build_tree(("original/", "original/file1", "original/file2"))
+ tree = self.make_branch_and_tree('original')
+ branch = tree.branch
+ tree.smart_add(["original"])
+ tree.commit("start branch.", verbose=False)
+ # Mary branches it.
+ self.build_tree(("mary/",))
+ branch.bzrdir.clone("mary")
+ # Now John commits a change
+ file = open("original/file1", "wt")
+ file.write("John\n")
+ file.close()
+ tree.commit("change file1")
+ # Mary does too
+ mary_tree = WorkingTree.open('mary')
+ mary_branch = mary_tree.branch
+ file = open("mary/file2", "wt")
+ file.write("Mary\n")
+ file.close()
+ mary_tree.commit("change file2")
+ # john should be able to merge with no conflicts.
+ base = [None, None]
+ other = ("mary", -1)
+ tree.merge_from_branch(mary_tree.branch)
+ self.assertEqual("John\n", open("original/file1", "rt").read())
+ self.assertEqual("Mary\n", open("original/file2", "rt").read())
+
+ def test_conflicts(self):
+ wta = self.make_branch_and_tree('a')
+ self.build_tree_contents([('a/file', 'contents\n')])
+ wta.add('file')
+ wta.commit('base revision', allow_pointless=False)
+ d_b = wta.branch.bzrdir.clone('b')
+ self.build_tree_contents([('a/file', 'other contents\n')])
+ wta.commit('other revision', allow_pointless=False)
+ self.build_tree_contents([('b/file', 'this contents contents\n')])
+ wtb = d_b.open_workingtree()
+ wtb.commit('this revision', allow_pointless=False)
+ self.assertEqual(1, wtb.merge_from_branch(wta.branch))
+ self.assertPathExists('b/file.THIS')
+ self.assertPathExists('b/file.BASE')
+ self.assertPathExists('b/file.OTHER')
+ wtb.revert()
+ self.assertEqual(1, wtb.merge_from_branch(wta.branch,
+ merge_type=WeaveMerger))
+ self.assertPathExists('b/file')
+ self.assertPathExists('b/file.THIS')
+ self.assertPathExists('b/file.BASE')
+ self.assertPathExists('b/file.OTHER')
+
+ def test_weave_conflicts_not_in_base(self):
+ builder = self.make_branch_builder('source')
+ builder.start_series()
+ # See bug #494197
+ # A base revision (before criss-cross)
+ # |\
+ # B C B does nothing, C adds 'foo'
+ # |X|
+ # D E D and E modify foo in incompatible ways
+ #
+ # Merging will conflict, with C as a clean base text. However, the
+ # current code uses A as the global base and 'foo' doesn't exist there.
+ # It isn't trivial to create foo.BASE because it tries to look up
+ # attributes like 'executable' in A.
+ builder.build_snapshot('A-id', None, [
+ ('add', ('', 'TREE_ROOT', 'directory', None))])
+ builder.build_snapshot('B-id', ['A-id'], [])
+ builder.build_snapshot('C-id', ['A-id'], [
+ ('add', ('foo', 'foo-id', 'file', 'orig\ncontents\n'))])
+ builder.build_snapshot('D-id', ['B-id', 'C-id'], [
+ ('add', ('foo', 'foo-id', 'file', 'orig\ncontents\nand D\n'))])
+ builder.build_snapshot('E-id', ['C-id', 'B-id'], [
+ ('modify', ('foo-id', 'orig\ncontents\nand E\n'))])
+ builder.finish_series()
+ tree = builder.get_branch().create_checkout('tree', lightweight=True)
+ self.assertEqual(1, tree.merge_from_branch(tree.branch,
+ to_revision='D-id',
+ merge_type=WeaveMerger))
+ self.assertPathExists('tree/foo.THIS')
+ self.assertPathExists('tree/foo.OTHER')
+ self.expectFailure('fail to create .BASE in some criss-cross merges',
+ self.assertPathExists, 'tree/foo.BASE')
+ self.assertPathExists('tree/foo.BASE')
+
+ def test_merge_unrelated(self):
+ """Sucessfully merges unrelated branches with no common names"""
+ wta = self.make_branch_and_tree('a')
+ a = wta.branch
+ with file('a/a_file', 'wb') as f: f.write('contents\n')
+ wta.add('a_file')
+ wta.commit('a_revision', allow_pointless=False)
+ wtb = self.make_branch_and_tree('b')
+ b = wtb.branch
+ with file('b/b_file', 'wb') as f: f.write('contents\n')
+ wtb.add('b_file')
+ b_rev = wtb.commit('b_revision', allow_pointless=False)
+ wta.merge_from_branch(wtb.branch, b_rev, 'null:')
+ self.assert_(os.path.lexists('a/b_file'))
+ self.assertEqual([b_rev], wta.get_parent_ids()[1:])
+
+ def test_merge_unrelated_conflicting(self):
+ """Sucessfully merges unrelated branches with common names"""
+ wta = self.make_branch_and_tree('a')
+ a = wta.branch
+ with file('a/file', 'wb') as f: f.write('contents\n')
+ wta.add('file')
+ wta.commit('a_revision', allow_pointless=False)
+ wtb = self.make_branch_and_tree('b')
+ b = wtb.branch
+ with file('b/file', 'wb') as f: f.write('contents\n')
+ wtb.add('file')
+ b_rev = wtb.commit('b_revision', allow_pointless=False)
+ wta.merge_from_branch(wtb.branch, b_rev, 'null:')
+ self.assert_(os.path.lexists('a/file'))
+ self.assert_(os.path.lexists('a/file.moved'))
+ self.assertEqual([b_rev], wta.get_parent_ids()[1:])
+
+ def test_merge_deleted_conflicts(self):
+ wta = self.make_branch_and_tree('a')
+ with file('a/file', 'wb') as f: f.write('contents\n')
+ wta.add('file')
+ wta.commit('a_revision', allow_pointless=False)
+ self.run_bzr('branch a b')
+ os.remove('a/file')
+ wta.commit('removed file', allow_pointless=False)
+ with file('b/file', 'wb') as f: f.write('changed contents\n')
+ wtb = WorkingTree.open('b')
+ wtb.commit('changed file', allow_pointless=False)
+ wtb.merge_from_branch(wta.branch, wta.branch.last_revision(),
+ wta.branch.get_rev_id(1))
+ self.assertFalse(os.path.lexists('b/file'))
+
+ def test_merge_metadata_vs_deletion(self):
+ """Conflict deletion vs metadata change"""
+ a_wt = self.make_branch_and_tree('a')
+ with file('a/file', 'wb') as f: f.write('contents\n')
+ a_wt.add('file')
+ a_wt.commit('r0')
+ self.run_bzr('branch a b')
+ b_wt = WorkingTree.open('b')
+ os.chmod('b/file', 0755)
+ os.remove('a/file')
+ a_wt.commit('removed a')
+ self.assertEqual(a_wt.branch.revno(), 2)
+ self.assertFalse(os.path.exists('a/file'))
+ b_wt.commit('exec a')
+ a_wt.merge_from_branch(b_wt.branch, b_wt.last_revision(), 'null:')
+ self.assert_(os.path.exists('a/file'))
+
+ def test_merge_swapping_renames(self):
+ a_wt = self.make_branch_and_tree('a')
+ with file('a/un','wb') as f: f.write('UN')
+ with file('a/deux','wb') as f: f.write('DEUX')
+ a_wt.add('un', 'un-id')
+ a_wt.add('deux', 'deux-id')
+ a_wt.commit('r0', rev_id='r0')
+ self.run_bzr('branch a b')
+ b_wt = WorkingTree.open('b')
+ b_wt.rename_one('un','tmp')
+ b_wt.rename_one('deux','un')
+ b_wt.rename_one('tmp','deux')
+ b_wt.commit('r1', rev_id='r1')
+ self.assertEqual(0, a_wt.merge_from_branch(b_wt.branch,
+ b_wt.branch.last_revision(), b_wt.branch.get_rev_id(1)))
+ self.assertPathExists('a/un')
+ self.assertTrue('a/deux')
+ self.assertFalse(os.path.exists('a/tmp'))
+ self.assertEqual(file('a/un').read(),'DEUX')
+ self.assertEqual(file('a/deux').read(),'UN')
+
+ def test_merge_delete_and_add_same(self):
+ a_wt = self.make_branch_and_tree('a')
+ with file('a/file', 'wb') as f: f.write('THIS')
+ a_wt.add('file')
+ a_wt.commit('r0')
+ self.run_bzr('branch a b')
+ b_wt = WorkingTree.open('b')
+ os.remove('b/file')
+ b_wt.commit('r1')
+ with file('b/file', 'wb') as f: f.write('THAT')
+ b_wt.add('file')
+ b_wt.commit('r2')
+ a_wt.merge_from_branch(b_wt.branch, b_wt.branch.last_revision(),
+ b_wt.branch.get_rev_id(1))
+ self.assert_(os.path.exists('a/file'))
+ self.assertEqual(file('a/file').read(),'THAT')
+
+ def test_merge_rename_before_create(self):
+ """rename before create
+
+ This case requires that you must not do creates
+ before move-into-place:
+
+ $ touch foo
+ $ bzr add foo
+ $ bzr commit
+ $ bzr mv foo bar
+ $ touch foo
+ $ bzr add foo
+ $ bzr commit
+ """
+ a_wt = self.make_branch_and_tree('a')
+ with file('a/foo', 'wb') as f: f.write('A/FOO')
+ a_wt.add('foo')
+ a_wt.commit('added foo')
+ self.run_bzr('branch a b')
+ b_wt = WorkingTree.open('b')
+ b_wt.rename_one('foo', 'bar')
+ with file('b/foo', 'wb') as f: f.write('B/FOO')
+ b_wt.add('foo')
+ b_wt.commit('moved foo to bar, added new foo')
+ a_wt.merge_from_branch(b_wt.branch, b_wt.branch.last_revision(),
+ b_wt.branch.get_rev_id(1))
+
+ def test_merge_create_before_rename(self):
+ """create before rename, target parents before children
+
+ This case requires that you must not do move-into-place
+ before creates, and that you must not do children after
+ parents:
+
+ $ touch foo
+ $ bzr add foo
+ $ bzr commit
+ $ bzr mkdir bar
+ $ bzr add bar
+ $ bzr mv foo bar/foo
+ $ bzr commit
+ """
+ os.mkdir('a')
+ a_wt = self.make_branch_and_tree('a')
+ with file('a/foo', 'wb') as f: f.write('A/FOO')
+ a_wt.add('foo')
+ a_wt.commit('added foo')
+ self.run_bzr('branch a b')
+ b_wt = WorkingTree.open('b')
+ os.mkdir('b/bar')
+ b_wt.add('bar')
+ b_wt.rename_one('foo', 'bar/foo')
+ b_wt.commit('created bar dir, moved foo into bar')
+ a_wt.merge_from_branch(b_wt.branch, b_wt.branch.last_revision(),
+ b_wt.branch.get_rev_id(1))
+
+ def test_merge_rename_to_temp_before_delete(self):
+ """rename to temp before delete, source children before parents
+
+ This case requires that you must not do deletes before
+ move-out-of-the-way, and that you must not do children
+ after parents:
+
+ $ mkdir foo
+ $ touch foo/bar
+ $ bzr add foo/bar
+ $ bzr commit
+ $ bzr mv foo/bar bar
+ $ rmdir foo
+ $ bzr commit
+ """
+ a_wt = self.make_branch_and_tree('a')
+ os.mkdir('a/foo')
+ with file('a/foo/bar', 'wb') as f: f.write('A/FOO/BAR')
+ a_wt.add('foo')
+ a_wt.add('foo/bar')
+ a_wt.commit('added foo/bar')
+ self.run_bzr('branch a b')
+ b_wt = WorkingTree.open('b')
+ b_wt.rename_one('foo/bar', 'bar')
+ os.rmdir('b/foo')
+ b_wt.remove('foo')
+ b_wt.commit('moved foo/bar to bar, deleted foo')
+ a_wt.merge_from_branch(b_wt.branch, b_wt.branch.last_revision(),
+ b_wt.branch.get_rev_id(1))
+
+ def test_merge_delete_before_rename_to_temp(self):
+ """delete before rename to temp
+
+ This case requires that you must not do
+ move-out-of-the-way before deletes:
+
+ $ touch foo
+ $ touch bar
+ $ bzr add foo bar
+ $ bzr commit
+ $ rm foo
+ $ bzr rm foo
+ $ bzr mv bar foo
+ $ bzr commit
+ """
+ a_wt = self.make_branch_and_tree('a')
+ with file('a/foo', 'wb') as f: f.write('A/FOO')
+ with file('a/bar', 'wb') as f: f.write('A/BAR')
+ a_wt.add('foo')
+ a_wt.add('bar')
+ a_wt.commit('added foo and bar')
+ self.run_bzr('branch a b')
+ b_wt = WorkingTree.open('b')
+ os.unlink('b/foo')
+ b_wt.remove('foo')
+ b_wt.rename_one('bar', 'foo')
+ b_wt.commit('deleted foo, renamed bar to foo')
+ a_wt.merge_from_branch(b_wt.branch, b_wt.branch.last_revision(),
+ b_wt.branch.get_rev_id(1))
+
+
+class TestMerger(TestCaseWithTransport):
+
+ def set_up_trees(self):
+ this = self.make_branch_and_tree('this')
+ this.commit('rev1', rev_id='rev1')
+ other = this.bzrdir.sprout('other').open_workingtree()
+ this.commit('rev2a', rev_id='rev2a')
+ other.commit('rev2b', rev_id='rev2b')
+ return this, other
+
+ def test_from_revision_ids(self):
+ this, other = self.set_up_trees()
+ self.assertRaises(errors.NoSuchRevision, Merger.from_revision_ids,
+ None, this, 'rev2b')
+ this.lock_write()
+ self.addCleanup(this.unlock)
+ merger = Merger.from_revision_ids(None, this,
+ 'rev2b', other_branch=other.branch)
+ self.assertEqual('rev2b', merger.other_rev_id)
+ self.assertEqual('rev1', merger.base_rev_id)
+ merger = Merger.from_revision_ids(None, this,
+ 'rev2b', 'rev2a', other_branch=other.branch)
+ self.assertEqual('rev2a', merger.base_rev_id)
+
+ def test_from_uncommitted(self):
+ this, other = self.set_up_trees()
+ merger = Merger.from_uncommitted(this, other, None)
+ self.assertIs(other, merger.other_tree)
+ self.assertIs(None, merger.other_rev_id)
+ self.assertEqual('rev2b', merger.base_rev_id)
+
+ def prepare_for_merging(self):
+ this, other = self.set_up_trees()
+ other.commit('rev3', rev_id='rev3')
+ this.lock_write()
+ self.addCleanup(this.unlock)
+ return this, other
+
+ def test_from_mergeable(self):
+ this, other = self.prepare_for_merging()
+ md = merge_directive.MergeDirective2.from_objects(
+ other.branch.repository, 'rev3', 0, 0, 'this')
+ other.lock_read()
+ self.addCleanup(other.unlock)
+ merger, verified = Merger.from_mergeable(this, md,
+ None)
+ md.patch = None
+ merger, verified = Merger.from_mergeable(this, md,
+ None)
+ self.assertEqual('inapplicable', verified)
+ self.assertEqual('rev3', merger.other_rev_id)
+ self.assertEqual('rev1', merger.base_rev_id)
+ md.base_revision_id = 'rev2b'
+ merger, verified = Merger.from_mergeable(this, md,
+ None)
+ self.assertEqual('rev2b', merger.base_rev_id)
+
+ def test_from_mergeable_old_merge_directive(self):
+ this, other = self.prepare_for_merging()
+ other.lock_write()
+ self.addCleanup(other.unlock)
+ md = merge_directive.MergeDirective.from_objects(
+ other.branch.repository, 'rev3', 0, 0, 'this')
+ merger, verified = Merger.from_mergeable(this, md,
+ None)
+ self.assertEqual('rev3', merger.other_rev_id)
+ self.assertEqual('rev1', merger.base_rev_id)