Diffstat (limited to 'bzrlib/tests/test_graph.py')
-rw-r--r--  bzrlib/tests/test_graph.py  1743
1 files changed, 1743 insertions, 0 deletions
diff --git a/bzrlib/tests/test_graph.py b/bzrlib/tests/test_graph.py
new file mode 100644
index 0000000..312d60d
--- /dev/null
+++ b/bzrlib/tests/test_graph.py
@@ -0,0 +1,1743 @@
+# Copyright (C) 2007-2011 Canonical Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+from bzrlib import (
+ errors,
+ graph as _mod_graph,
+ tests,
+ )
+from bzrlib.revision import NULL_REVISION
+from bzrlib.tests import TestCaseWithMemoryTransport
+
+
+# Ancestry 1:
+#
+# NULL_REVISION
+# |
+# rev1
+# /\
+# rev2a rev2b
+# | |
+# rev3 /
+# | /
+# rev4
+ancestry_1 = {'rev1': [NULL_REVISION], 'rev2a': ['rev1'], 'rev2b': ['rev1'],
+ 'rev3': ['rev2a'], 'rev4': ['rev3', 'rev2b']}
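+
+# A brief usage sketch (illustration only, not part of the original tests):
+# each ancestry dict in this module maps {node: [parents]} and is consumed by
+# wrapping it in a DictParentsProvider and handing that to a Graph, exactly
+# as the make_graph() helpers below do:
+#
+#   graph = _mod_graph.Graph(_mod_graph.DictParentsProvider(ancestry_1))
+#   graph.find_lca('rev2a', 'rev2b')    # => set(['rev1'])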
+
+
+# Ancestry 2:
+#
+# NULL_REVISION
+# / \
+# rev1a rev1b
+# |
+# rev2a
+# |
+# rev3a
+# |
+# rev4a
+ancestry_2 = {'rev1a': [NULL_REVISION], 'rev2a': ['rev1a'],
+ 'rev1b': [NULL_REVISION], 'rev3a': ['rev2a'], 'rev4a': ['rev3a']}
+
+
+# Criss cross ancestry
+#
+# NULL_REVISION
+# |
+# rev1
+# / \
+# rev2a rev2b
+# |\ /|
+# | X |
+# |/ \|
+# rev3a rev3b
+criss_cross = {'rev1': [NULL_REVISION], 'rev2a': ['rev1'], 'rev2b': ['rev1'],
+ 'rev3a': ['rev2a', 'rev2b'], 'rev3b': ['rev2b', 'rev2a']}
+
+
+# Criss-cross 2
+#
+# NULL_REVISION
+# / \
+# rev1a rev1b
+# |\ /|
+# | \ / |
+# | X |
+# | / \ |
+# |/ \|
+# rev2a rev2b
+criss_cross2 = {'rev1a': [NULL_REVISION], 'rev1b': [NULL_REVISION],
+ 'rev2a': ['rev1a', 'rev1b'], 'rev2b': ['rev1b', 'rev1a']}
+
+
+# Mainline:
+#
+# NULL_REVISION
+# |
+# rev1
+# / \
+# | rev2b
+# | /
+# rev2a
+mainline = {'rev1': [NULL_REVISION], 'rev2a': ['rev1', 'rev2b'],
+ 'rev2b': ['rev1']}
+
+
+# feature branch:
+#
+# NULL_REVISION
+# |
+# rev1
+# |
+# rev2b
+# |
+# rev3b
+feature_branch = {'rev1': [NULL_REVISION],
+ 'rev2b': ['rev1'], 'rev3b': ['rev2b']}
+
+
+# History shortcut
+# NULL_REVISION
+# |
+# rev1------
+# / \ \
+# rev2a rev2b rev2c
+# | / \ /
+# rev3a rev3b
+history_shortcut = {'rev1': [NULL_REVISION], 'rev2a': ['rev1'],
+ 'rev2b': ['rev1'], 'rev2c': ['rev1'],
+ 'rev3a': ['rev2a', 'rev2b'], 'rev3b': ['rev2b', 'rev2c']}
+
+# Extended history shortcut
+# NULL_REVISION
+# |
+# a
+# |\
+# b |
+# | |
+# c |
+# | |
+# d |
+# |\|
+# e f
+extended_history_shortcut = {'a': [NULL_REVISION],
+ 'b': ['a'],
+ 'c': ['b'],
+ 'd': ['c'],
+ 'e': ['d'],
+ 'f': ['a', 'd'],
+ }
+
+# Double shortcut
+# Both sides will see 'A' first, even though it is actually a descendant of a
+# different common revision.
+#
+# NULL_REVISION
+# |
+# a
+# /|\
+# / b \
+# / | \
+# | c |
+# | / \ |
+# | d e |
+# |/ \|
+# f g
+
+double_shortcut = {'a':[NULL_REVISION], 'b':['a'], 'c':['b'],
+ 'd':['c'], 'e':['c'], 'f':['a', 'd'],
+ 'g':['a', 'e']}
+
+# Complex shortcut
+# This has a failure mode in that a shortcut will find some nodes in common,
+# but the common searcher won't have time to find that one branch is actually
+# in common. The extra nodes at the beginning are because we want to avoid
+# walking off the graph. Specifically, node G should be considered common, but
+# is likely to be seen by M long before the common searcher finds it.
+#
+# NULL_REVISION
+# |
+# a
+# |
+# b
+# |
+# c
+# |
+# d
+# |\
+# e f
+# | |\
+# | g h
+# |/| |
+# i j |
+# | | |
+# | k |
+# | | |
+# | l |
+# |/|/
+# m n
+complex_shortcut = {'a':[NULL_REVISION], 'b':['a'], 'c':['b'], 'd':['c'],
+ 'e':['d'], 'f':['d'], 'g':['f'], 'h':['f'],
+ 'i':['e', 'g'], 'j':['g'], 'k':['j'],
+ 'l':['k'], 'm':['i', 'l'], 'n':['l', 'h']}
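+
+# For illustration (this is the expectation checked in
+# test_graph_difference_complex_shortcut below), the correct answer keeps g
+# out of both unique sets, because g is an ancestor of both m and n:
+#
+#   graph = _mod_graph.Graph(_mod_graph.DictParentsProvider(complex_shortcut))
+#   graph.find_difference('m', 'n')
+#   # => (set(['m', 'i', 'e']), set(['n', 'h']))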
+
+# NULL_REVISION
+# |
+# a
+# |
+# b
+# |
+# c
+# |
+# d
+# |\
+# e |
+# | |
+# f |
+# | |
+# g h
+# | |\
+# i | j
+# |\| |
+# | k |
+# | | |
+# | l |
+# | | |
+# | m |
+# | | |
+# | n |
+# | | |
+# | o |
+# | | |
+# | p |
+# | | |
+# | q |
+# | | |
+# | r |
+# | | |
+# | s |
+# | | |
+# |/|/
+# t u
+complex_shortcut2 = {'a':[NULL_REVISION], 'b':['a'], 'c':['b'], 'd':['c'],
+ 'e':['d'], 'f':['e'], 'g':['f'], 'h':['d'], 'i':['g'],
+ 'j':['h'], 'k':['h', 'i'], 'l':['k'], 'm':['l'], 'n':['m'],
+ 'o':['n'], 'p':['o'], 'q':['p'], 'r':['q'], 's':['r'],
+ 't':['i', 's'], 'u':['s', 'j'],
+ }
+
+# Graph where different walkers will race to find the common and uncommon
+# nodes.
+#
+# NULL_REVISION
+# |
+# a
+# |
+# b
+# |
+# c
+# |
+# d
+# |\
+# e k
+# | |
+# f-+-p
+# | | |
+# | l |
+# | | |
+# | m |
+# | |\|
+# g n q
+# |\| |
+# h o |
+# |/| |
+# i r |
+# | | |
+# | s |
+# | | |
+# | t |
+# | | |
+# | u |
+# | | |
+# | v |
+# | | |
+# | w |
+# | | |
+# | x |
+# | |\|
+# | y z
+# |/
+# j
+#
+# x is found to be common right away, but is the start of a long series of
+# common commits.
+# o is actually common, but the i-j shortcut makes it look like it is unique
+# to j at first; you have to traverse all of x->o to discover that it is
+# common.
+# q,m gives the walker from j a common point to stop searching, as does p,f.
+# k-n exists so that the second pass still has nodes that are worth searching,
+# rather than instantly cancelling the extra walker.
+
+racing_shortcuts = {'a':[NULL_REVISION], 'b':['a'], 'c':['b'], 'd':['c'],
+ 'e':['d'], 'f':['e'], 'g':['f'], 'h':['g'], 'i':['h', 'o'], 'j':['i', 'y'],
+ 'k':['d'], 'l':['k'], 'm':['l'], 'n':['m'], 'o':['n', 'g'], 'p':['f'],
+ 'q':['p', 'm'], 'r':['o'], 's':['r'], 't':['s'], 'u':['t'], 'v':['u'],
+ 'w':['v'], 'x':['w'], 'y':['x'], 'z':['x', 'q']}
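+
+# For illustration, these are the exact expectations checked in
+# TestFindUniqueAncestors.test_racing_shortcuts below:
+#
+#   graph = _mod_graph.Graph(_mod_graph.DictParentsProvider(racing_shortcuts))
+#   sorted(graph.find_unique_ancestors('z', ['y']))  # => ['p', 'q', 'z']
+#   sorted(graph.find_unique_ancestors('j', ['z']))  # => ['h', 'i', 'j', 'y']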
+
+
+# A graph with multiple nodes unique to one side.
+#
+# NULL_REVISION
+# |
+# a
+# |
+# b
+# |
+# c
+# |
+# d
+# |\
+# e f
+# |\ \
+# g h i
+# |\ \ \
+# j k l m
+# | |/ x|
+# | n o p
+# | |/ |
+# | q |
+# | | |
+# | r |
+# | | |
+# | s |
+# | | |
+# | t |
+# | | |
+# | u |
+# | | |
+# | v |
+# | | |
+# | w |
+# | | |
+# | x |
+# |/ \ /
+# y z
+#
+
+multiple_interesting_unique = {'a':[NULL_REVISION], 'b':['a'], 'c':['b'],
+ 'd':['c'], 'e':['d'], 'f':['d'], 'g':['e'], 'h':['e'], 'i':['f'],
+ 'j':['g'], 'k':['g'], 'l':['h'], 'm':['i'], 'n':['k', 'l'],
+ 'o':['m'], 'p':['m', 'l'], 'q':['n', 'o'], 'r':['q'], 's':['r'],
+ 't':['s'], 'u':['t'], 'v':['u'], 'w':['v'], 'x':['w'],
+ 'y':['j', 'x'], 'z':['x', 'p']}
+
+
+# Shortcut with extra root
+# We have a long history shortcut, and an extra root, which is why we can't
+# stop searchers based on seeing NULL_REVISION
+# NULL_REVISION
+# | |
+# a |
+# |\ |
+# b | |
+# | | |
+# c | |
+# | | |
+# d | g
+# |\|/
+# e f
+shortcut_extra_root = {'a': [NULL_REVISION],
+ 'b': ['a'],
+ 'c': ['b'],
+ 'd': ['c'],
+ 'e': ['d'],
+ 'f': ['a', 'd', 'g'],
+ 'g': [NULL_REVISION],
+ }
+
+# NULL_REVISION
+# |
+# f
+# |
+# e
+# / \
+# b d
+# | \ |
+# a c
+
+boundary = {'a': ['b'], 'c': ['b', 'd'], 'b':['e'], 'd':['e'], 'e': ['f'],
+ 'f':[NULL_REVISION]}
+
+
+# A graph that contains a ghost
+# NULL_REVISION
+# |
+# f
+# |
+# e g
+# / \ /
+# b d
+# | \ |
+# a c
+
+with_ghost = {'a': ['b'], 'c': ['b', 'd'], 'b':['e'], 'd':['e', 'g'],
+ 'e': ['f'], 'f':[NULL_REVISION], NULL_REVISION:()}
+
+# A graph that shows we can shortcut finding revnos when reaching them from the
+# side.
+# NULL_REVISION
+# |
+# a
+# |
+# b
+# |
+# c
+# |
+# d
+# |
+# e
+# / \
+# f g
+# |
+# h
+# |
+# i
+
+with_tail = {'a':[NULL_REVISION], 'b':['a'], 'c':['b'], 'd':['c'], 'e':['d'],
+ 'f':['e'], 'g':['e'], 'h':['f'], 'i':['h']}
+
+
+class InstrumentedParentsProvider(object):
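+    """Wrap a parents provider, recording which keys are queried.
+
+    The tests below inspect the recorded 'calls' list to assert how much of
+    the graph a search actually walked (for example, that 'null:' was never
+    queried).
+    """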
+
+ def __init__(self, parents_provider):
+ self.calls = []
+ self._real_parents_provider = parents_provider
+ get_cached = getattr(parents_provider, 'get_cached_parent_map', None)
+ if get_cached is not None:
+ # Only expose the underlying 'get_cached_parent_map' function if
+ # the wrapped provider has it.
+ self.get_cached_parent_map = self._get_cached_parent_map
+
+ def get_parent_map(self, nodes):
+ self.calls.extend(nodes)
+ return self._real_parents_provider.get_parent_map(nodes)
+
+ def _get_cached_parent_map(self, nodes):
+ self.calls.append(('cached', sorted(nodes)))
+ return self._real_parents_provider.get_cached_parent_map(nodes)
+
+
+class SharedInstrumentedParentsProvider(object):
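+    """Like InstrumentedParentsProvider, but records into a shared list.
+
+    Each recorded call is tagged with this provider's 'info' string, so the
+    StackedParentsProvider tests can see the order in which multiple
+    providers were queried.
+    """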
+
+ def __init__(self, parents_provider, calls, info):
+ self.calls = calls
+ self.info = info
+ self._real_parents_provider = parents_provider
+ get_cached = getattr(parents_provider, 'get_cached_parent_map', None)
+ if get_cached is not None:
+ # Only expose the underlying 'get_cached_parent_map' function if
+ # the wrapped provider has it.
+ self.get_cached_parent_map = self._get_cached_parent_map
+
+ def get_parent_map(self, nodes):
+ self.calls.append((self.info, sorted(nodes)))
+ return self._real_parents_provider.get_parent_map(nodes)
+
+ def _get_cached_parent_map(self, nodes):
+ self.calls.append((self.info, 'cached', sorted(nodes)))
+ return self._real_parents_provider.get_cached_parent_map(nodes)
+
+
+class TestGraphBase(tests.TestCase):
+
+ def make_graph(self, ancestors):
+ return _mod_graph.Graph(_mod_graph.DictParentsProvider(ancestors))
+
+ def make_breaking_graph(self, ancestors, break_on):
+ """Make a Graph that raises an exception if we hit a node."""
+ g = self.make_graph(ancestors)
+ orig_parent_map = g.get_parent_map
+ def get_parent_map(keys):
+ bad_keys = set(keys).intersection(break_on)
+ if bad_keys:
+ self.fail('key(s) %s was accessed' % (sorted(bad_keys),))
+ return orig_parent_map(keys)
+ g.get_parent_map = get_parent_map
+ return g
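+
+    # Typical usage, mirroring e.g. test_known_in_ancestry_limits below:
+    # build a breaking graph over nodes the search must never touch, then run
+    # the operation under test; the injected get_parent_map fails the test if
+    # any forbidden node is queried.
+    #
+    #   graph = self.make_breaking_graph(ancestry_1, ['rev1'])
+    #   graph.find_distance_to_null('rev4', [('rev3', 3)])  # must not hit rev1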
+
+
+class TestGraph(TestCaseWithMemoryTransport):
+
+ def make_graph(self, ancestors):
+ return _mod_graph.Graph(_mod_graph.DictParentsProvider(ancestors))
+
+ def prepare_memory_tree(self, location):
+ tree = self.make_branch_and_memory_tree(location)
+ tree.lock_write()
+ tree.add('.')
+ return tree
+
+ def build_ancestry(self, tree, ancestors):
+ """Create an ancestry as specified by a graph dict
+
+ :param tree: A tree to use
+ :param ancestors: a dict of {node: [node_parent, ...]}
+ """
+ pending = [NULL_REVISION]
+ descendants = {}
+ for descendant, parents in ancestors.iteritems():
+ for parent in parents:
+ descendants.setdefault(parent, []).append(descendant)
+ while len(pending) > 0:
+ cur_node = pending.pop()
+ for descendant in descendants.get(cur_node, []):
+ if tree.branch.repository.has_revision(descendant):
+ continue
+ parents = [p for p in ancestors[descendant] if p is not
+ NULL_REVISION]
+ if len([p for p in parents if not
+ tree.branch.repository.has_revision(p)]) > 0:
+ continue
+ tree.set_parent_ids(parents)
+ if len(parents) > 0:
+ left_parent = parents[0]
+ else:
+ left_parent = NULL_REVISION
+ tree.branch.set_last_revision_info(
+ len(tree.branch._lefthand_history(left_parent)),
+ left_parent)
+ tree.commit(descendant, rev_id=descendant)
+ pending.append(descendant)
+
+ def test_lca(self):
+ """Test finding least common ancestor.
+
+ ancestry_1 should always have a single common ancestor
+ """
+ graph = self.make_graph(ancestry_1)
+ self.assertRaises(errors.InvalidRevisionId, graph.find_lca, None)
+ self.assertEqual(set([NULL_REVISION]),
+ graph.find_lca(NULL_REVISION, NULL_REVISION))
+ self.assertEqual(set([NULL_REVISION]),
+ graph.find_lca(NULL_REVISION, 'rev1'))
+ self.assertEqual(set(['rev1']), graph.find_lca('rev1', 'rev1'))
+ self.assertEqual(set(['rev1']), graph.find_lca('rev2a', 'rev2b'))
+
+ def test_no_unique_lca(self):
+ """Test error when one revision is not in the graph"""
+ graph = self.make_graph(ancestry_1)
+ self.assertRaises(errors.NoCommonAncestor, graph.find_unique_lca,
+ 'rev1', '1rev')
+
+ def test_lca_criss_cross(self):
+ """Test least-common-ancestor after a criss-cross merge."""
+ graph = self.make_graph(criss_cross)
+ self.assertEqual(set(['rev2a', 'rev2b']),
+ graph.find_lca('rev3a', 'rev3b'))
+ self.assertEqual(set(['rev2b']),
+ graph.find_lca('rev3a', 'rev3b', 'rev2b'))
+
+ def test_lca_shortcut(self):
+ """Test least-common ancestor on this history shortcut"""
+ graph = self.make_graph(history_shortcut)
+ self.assertEqual(set(['rev2b']), graph.find_lca('rev3a', 'rev3b'))
+
+ def test_lefthand_distance_smoke(self):
+        """A simple does-it-work test for graph.find_lefthand_distances()."""
+ graph = self.make_graph(history_shortcut)
+ distance_graph = graph.find_lefthand_distances(['rev3b', 'rev2a'])
+ self.assertEqual({'rev2a': 2, 'rev3b': 3}, distance_graph)
+
+ def test_lefthand_distance_ghosts(self):
+        """find_lefthand_distances() should return -1 when a ghost is hit."""
+ nodes = {'nonghost':[NULL_REVISION], 'toghost':['ghost']}
+ graph = self.make_graph(nodes)
+ distance_graph = graph.find_lefthand_distances(['nonghost', 'toghost'])
+ self.assertEqual({'nonghost': 1, 'toghost': -1}, distance_graph)
+
+ def test_recursive_unique_lca(self):
+ """Test finding a unique least common ancestor.
+
+ ancestry_1 should always have a single common ancestor
+ """
+ graph = self.make_graph(ancestry_1)
+ self.assertEqual(NULL_REVISION,
+ graph.find_unique_lca(NULL_REVISION, NULL_REVISION))
+ self.assertEqual(NULL_REVISION,
+ graph.find_unique_lca(NULL_REVISION, 'rev1'))
+ self.assertEqual('rev1', graph.find_unique_lca('rev1', 'rev1'))
+ self.assertEqual('rev1', graph.find_unique_lca('rev2a', 'rev2b'))
+ self.assertEqual(('rev1', 1,),
+ graph.find_unique_lca('rev2a', 'rev2b',
+ count_steps=True))
+
+ def assertRemoveDescendants(self, expected, graph, revisions):
+ parents = graph.get_parent_map(revisions)
+ self.assertEqual(expected,
+ graph._remove_simple_descendants(revisions, parents))
+
+ def test__remove_simple_descendants(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertRemoveDescendants(set(['rev1']), graph,
+ set(['rev1', 'rev2a', 'rev2b', 'rev3', 'rev4']))
+
+ def test__remove_simple_descendants_disjoint(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertRemoveDescendants(set(['rev1', 'rev3']), graph,
+ set(['rev1', 'rev3']))
+
+ def test__remove_simple_descendants_chain(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertRemoveDescendants(set(['rev1']), graph,
+ set(['rev1', 'rev2a', 'rev3']))
+
+ def test__remove_simple_descendants_siblings(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertRemoveDescendants(set(['rev2a', 'rev2b']), graph,
+ set(['rev2a', 'rev2b', 'rev3']))
+
+ def test_unique_lca_criss_cross(self):
+ """Ensure we don't pick non-unique lcas in a criss-cross"""
+ graph = self.make_graph(criss_cross)
+ self.assertEqual('rev1', graph.find_unique_lca('rev3a', 'rev3b'))
+ lca, steps = graph.find_unique_lca('rev3a', 'rev3b', count_steps=True)
+ self.assertEqual('rev1', lca)
+ self.assertEqual(2, steps)
+
+ def test_unique_lca_null_revision(self):
+ """Ensure we pick NULL_REVISION when necessary"""
+ graph = self.make_graph(criss_cross2)
+ self.assertEqual('rev1b', graph.find_unique_lca('rev2a', 'rev1b'))
+ self.assertEqual(NULL_REVISION,
+ graph.find_unique_lca('rev2a', 'rev2b'))
+
+ def test_unique_lca_null_revision2(self):
+ """Ensure we pick NULL_REVISION when necessary"""
+ graph = self.make_graph(ancestry_2)
+ self.assertEqual(NULL_REVISION,
+ graph.find_unique_lca('rev4a', 'rev1b'))
+
+ def test_lca_double_shortcut(self):
+ graph = self.make_graph(double_shortcut)
+ self.assertEqual('c', graph.find_unique_lca('f', 'g'))
+
+ def test_common_ancestor_two_repos(self):
+ """Ensure we do unique_lca using data from two repos"""
+ mainline_tree = self.prepare_memory_tree('mainline')
+ self.build_ancestry(mainline_tree, mainline)
+ self.addCleanup(mainline_tree.unlock)
+
+ # This is cheating, because the revisions in the graph are actually
+ # different revisions, despite having the same revision-id.
+ feature_tree = self.prepare_memory_tree('feature')
+ self.build_ancestry(feature_tree, feature_branch)
+ self.addCleanup(feature_tree.unlock)
+
+ graph = mainline_tree.branch.repository.get_graph(
+ feature_tree.branch.repository)
+ self.assertEqual('rev2b', graph.find_unique_lca('rev2a', 'rev3b'))
+
+ def test_graph_difference(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertEqual((set(), set()), graph.find_difference('rev1', 'rev1'))
+ self.assertEqual((set(), set(['rev1'])),
+ graph.find_difference(NULL_REVISION, 'rev1'))
+ self.assertEqual((set(['rev1']), set()),
+ graph.find_difference('rev1', NULL_REVISION))
+ self.assertEqual((set(['rev2a', 'rev3']), set(['rev2b'])),
+ graph.find_difference('rev3', 'rev2b'))
+ self.assertEqual((set(['rev4', 'rev3', 'rev2a']), set()),
+ graph.find_difference('rev4', 'rev2b'))
+
+ def test_graph_difference_separate_ancestry(self):
+ graph = self.make_graph(ancestry_2)
+ self.assertEqual((set(['rev1a']), set(['rev1b'])),
+ graph.find_difference('rev1a', 'rev1b'))
+ self.assertEqual((set(['rev1a', 'rev2a', 'rev3a', 'rev4a']),
+ set(['rev1b'])),
+ graph.find_difference('rev4a', 'rev1b'))
+
+ def test_graph_difference_criss_cross(self):
+ graph = self.make_graph(criss_cross)
+ self.assertEqual((set(['rev3a']), set(['rev3b'])),
+ graph.find_difference('rev3a', 'rev3b'))
+ self.assertEqual((set([]), set(['rev3b', 'rev2b'])),
+ graph.find_difference('rev2a', 'rev3b'))
+
+ def test_graph_difference_extended_history(self):
+ graph = self.make_graph(extended_history_shortcut)
+ self.assertEqual((set(['e']), set(['f'])),
+ graph.find_difference('e', 'f'))
+ self.assertEqual((set(['f']), set(['e'])),
+ graph.find_difference('f', 'e'))
+
+ def test_graph_difference_double_shortcut(self):
+ graph = self.make_graph(double_shortcut)
+ self.assertEqual((set(['d', 'f']), set(['e', 'g'])),
+ graph.find_difference('f', 'g'))
+
+ def test_graph_difference_complex_shortcut(self):
+ graph = self.make_graph(complex_shortcut)
+ self.assertEqual((set(['m', 'i', 'e']), set(['n', 'h'])),
+ graph.find_difference('m', 'n'))
+
+ def test_graph_difference_complex_shortcut2(self):
+ graph = self.make_graph(complex_shortcut2)
+ self.assertEqual((set(['t']), set(['j', 'u'])),
+ graph.find_difference('t', 'u'))
+
+ def test_graph_difference_shortcut_extra_root(self):
+ graph = self.make_graph(shortcut_extra_root)
+ self.assertEqual((set(['e']), set(['f', 'g'])),
+ graph.find_difference('e', 'f'))
+
+ def test_iter_topo_order(self):
+ graph = self.make_graph(ancestry_1)
+ args = ['rev2a', 'rev3', 'rev1']
+ topo_args = list(graph.iter_topo_order(args))
+ self.assertEqual(set(args), set(topo_args))
+ self.assertTrue(topo_args.index('rev2a') > topo_args.index('rev1'))
+ self.assertTrue(topo_args.index('rev2a') < topo_args.index('rev3'))
+
+ def test_is_ancestor(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertEqual(True, graph.is_ancestor('null:', 'null:'))
+ self.assertEqual(True, graph.is_ancestor('null:', 'rev1'))
+ self.assertEqual(False, graph.is_ancestor('rev1', 'null:'))
+ self.assertEqual(True, graph.is_ancestor('null:', 'rev4'))
+ self.assertEqual(False, graph.is_ancestor('rev4', 'null:'))
+ self.assertEqual(False, graph.is_ancestor('rev4', 'rev2b'))
+ self.assertEqual(True, graph.is_ancestor('rev2b', 'rev4'))
+ self.assertEqual(False, graph.is_ancestor('rev2b', 'rev3'))
+ self.assertEqual(False, graph.is_ancestor('rev3', 'rev2b'))
+ instrumented_provider = InstrumentedParentsProvider(graph)
+ instrumented_graph = _mod_graph.Graph(instrumented_provider)
+ instrumented_graph.is_ancestor('rev2a', 'rev2b')
+ self.assertTrue('null:' not in instrumented_provider.calls)
+
+ def test_is_between(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertEqual(True, graph.is_between('null:', 'null:', 'null:'))
+ self.assertEqual(True, graph.is_between('rev1', 'null:', 'rev1'))
+ self.assertEqual(True, graph.is_between('rev1', 'rev1', 'rev4'))
+ self.assertEqual(True, graph.is_between('rev4', 'rev1', 'rev4'))
+ self.assertEqual(True, graph.is_between('rev3', 'rev1', 'rev4'))
+ self.assertEqual(False, graph.is_between('rev4', 'rev1', 'rev3'))
+ self.assertEqual(False, graph.is_between('rev1', 'rev2a', 'rev4'))
+ self.assertEqual(False, graph.is_between('null:', 'rev1', 'rev4'))
+
+ def test_is_ancestor_boundary(self):
+ """Ensure that we avoid searching the whole graph.
+
+ This requires searching through b as a common ancestor, so we
+ can identify that e is common.
+ """
+ graph = self.make_graph(boundary)
+ instrumented_provider = InstrumentedParentsProvider(graph)
+ graph = _mod_graph.Graph(instrumented_provider)
+ self.assertFalse(graph.is_ancestor('a', 'c'))
+ self.assertTrue('null:' not in instrumented_provider.calls)
+
+ def test_iter_ancestry(self):
+ nodes = boundary.copy()
+ nodes[NULL_REVISION] = ()
+ graph = self.make_graph(nodes)
+ expected = nodes.copy()
+        expected.pop('a') # 'a' is not in the ancestry of 'c'; all the
+                          # other nodes are.
+ self.assertEqual(expected, dict(graph.iter_ancestry(['c'])))
+ self.assertEqual(nodes, dict(graph.iter_ancestry(['a', 'c'])))
+
+ def test_iter_ancestry_with_ghost(self):
+ graph = self.make_graph(with_ghost)
+ expected = with_ghost.copy()
+ # 'a' is not in the ancestry of 'c', and 'g' is a ghost
+ expected['g'] = None
+ self.assertEqual(expected, dict(graph.iter_ancestry(['a', 'c'])))
+ expected.pop('a')
+ self.assertEqual(expected, dict(graph.iter_ancestry(['c'])))
+
+ def test_filter_candidate_lca(self):
+ """Test filter_candidate_lca for a corner case
+
+ This tests the case where we encounter the end of iteration for 'e'
+ in the same pass as we discover that 'd' is an ancestor of 'e', and
+ therefore 'e' can't be an lca.
+
+ To compensate for different dict orderings on other Python
+ implementations, we mirror 'd' and 'e' with 'b' and 'a'.
+ """
+ # This test is sensitive to the iteration order of dicts. It will
+ # pass incorrectly if 'e' and 'a' sort before 'c'
+ #
+ # NULL_REVISION
+ # / \
+ # a e
+ # | |
+ # b d
+ # \ /
+ # c
+ graph = self.make_graph({'c': ['b', 'd'], 'd': ['e'], 'b': ['a'],
+ 'a': [NULL_REVISION], 'e': [NULL_REVISION]})
+ self.assertEqual(set(['c']), graph.heads(['a', 'c', 'e']))
+
+ def test_heads_null(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertEqual(set(['null:']), graph.heads(['null:']))
+ self.assertEqual(set(['rev1']), graph.heads(['null:', 'rev1']))
+ self.assertEqual(set(['rev1']), graph.heads(['rev1', 'null:']))
+ self.assertEqual(set(['rev1']), graph.heads(set(['rev1', 'null:'])))
+ self.assertEqual(set(['rev1']), graph.heads(('rev1', 'null:')))
+
+ def test_heads_one(self):
+ # A single node will always be a head
+ graph = self.make_graph(ancestry_1)
+ self.assertEqual(set(['null:']), graph.heads(['null:']))
+ self.assertEqual(set(['rev1']), graph.heads(['rev1']))
+ self.assertEqual(set(['rev2a']), graph.heads(['rev2a']))
+ self.assertEqual(set(['rev2b']), graph.heads(['rev2b']))
+ self.assertEqual(set(['rev3']), graph.heads(['rev3']))
+ self.assertEqual(set(['rev4']), graph.heads(['rev4']))
+
+ def test_heads_single(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertEqual(set(['rev4']), graph.heads(['null:', 'rev4']))
+ self.assertEqual(set(['rev2a']), graph.heads(['rev1', 'rev2a']))
+ self.assertEqual(set(['rev2b']), graph.heads(['rev1', 'rev2b']))
+ self.assertEqual(set(['rev3']), graph.heads(['rev1', 'rev3']))
+ self.assertEqual(set(['rev4']), graph.heads(['rev1', 'rev4']))
+ self.assertEqual(set(['rev4']), graph.heads(['rev2a', 'rev4']))
+ self.assertEqual(set(['rev4']), graph.heads(['rev2b', 'rev4']))
+ self.assertEqual(set(['rev4']), graph.heads(['rev3', 'rev4']))
+
+ def test_heads_two_heads(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertEqual(set(['rev2a', 'rev2b']),
+ graph.heads(['rev2a', 'rev2b']))
+ self.assertEqual(set(['rev3', 'rev2b']),
+ graph.heads(['rev3', 'rev2b']))
+
+ def test_heads_criss_cross(self):
+ graph = self.make_graph(criss_cross)
+ self.assertEqual(set(['rev2a']),
+ graph.heads(['rev2a', 'rev1']))
+ self.assertEqual(set(['rev2b']),
+ graph.heads(['rev2b', 'rev1']))
+ self.assertEqual(set(['rev3a']),
+ graph.heads(['rev3a', 'rev1']))
+ self.assertEqual(set(['rev3b']),
+ graph.heads(['rev3b', 'rev1']))
+ self.assertEqual(set(['rev2a', 'rev2b']),
+ graph.heads(['rev2a', 'rev2b']))
+ self.assertEqual(set(['rev3a']),
+ graph.heads(['rev3a', 'rev2a']))
+ self.assertEqual(set(['rev3a']),
+ graph.heads(['rev3a', 'rev2b']))
+ self.assertEqual(set(['rev3a']),
+ graph.heads(['rev3a', 'rev2a', 'rev2b']))
+ self.assertEqual(set(['rev3b']),
+ graph.heads(['rev3b', 'rev2a']))
+ self.assertEqual(set(['rev3b']),
+ graph.heads(['rev3b', 'rev2b']))
+ self.assertEqual(set(['rev3b']),
+ graph.heads(['rev3b', 'rev2a', 'rev2b']))
+ self.assertEqual(set(['rev3a', 'rev3b']),
+ graph.heads(['rev3a', 'rev3b']))
+ self.assertEqual(set(['rev3a', 'rev3b']),
+ graph.heads(['rev3a', 'rev3b', 'rev2a', 'rev2b']))
+
+ def test_heads_shortcut(self):
+ graph = self.make_graph(history_shortcut)
+
+ self.assertEqual(set(['rev2a', 'rev2b', 'rev2c']),
+ graph.heads(['rev2a', 'rev2b', 'rev2c']))
+ self.assertEqual(set(['rev3a', 'rev3b']),
+ graph.heads(['rev3a', 'rev3b']))
+ self.assertEqual(set(['rev3a', 'rev3b']),
+ graph.heads(['rev2a', 'rev3a', 'rev3b']))
+ self.assertEqual(set(['rev2a', 'rev3b']),
+ graph.heads(['rev2a', 'rev3b']))
+ self.assertEqual(set(['rev2c', 'rev3a']),
+ graph.heads(['rev2c', 'rev3a']))
+
+ def _run_heads_break_deeper(self, graph_dict, search):
+ """Run heads on a graph-as-a-dict.
+
+ If the search asks for the parents of 'deeper' the test will fail.
+ """
+ class stub(object):
+ pass
+ def get_parent_map(keys):
+ result = {}
+ for key in keys:
+ if key == 'deeper':
+ self.fail('key deeper was accessed')
+ result[key] = graph_dict[key]
+ return result
+ an_obj = stub()
+ an_obj.get_parent_map = get_parent_map
+ graph = _mod_graph.Graph(an_obj)
+ return graph.heads(search)
+
+ def test_heads_limits_search(self):
+ # test that a heads query does not search all of history
+ graph_dict = {
+ 'left':['common'],
+ 'right':['common'],
+ 'common':['deeper'],
+ }
+ self.assertEqual(set(['left', 'right']),
+ self._run_heads_break_deeper(graph_dict, ['left', 'right']))
+
+    def test_heads_limits_search_asymmetric(self):
+ # test that a heads query does not search all of history
+ graph_dict = {
+ 'left':['midleft'],
+ 'midleft':['common'],
+ 'right':['common'],
+ 'common':['aftercommon'],
+ 'aftercommon':['deeper'],
+ }
+ self.assertEqual(set(['left', 'right']),
+ self._run_heads_break_deeper(graph_dict, ['left', 'right']))
+
+ def test_heads_limits_search_common_search_must_continue(self):
+ # test that common nodes are still queried, preventing
+ # all-the-way-to-origin behaviour in the following graph:
+ graph_dict = {
+ 'h1':['shortcut', 'common1'],
+ 'h2':['common1'],
+ 'shortcut':['common2'],
+ 'common1':['common2'],
+ 'common2':['deeper'],
+ }
+ self.assertEqual(set(['h1', 'h2']),
+ self._run_heads_break_deeper(graph_dict, ['h1', 'h2']))
+
+ def test_breadth_first_search_start_ghosts(self):
+ graph = self.make_graph({})
+ # with_ghosts reports the ghosts
+ search = graph._make_breadth_first_searcher(['a-ghost'])
+ self.assertEqual((set(), set(['a-ghost'])), search.next_with_ghosts())
+ self.assertRaises(StopIteration, search.next_with_ghosts)
+ # next includes them
+ search = graph._make_breadth_first_searcher(['a-ghost'])
+ self.assertEqual(set(['a-ghost']), search.next())
+ self.assertRaises(StopIteration, search.next)
+
+ def test_breadth_first_search_deep_ghosts(self):
+ graph = self.make_graph({
+ 'head':['present'],
+ 'present':['child', 'ghost'],
+ 'child':[],
+ })
+ # with_ghosts reports the ghosts
+ search = graph._make_breadth_first_searcher(['head'])
+ self.assertEqual((set(['head']), set()), search.next_with_ghosts())
+ self.assertEqual((set(['present']), set()), search.next_with_ghosts())
+ self.assertEqual((set(['child']), set(['ghost'])),
+ search.next_with_ghosts())
+ self.assertRaises(StopIteration, search.next_with_ghosts)
+ # next includes them
+ search = graph._make_breadth_first_searcher(['head'])
+ self.assertEqual(set(['head']), search.next())
+ self.assertEqual(set(['present']), search.next())
+ self.assertEqual(set(['child', 'ghost']),
+ search.next())
+ self.assertRaises(StopIteration, search.next)
+
+ def test_breadth_first_search_change_next_to_next_with_ghosts(self):
+ # To make the API robust, we allow calling both next() and
+ # next_with_ghosts() on the same searcher.
+ graph = self.make_graph({
+ 'head':['present'],
+ 'present':['child', 'ghost'],
+ 'child':[],
+ })
+ # start with next_with_ghosts
+ search = graph._make_breadth_first_searcher(['head'])
+ self.assertEqual((set(['head']), set()), search.next_with_ghosts())
+ self.assertEqual(set(['present']), search.next())
+ self.assertEqual((set(['child']), set(['ghost'])),
+ search.next_with_ghosts())
+ self.assertRaises(StopIteration, search.next)
+ # start with next
+ search = graph._make_breadth_first_searcher(['head'])
+ self.assertEqual(set(['head']), search.next())
+ self.assertEqual((set(['present']), set()), search.next_with_ghosts())
+ self.assertEqual(set(['child', 'ghost']),
+ search.next())
+ self.assertRaises(StopIteration, search.next_with_ghosts)
+
+ def test_breadth_first_change_search(self):
+ # Changing the search should work with both next and next_with_ghosts.
+ graph = self.make_graph({
+ 'head':['present'],
+ 'present':['stopped'],
+ 'other':['other_2'],
+ 'other_2':[],
+ })
+ search = graph._make_breadth_first_searcher(['head'])
+ self.assertEqual((set(['head']), set()), search.next_with_ghosts())
+ self.assertEqual((set(['present']), set()), search.next_with_ghosts())
+ self.assertEqual(set(['present']),
+ search.stop_searching_any(['present']))
+ self.assertEqual((set(['other']), set(['other_ghost'])),
+ search.start_searching(['other', 'other_ghost']))
+ self.assertEqual((set(['other_2']), set()), search.next_with_ghosts())
+ self.assertRaises(StopIteration, search.next_with_ghosts)
+ # next includes them
+ search = graph._make_breadth_first_searcher(['head'])
+ self.assertEqual(set(['head']), search.next())
+ self.assertEqual(set(['present']), search.next())
+ self.assertEqual(set(['present']),
+ search.stop_searching_any(['present']))
+ search.start_searching(['other', 'other_ghost'])
+ self.assertEqual(set(['other_2']), search.next())
+ self.assertRaises(StopIteration, search.next)
+
+ def assertSeenAndResult(self, instructions, search, next):
+        """Check the results of .seen and get_state() for a search.
+
+ :param instructions: A list of tuples:
+ (seen, recipe, included_keys, starts, stops).
+ seen, recipe and included_keys are results to check on the search
+            and the search's get_state(). starts and stops are parameters to
+ pass to start_searching and stop_searching_any during each
+ iteration, if they are not None.
+ :param search: The search to use.
+ :param next: A callable to advance the search.
+ """
+ for seen, recipe, included_keys, starts, stops in instructions:
+ # Adjust for recipe contract changes that don't vary for all the
+ # current tests.
+ recipe = ('search',) + recipe
+ next()
+ if starts is not None:
+ search.start_searching(starts)
+ if stops is not None:
+ search.stop_searching_any(stops)
+ state = search.get_state()
+ self.assertEqual(set(included_keys), state[2])
+ self.assertEqual(seen, search.seen)
+
+ def test_breadth_first_get_result_excludes_current_pending(self):
+ graph = self.make_graph({
+ 'head':['child'],
+ 'child':[NULL_REVISION],
+ NULL_REVISION:[],
+ })
+ search = graph._make_breadth_first_searcher(['head'])
+        # At the start, nothing has been seen, so it's all excluded:
+ state = search.get_state()
+ self.assertEqual((set(['head']), set(['head']), set()),
+ state)
+ self.assertEqual(set(), search.seen)
+ # using next:
+ expected = [
+ (set(['head']), (set(['head']), set(['child']), 1),
+ ['head'], None, None),
+ (set(['head', 'child']), (set(['head']), set([NULL_REVISION]), 2),
+ ['head', 'child'], None, None),
+ (set(['head', 'child', NULL_REVISION]), (set(['head']), set(), 3),
+ ['head', 'child', NULL_REVISION], None, None),
+ ]
+ self.assertSeenAndResult(expected, search, search.next)
+ # using next_with_ghosts:
+ search = graph._make_breadth_first_searcher(['head'])
+ self.assertSeenAndResult(expected, search, search.next_with_ghosts)
+
+ def test_breadth_first_get_result_starts_stops(self):
+ graph = self.make_graph({
+ 'head':['child'],
+ 'child':[NULL_REVISION],
+ 'otherhead':['otherchild'],
+ 'otherchild':['excluded'],
+ 'excluded':[NULL_REVISION],
+ NULL_REVISION:[]
+ })
+ search = graph._make_breadth_first_searcher([])
+ # Starting with nothing and adding a search works:
+ search.start_searching(['head'])
+ # head has been seen:
+ state = search.get_state()
+ self.assertEqual((set(['head']), set(['child']), set(['head'])),
+ state)
+ self.assertEqual(set(['head']), search.seen)
+ # using next:
+ expected = [
+ # stop at child, and start a new search at otherhead:
+ # - otherhead counts as seen immediately when start_searching is
+ # called.
+ (set(['head', 'child', 'otherhead']),
+ (set(['head', 'otherhead']), set(['child', 'otherchild']), 2),
+ ['head', 'otherhead'], ['otherhead'], ['child']),
+ (set(['head', 'child', 'otherhead', 'otherchild']),
+ (set(['head', 'otherhead']), set(['child', 'excluded']), 3),
+ ['head', 'otherhead', 'otherchild'], None, None),
+ # stop searching excluded now
+ (set(['head', 'child', 'otherhead', 'otherchild', 'excluded']),
+ (set(['head', 'otherhead']), set(['child', 'excluded']), 3),
+ ['head', 'otherhead', 'otherchild'], None, ['excluded']),
+ ]
+ self.assertSeenAndResult(expected, search, search.next)
+ # using next_with_ghosts:
+ search = graph._make_breadth_first_searcher([])
+ search.start_searching(['head'])
+ self.assertSeenAndResult(expected, search, search.next_with_ghosts)
+
+ def test_breadth_first_stop_searching_not_queried(self):
+ # A client should be able to say 'stop node X' even if X has not been
+ # returned to the client.
+ graph = self.make_graph({
+ 'head':['child', 'ghost1'],
+ 'child':[NULL_REVISION],
+ NULL_REVISION:[],
+ })
+ search = graph._make_breadth_first_searcher(['head'])
+ expected = [
+ # NULL_REVISION and ghost1 have not been returned
+ (set(['head']),
+ (set(['head']), set(['child', NULL_REVISION, 'ghost1']), 1),
+ ['head'], None, [NULL_REVISION, 'ghost1']),
+ # ghost1 has been returned, NULL_REVISION is to be returned in the
+ # next iteration.
+ (set(['head', 'child', 'ghost1']),
+ (set(['head']), set(['ghost1', NULL_REVISION]), 2),
+ ['head', 'child'], None, [NULL_REVISION, 'ghost1']),
+ ]
+ self.assertSeenAndResult(expected, search, search.next)
+ # using next_with_ghosts:
+ search = graph._make_breadth_first_searcher(['head'])
+ self.assertSeenAndResult(expected, search, search.next_with_ghosts)
+
+ def test_breadth_first_stop_searching_late(self):
+ # A client should be able to say 'stop node X' and have it excluded
+ # from the result even if X was seen in an older iteration of the
+ # search.
+ graph = self.make_graph({
+ 'head':['middle'],
+ 'middle':['child'],
+ 'child':[NULL_REVISION],
+ NULL_REVISION:[],
+ })
+ search = graph._make_breadth_first_searcher(['head'])
+ expected = [
+ (set(['head']), (set(['head']), set(['middle']), 1),
+ ['head'], None, None),
+ (set(['head', 'middle']), (set(['head']), set(['child']), 2),
+ ['head', 'middle'], None, None),
+ # 'middle' came from the previous iteration, but we don't stop
+ # searching it until *after* advancing the searcher.
+ (set(['head', 'middle', 'child']),
+ (set(['head']), set(['middle', 'child']), 1),
+ ['head'], None, ['middle', 'child']),
+ ]
+ self.assertSeenAndResult(expected, search, search.next)
+ # using next_with_ghosts:
+ search = graph._make_breadth_first_searcher(['head'])
+ self.assertSeenAndResult(expected, search, search.next_with_ghosts)
+
+ def test_breadth_first_get_result_ghosts_are_excluded(self):
+ graph = self.make_graph({
+ 'head':['child', 'ghost'],
+ 'child':[NULL_REVISION],
+ NULL_REVISION:[],
+ })
+ search = graph._make_breadth_first_searcher(['head'])
+ # using next:
+ expected = [
+ (set(['head']),
+ (set(['head']), set(['ghost', 'child']), 1),
+ ['head'], None, None),
+ (set(['head', 'child', 'ghost']),
+ (set(['head']), set([NULL_REVISION, 'ghost']), 2),
+ ['head', 'child'], None, None),
+ ]
+ self.assertSeenAndResult(expected, search, search.next)
+ # using next_with_ghosts:
+ search = graph._make_breadth_first_searcher(['head'])
+ self.assertSeenAndResult(expected, search, search.next_with_ghosts)
+
+ def test_breadth_first_get_result_starting_a_ghost_ghost_is_excluded(self):
+ graph = self.make_graph({
+ 'head':['child'],
+ 'child':[NULL_REVISION],
+ NULL_REVISION:[],
+ })
+ search = graph._make_breadth_first_searcher(['head'])
+ # using next:
+ expected = [
+ (set(['head', 'ghost']),
+ (set(['head', 'ghost']), set(['child', 'ghost']), 1),
+ ['head'], ['ghost'], None),
+ (set(['head', 'child', 'ghost']),
+ (set(['head', 'ghost']), set([NULL_REVISION, 'ghost']), 2),
+ ['head', 'child'], None, None),
+ ]
+ self.assertSeenAndResult(expected, search, search.next)
+ # using next_with_ghosts:
+ search = graph._make_breadth_first_searcher(['head'])
+ self.assertSeenAndResult(expected, search, search.next_with_ghosts)
+
+ def test_breadth_first_revision_count_includes_NULL_REVISION(self):
+ graph = self.make_graph({
+ 'head':[NULL_REVISION],
+ NULL_REVISION:[],
+ })
+ search = graph._make_breadth_first_searcher(['head'])
+ # using next:
+ expected = [
+ (set(['head']),
+ (set(['head']), set([NULL_REVISION]), 1),
+ ['head'], None, None),
+ (set(['head', NULL_REVISION]),
+ (set(['head']), set([]), 2),
+ ['head', NULL_REVISION], None, None),
+ ]
+ self.assertSeenAndResult(expected, search, search.next)
+ # using next_with_ghosts:
+ search = graph._make_breadth_first_searcher(['head'])
+ self.assertSeenAndResult(expected, search, search.next_with_ghosts)
+
+ def test_breadth_first_search_get_result_after_StopIteration(self):
+        # StopIteration should not invalidate anything.
+ graph = self.make_graph({
+ 'head':[NULL_REVISION],
+ NULL_REVISION:[],
+ })
+ search = graph._make_breadth_first_searcher(['head'])
+ # using next:
+ expected = [
+ (set(['head']),
+ (set(['head']), set([NULL_REVISION]), 1),
+ ['head'], None, None),
+ (set(['head', 'ghost', NULL_REVISION]),
+ (set(['head', 'ghost']), set(['ghost']), 2),
+ ['head', NULL_REVISION], ['ghost'], None),
+ ]
+ self.assertSeenAndResult(expected, search, search.next)
+ self.assertRaises(StopIteration, search.next)
+ self.assertEqual(set(['head', 'ghost', NULL_REVISION]), search.seen)
+ state = search.get_state()
+ self.assertEqual(
+ (set(['ghost', 'head']), set(['ghost']),
+ set(['head', NULL_REVISION])),
+ state)
+ # using next_with_ghosts:
+ search = graph._make_breadth_first_searcher(['head'])
+ self.assertSeenAndResult(expected, search, search.next_with_ghosts)
+ self.assertRaises(StopIteration, search.next)
+ self.assertEqual(set(['head', 'ghost', NULL_REVISION]), search.seen)
+ state = search.get_state()
+ self.assertEqual(
+ (set(['ghost', 'head']), set(['ghost']),
+ set(['head', NULL_REVISION])),
+ state)
+
+
+class TestFindUniqueAncestors(TestGraphBase):
+
+ def assertFindUniqueAncestors(self, graph, expected, node, common):
+ actual = graph.find_unique_ancestors(node, common)
+ self.assertEqual(expected, sorted(actual))
+
+ def test_empty_set(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertFindUniqueAncestors(graph, [], 'rev1', ['rev1'])
+ self.assertFindUniqueAncestors(graph, [], 'rev2b', ['rev2b'])
+ self.assertFindUniqueAncestors(graph, [], 'rev3', ['rev1', 'rev3'])
+
+ def test_single_node(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertFindUniqueAncestors(graph, ['rev2a'], 'rev2a', ['rev1'])
+ self.assertFindUniqueAncestors(graph, ['rev2b'], 'rev2b', ['rev1'])
+ self.assertFindUniqueAncestors(graph, ['rev3'], 'rev3', ['rev2a'])
+
+ def test_minimal_ancestry(self):
+ graph = self.make_breaking_graph(extended_history_shortcut,
+ [NULL_REVISION, 'a', 'b'])
+ self.assertFindUniqueAncestors(graph, ['e'], 'e', ['d'])
+
+ graph = self.make_breaking_graph(extended_history_shortcut,
+ ['b'])
+ self.assertFindUniqueAncestors(graph, ['f'], 'f', ['a', 'd'])
+
+ graph = self.make_breaking_graph(complex_shortcut,
+ ['a', 'b'])
+ self.assertFindUniqueAncestors(graph, ['h'], 'h', ['i'])
+ self.assertFindUniqueAncestors(graph, ['e', 'g', 'i'], 'i', ['h'])
+ self.assertFindUniqueAncestors(graph, ['h'], 'h', ['g'])
+ self.assertFindUniqueAncestors(graph, ['h'], 'h', ['j'])
+
+ def test_in_ancestry(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertFindUniqueAncestors(graph, [], 'rev1', ['rev3'])
+ self.assertFindUniqueAncestors(graph, [], 'rev2b', ['rev4'])
+
+ def test_multiple_revisions(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertFindUniqueAncestors(graph,
+ ['rev4'], 'rev4', ['rev3', 'rev2b'])
+ self.assertFindUniqueAncestors(graph,
+ ['rev2a', 'rev3', 'rev4'], 'rev4', ['rev2b'])
+
+ def test_complex_shortcut(self):
+ graph = self.make_graph(complex_shortcut)
+ self.assertFindUniqueAncestors(graph,
+ ['h', 'n'], 'n', ['m'])
+ self.assertFindUniqueAncestors(graph,
+ ['e', 'i', 'm'], 'm', ['n'])
+
+ def test_complex_shortcut2(self):
+ graph = self.make_graph(complex_shortcut2)
+ self.assertFindUniqueAncestors(graph,
+ ['j', 'u'], 'u', ['t'])
+ self.assertFindUniqueAncestors(graph,
+ ['t'], 't', ['u'])
+
+ def test_multiple_interesting_unique(self):
+ graph = self.make_graph(multiple_interesting_unique)
+ self.assertFindUniqueAncestors(graph,
+ ['j', 'y'], 'y', ['z'])
+ self.assertFindUniqueAncestors(graph,
+ ['p', 'z'], 'z', ['y'])
+
+ def test_racing_shortcuts(self):
+ graph = self.make_graph(racing_shortcuts)
+ self.assertFindUniqueAncestors(graph,
+ ['p', 'q', 'z'], 'z', ['y'])
+ self.assertFindUniqueAncestors(graph,
+ ['h', 'i', 'j', 'y'], 'j', ['z'])
+
+
+class TestGraphFindDistanceToNull(TestGraphBase):
+ """Test an api that should be able to compute a revno"""
+
+ def assertFindDistance(self, revno, graph, target_id, known_ids):
+ """Assert the output of Graph.find_distance_to_null()"""
+ actual = graph.find_distance_to_null(target_id, known_ids)
+ self.assertEqual(revno, actual)
+
+ def test_nothing_known(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertFindDistance(0, graph, NULL_REVISION, [])
+ self.assertFindDistance(1, graph, 'rev1', [])
+ self.assertFindDistance(2, graph, 'rev2a', [])
+ self.assertFindDistance(2, graph, 'rev2b', [])
+ self.assertFindDistance(3, graph, 'rev3', [])
+ self.assertFindDistance(4, graph, 'rev4', [])
+
+ def test_rev_is_ghost(self):
+ graph = self.make_graph(ancestry_1)
+ e = self.assertRaises(errors.GhostRevisionsHaveNoRevno,
+ graph.find_distance_to_null, 'rev_missing', [])
+ self.assertEqual('rev_missing', e.revision_id)
+ self.assertEqual('rev_missing', e.ghost_revision_id)
+
+ def test_ancestor_is_ghost(self):
+ graph = self.make_graph({'rev':['parent']})
+ e = self.assertRaises(errors.GhostRevisionsHaveNoRevno,
+ graph.find_distance_to_null, 'rev', [])
+ self.assertEqual('rev', e.revision_id)
+ self.assertEqual('parent', e.ghost_revision_id)
+
+ def test_known_in_ancestry(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertFindDistance(2, graph, 'rev2a', [('rev1', 1)])
+ self.assertFindDistance(3, graph, 'rev3', [('rev2a', 2)])
+
+ def test_known_in_ancestry_limits(self):
+ graph = self.make_breaking_graph(ancestry_1, ['rev1'])
+ self.assertFindDistance(4, graph, 'rev4', [('rev3', 3)])
+
+ def test_target_is_ancestor(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertFindDistance(2, graph, 'rev2a', [('rev3', 3)])
+
+ def test_target_is_ancestor_limits(self):
+ """We shouldn't search all history if we run into ourselves"""
+ graph = self.make_breaking_graph(ancestry_1, ['rev1'])
+ self.assertFindDistance(3, graph, 'rev3', [('rev4', 4)])
+
+ def test_target_parallel_to_known_limits(self):
+ # Even though the known revision isn't part of the other ancestry, they
+ # eventually converge
+ graph = self.make_breaking_graph(with_tail, ['a'])
+ self.assertFindDistance(6, graph, 'f', [('g', 6)])
+ self.assertFindDistance(7, graph, 'h', [('g', 6)])
+ self.assertFindDistance(8, graph, 'i', [('g', 6)])
+ self.assertFindDistance(6, graph, 'g', [('i', 8)])
+
+
+class TestFindMergeOrder(TestGraphBase):
+
+ def assertMergeOrder(self, expected, graph, tip, base_revisions):
+ self.assertEqual(expected, graph.find_merge_order(tip, base_revisions))
+
+ def test_parents(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertMergeOrder(['rev3', 'rev2b'], graph, 'rev4',
+ ['rev3', 'rev2b'])
+ self.assertMergeOrder(['rev3', 'rev2b'], graph, 'rev4',
+ ['rev2b', 'rev3'])
+
+ def test_ancestors(self):
+ graph = self.make_graph(ancestry_1)
+ self.assertMergeOrder(['rev1', 'rev2b'], graph, 'rev4',
+ ['rev1', 'rev2b'])
+ self.assertMergeOrder(['rev1', 'rev2b'], graph, 'rev4',
+ ['rev2b', 'rev1'])
+
+ def test_shortcut_one_ancestor(self):
+ # When we have enough info, we can stop searching
+ graph = self.make_breaking_graph(ancestry_1, ['rev3', 'rev2b', 'rev4'])
+ # Single ancestors shortcut right away
+ self.assertMergeOrder(['rev3'], graph, 'rev4', ['rev3'])
+
+ def test_shortcut_after_one_ancestor(self):
+ graph = self.make_breaking_graph(ancestry_1, ['rev2a', 'rev2b'])
+ self.assertMergeOrder(['rev3', 'rev1'], graph, 'rev4', ['rev1', 'rev3'])
+
+
+class TestFindDescendants(TestGraphBase):
+
+ def test_find_descendants_rev1_rev3(self):
+ graph = self.make_graph(ancestry_1)
+ descendants = graph.find_descendants('rev1', 'rev3')
+ self.assertEqual(set(['rev1', 'rev2a', 'rev3']), descendants)
+
+ def test_find_descendants_rev1_rev4(self):
+ graph = self.make_graph(ancestry_1)
+ descendants = graph.find_descendants('rev1', 'rev4')
+ self.assertEqual(set(['rev1', 'rev2a', 'rev2b', 'rev3', 'rev4']),
+ descendants)
+
+ def test_find_descendants_rev2a_rev4(self):
+ graph = self.make_graph(ancestry_1)
+ descendants = graph.find_descendants('rev2a', 'rev4')
+ self.assertEqual(set(['rev2a', 'rev3', 'rev4']), descendants)
+
+
+class TestFindLefthandMerger(TestGraphBase):
+
+ def check_merger(self, result, ancestry, merged, tip):
+ graph = self.make_graph(ancestry)
+ self.assertEqual(result, graph.find_lefthand_merger(merged, tip))
+
+ def test_find_lefthand_merger_rev2b(self):
+ self.check_merger('rev4', ancestry_1, 'rev2b', 'rev4')
+
+ def test_find_lefthand_merger_rev2a(self):
+ self.check_merger('rev2a', ancestry_1, 'rev2a', 'rev4')
+
+ def test_find_lefthand_merger_rev4(self):
+ self.check_merger(None, ancestry_1, 'rev4', 'rev2a')
+
+ def test_find_lefthand_merger_f(self):
+ self.check_merger('i', complex_shortcut, 'f', 'm')
+
+ def test_find_lefthand_merger_g(self):
+ self.check_merger('i', complex_shortcut, 'g', 'm')
+
+ def test_find_lefthand_merger_h(self):
+ self.check_merger('n', complex_shortcut, 'h', 'n')
+
+
+class TestGetChildMap(TestGraphBase):
+
+ def test_get_child_map(self):
+ graph = self.make_graph(ancestry_1)
+ child_map = graph.get_child_map(['rev4', 'rev3', 'rev2a', 'rev2b'])
+ self.assertEqual({'rev1': ['rev2a', 'rev2b'],
+ 'rev2a': ['rev3'],
+ 'rev2b': ['rev4'],
+ 'rev3': ['rev4']},
+ child_map)
+
+
+class TestCachingParentsProvider(tests.TestCase):
+ """These tests run with:
+
+ self.inst_pp, a recording parents provider with a graph of a->b, and b is a
+ ghost.
+ self.caching_pp, a CachingParentsProvider layered on inst_pp.
+ """
+
+ def setUp(self):
+ super(TestCachingParentsProvider, self).setUp()
+ dict_pp = _mod_graph.DictParentsProvider({'a': ('b',)})
+ self.inst_pp = InstrumentedParentsProvider(dict_pp)
+ self.caching_pp = _mod_graph.CachingParentsProvider(self.inst_pp)
+
+ def test_get_parent_map(self):
+        """Requesting the same revision twice should be served from the cache."""
+ self.assertEqual({}, self.caching_pp._cache)
+ self.assertEqual({'a':('b',)}, self.caching_pp.get_parent_map(['a']))
+ self.assertEqual(['a'], self.inst_pp.calls)
+ self.assertEqual({'a':('b',)}, self.caching_pp.get_parent_map(['a']))
+ # No new call, as it should have been returned from the cache
+ self.assertEqual(['a'], self.inst_pp.calls)
+ self.assertEqual({'a':('b',)}, self.caching_pp._cache)
+
+ def test_get_parent_map_not_present(self):
+ """The cache should also track when a revision doesn't exist"""
+ self.assertEqual({}, self.caching_pp.get_parent_map(['b']))
+ self.assertEqual(['b'], self.inst_pp.calls)
+ self.assertEqual({}, self.caching_pp.get_parent_map(['b']))
+ # No new calls
+ self.assertEqual(['b'], self.inst_pp.calls)
+
+ def test_get_parent_map_mixed(self):
+        """Anything that can be returned from the cache should be."""
+ self.assertEqual({}, self.caching_pp.get_parent_map(['b']))
+ self.assertEqual(['b'], self.inst_pp.calls)
+ self.assertEqual({'a':('b',)},
+ self.caching_pp.get_parent_map(['a', 'b']))
+ self.assertEqual(['b', 'a'], self.inst_pp.calls)
+
+ def test_get_parent_map_repeated(self):
+ """Asking for the same parent 2x will only forward 1 request."""
+ self.assertEqual({'a':('b',)},
+ self.caching_pp.get_parent_map(['b', 'a', 'b']))
+ # Use sorted because we don't care about the order, just that each is
+ # only present 1 time.
+ self.assertEqual(['a', 'b'], sorted(self.inst_pp.calls))
+
+ def test_note_missing_key(self):
+ """After noting that a key is missing it is cached."""
+ self.caching_pp.note_missing_key('b')
+ self.assertEqual({}, self.caching_pp.get_parent_map(['b']))
+ self.assertEqual([], self.inst_pp.calls)
+ self.assertEqual(set(['b']), self.caching_pp.missing_keys)
+
+ def test_get_cached_parent_map(self):
+ self.assertEqual({}, self.caching_pp.get_cached_parent_map(['a']))
+ self.assertEqual([], self.inst_pp.calls)
+ self.assertEqual({'a': ('b',)}, self.caching_pp.get_parent_map(['a']))
+ self.assertEqual(['a'], self.inst_pp.calls)
+ self.assertEqual({'a': ('b',)},
+ self.caching_pp.get_cached_parent_map(['a']))
+
+
+class TestCachingParentsProviderExtras(tests.TestCaseWithTransport):
+ """Test the behaviour when parents are provided that were not requested."""
+
+ def setUp(self):
+ super(TestCachingParentsProviderExtras, self).setUp()
+ class ExtraParentsProvider(object):
+
+ def get_parent_map(self, keys):
+ return {'rev1': [], 'rev2': ['rev1',]}
+
+ self.inst_pp = InstrumentedParentsProvider(ExtraParentsProvider())
+ self.caching_pp = _mod_graph.CachingParentsProvider(
+ get_parent_map=self.inst_pp.get_parent_map)
+
+ def test_uncached(self):
+ self.caching_pp.disable_cache()
+ self.assertEqual({'rev1': []},
+ self.caching_pp.get_parent_map(['rev1']))
+ self.assertEqual(['rev1'], self.inst_pp.calls)
+ self.assertIs(None, self.caching_pp._cache)
+
+ def test_cache_initially_empty(self):
+ self.assertEqual({}, self.caching_pp._cache)
+
+ def test_cached(self):
+ self.assertEqual({'rev1': []},
+ self.caching_pp.get_parent_map(['rev1']))
+ self.assertEqual(['rev1'], self.inst_pp.calls)
+ self.assertEqual({'rev1': [], 'rev2': ['rev1']},
+ self.caching_pp._cache)
+ self.assertEqual({'rev1': []},
+ self.caching_pp.get_parent_map(['rev1']))
+ self.assertEqual(['rev1'], self.inst_pp.calls)
+
+ def test_disable_cache_clears_cache(self):
+ # Put something in the cache
+ self.caching_pp.get_parent_map(['rev1'])
+ self.assertEqual(2, len(self.caching_pp._cache))
+ self.caching_pp.disable_cache()
+ self.assertIs(None, self.caching_pp._cache)
+
+ def test_enable_cache_raises(self):
+ e = self.assertRaises(AssertionError, self.caching_pp.enable_cache)
+ self.assertEqual('Cache enabled when already enabled.', str(e))
+
+ def test_cache_misses(self):
+ self.caching_pp.get_parent_map(['rev3'])
+ self.caching_pp.get_parent_map(['rev3'])
+ self.assertEqual(['rev3'], self.inst_pp.calls)
+
+ def test_no_cache_misses(self):
+ self.caching_pp.disable_cache()
+ self.caching_pp.enable_cache(cache_misses=False)
+ self.caching_pp.get_parent_map(['rev3'])
+ self.caching_pp.get_parent_map(['rev3'])
+ self.assertEqual(['rev3', 'rev3'], self.inst_pp.calls)
+
+ def test_cache_extras(self):
+ self.assertEqual({}, self.caching_pp.get_parent_map(['rev3']))
+ self.assertEqual({'rev2': ['rev1']},
+ self.caching_pp.get_parent_map(['rev2']))
+ self.assertEqual(['rev3'], self.inst_pp.calls)
+
+ def test_extras_using_cached(self):
+ self.assertEqual({}, self.caching_pp.get_cached_parent_map(['rev3']))
+ self.assertEqual({}, self.caching_pp.get_parent_map(['rev3']))
+ self.assertEqual({'rev2': ['rev1']},
+ self.caching_pp.get_cached_parent_map(['rev2']))
+ self.assertEqual(['rev3'], self.inst_pp.calls)
+
+
+class TestCollapseLinearRegions(tests.TestCase):
+
+ def assertCollapsed(self, collapsed, original):
+ self.assertEqual(collapsed,
+ _mod_graph.collapse_linear_regions(original))
+
+ def test_collapse_nothing(self):
+ d = {1:[2, 3], 2:[], 3:[]}
+ self.assertCollapsed(d, d)
+ d = {1:[2], 2:[3, 4], 3:[5], 4:[5], 5:[]}
+ self.assertCollapsed(d, d)
+
+ def test_collapse_chain(self):
+ # Any time we have a linear chain, we should be able to collapse
+ d = {1:[2], 2:[3], 3:[4], 4:[5], 5:[]}
+ self.assertCollapsed({1:[5], 5:[]}, d)
+ d = {5:[4], 4:[3], 3:[2], 2:[1], 1:[]}
+ self.assertCollapsed({5:[1], 1:[]}, d)
+ d = {5:[3], 3:[4], 4:[1], 1:[2], 2:[]}
+ self.assertCollapsed({5:[2], 2:[]}, d)
+
+ def test_collapse_with_multiple_children(self):
+ # 7
+ # |
+ # 6
+ # / \
+ # 4 5
+ # | |
+ # 2 3
+ # \ /
+ # 1
+ #
+ # 4 and 5 cannot be removed because 6 has 2 children
+ # 2 and 3 cannot be removed because 1 has 2 parents
+ d = {1:[2, 3], 2:[4], 4:[6], 3:[5], 5:[6], 6:[7], 7:[]}
+ self.assertCollapsed(d, d)
+
+
+class TestGraphThunkIdsToKeys(tests.TestCase):
+
+ def test_heads(self):
+ # A
+ # |\
+ # B C
+ # |/
+ # D
+ d = {('D',): [('B',), ('C',)], ('C',):[('A',)],
+ ('B',): [('A',)], ('A',): []}
+ g = _mod_graph.Graph(_mod_graph.DictParentsProvider(d))
+ graph_thunk = _mod_graph.GraphThunkIdsToKeys(g)
+ self.assertEqual(['D'], sorted(graph_thunk.heads(['D', 'A'])))
+ self.assertEqual(['D'], sorted(graph_thunk.heads(['D', 'B'])))
+ self.assertEqual(['D'], sorted(graph_thunk.heads(['D', 'C'])))
+ self.assertEqual(['B', 'C'], sorted(graph_thunk.heads(['B', 'C'])))
+
+ def test_add_node(self):
+ d = {('C',):[('A',)], ('B',): [('A',)], ('A',): []}
+ g = _mod_graph.KnownGraph(d)
+ graph_thunk = _mod_graph.GraphThunkIdsToKeys(g)
+ graph_thunk.add_node("D", ["A", "C"])
+ self.assertEqual(['B', 'D'],
+ sorted(graph_thunk.heads(['D', 'B', 'A'])))
+
+ def test_merge_sort(self):
+ d = {('C',):[('A',)], ('B',): [('A',)], ('A',): []}
+ g = _mod_graph.KnownGraph(d)
+ graph_thunk = _mod_graph.GraphThunkIdsToKeys(g)
+ graph_thunk.add_node("D", ["A", "C"])
+ self.assertEqual([('C', 0, (2,), False), ('A', 0, (1,), True)],
+ [(n.key, n.merge_depth, n.revno, n.end_of_merge)
+ for n in graph_thunk.merge_sort('C')])
+
+
+class TestStackedParentsProvider(tests.TestCase):
+
+ def setUp(self):
+ super(TestStackedParentsProvider, self).setUp()
+ self.calls = []
+
+ def get_shared_provider(self, info, ancestry, has_cached):
+ pp = _mod_graph.DictParentsProvider(ancestry)
+ if has_cached:
+ pp.get_cached_parent_map = pp.get_parent_map
+ return SharedInstrumentedParentsProvider(pp, self.calls, info)
+
+ def test_stacked_parents_provider(self):
+ parents1 = _mod_graph.DictParentsProvider({'rev2': ['rev3']})
+ parents2 = _mod_graph.DictParentsProvider({'rev1': ['rev4']})
+ stacked = _mod_graph.StackedParentsProvider([parents1, parents2])
+ self.assertEqual({'rev1':['rev4'], 'rev2':['rev3']},
+ stacked.get_parent_map(['rev1', 'rev2']))
+ self.assertEqual({'rev2':['rev3'], 'rev1':['rev4']},
+ stacked.get_parent_map(['rev2', 'rev1']))
+ self.assertEqual({'rev2':['rev3']},
+ stacked.get_parent_map(['rev2', 'rev2']))
+ self.assertEqual({'rev1':['rev4']},
+ stacked.get_parent_map(['rev1', 'rev1']))
+
+ def test_stacked_parents_provider_overlapping(self):
+        # rev2 is available in both providers.
+ # 1
+ # |
+ # 2
+ parents1 = _mod_graph.DictParentsProvider({'rev2': ['rev1']})
+ parents2 = _mod_graph.DictParentsProvider({'rev2': ['rev1']})
+ stacked = _mod_graph.StackedParentsProvider([parents1, parents2])
+ self.assertEqual({'rev2': ['rev1']},
+ stacked.get_parent_map(['rev2']))
+
+ def test_handles_no_get_cached_parent_map(self):
+        # This shows that we handle a provider that doesn't implement
+        # get_cached_parent_map: it is simply skipped during the cached pass.
+ pp1 = self.get_shared_provider('pp1', {'rev2': ('rev1',)},
+ has_cached=False)
+ pp2 = self.get_shared_provider('pp2', {'rev2': ('rev1',)},
+ has_cached=True)
+ stacked = _mod_graph.StackedParentsProvider([pp1, pp2])
+ self.assertEqual({'rev2': ('rev1',)}, stacked.get_parent_map(['rev2']))
+ # No call on 'pp1' because it doesn't provide get_cached_parent_map
+ self.assertEqual([('pp2', 'cached', ['rev2'])], self.calls)
+
+ def test_query_order(self):
+ # We should call get_cached_parent_map on all providers before we call
+ # get_parent_map. Further, we should track what entries we have found,
+ # and not re-try them.
+ pp1 = self.get_shared_provider('pp1', {'a': ()}, has_cached=True)
+ pp2 = self.get_shared_provider('pp2', {'c': ('b',)}, has_cached=False)
+ pp3 = self.get_shared_provider('pp3', {'b': ('a',)}, has_cached=True)
+ stacked = _mod_graph.StackedParentsProvider([pp1, pp2, pp3])
+ self.assertEqual({'a': (), 'b': ('a',), 'c': ('b',)},
+ stacked.get_parent_map(['a', 'b', 'c', 'd']))
+ self.assertEqual([('pp1', 'cached', ['a', 'b', 'c', 'd']),
+ # No call to pp2, because it doesn't have cached
+ ('pp3', 'cached', ['b', 'c', 'd']),
+ ('pp1', ['c', 'd']),
+ ('pp2', ['c', 'd']),
+ ('pp3', ['d']),
+ ], self.calls)