summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/git/test_actor.py44
-rw-r--r--test/git/test_base.py160
-rw-r--r--test/git/test_blob.py36
-rw-r--r--test/git/test_commit.py304
-rw-r--r--test/git/test_config.py182
-rw-r--r--test/git/test_diff.py148
-rw-r--r--test/git/test_git.py128
-rw-r--r--test/git/test_index.py1022
-rw-r--r--test/git/test_performance.py74
-rw-r--r--test/git/test_refs.py722
-rw-r--r--test/git/test_remote.py844
-rw-r--r--test/git/test_repo.py508
-rw-r--r--test/git/test_stats.py30
-rw-r--r--test/git/test_tree.py106
-rw-r--r--test/git/test_utils.py204
-rw-r--r--test/testlib/__init__.py2
-rw-r--r--test/testlib/asserts.py46
-rw-r--r--test/testlib/helper.py468
18 files changed, 2514 insertions, 2514 deletions
diff --git a/test/git/test_actor.py b/test/git/test_actor.py
index 2941468d..8fda57e5 100644
--- a/test/git/test_actor.py
+++ b/test/git/test_actor.py
@@ -9,28 +9,28 @@ from test.testlib import *
from git import *
class TestActor(object):
- def test_from_string_should_separate_name_and_email(self):
- a = Actor._from_string("Michael Trier <mtrier@example.com>")
- assert_equal("Michael Trier", a.name)
- assert_equal("mtrier@example.com", a.email)
-
- # base type capabilities
- assert a == a
- assert not ( a != a )
- m = set()
- m.add(a)
- m.add(a)
- assert len(m) == 1
+ def test_from_string_should_separate_name_and_email(self):
+ a = Actor._from_string("Michael Trier <mtrier@example.com>")
+ assert_equal("Michael Trier", a.name)
+ assert_equal("mtrier@example.com", a.email)
+
+ # base type capabilities
+ assert a == a
+ assert not ( a != a )
+ m = set()
+ m.add(a)
+ m.add(a)
+ assert len(m) == 1
- def test_from_string_should_handle_just_name(self):
- a = Actor._from_string("Michael Trier")
- assert_equal("Michael Trier", a.name)
- assert_equal(None, a.email)
+ def test_from_string_should_handle_just_name(self):
+ a = Actor._from_string("Michael Trier")
+ assert_equal("Michael Trier", a.name)
+ assert_equal(None, a.email)
- def test_should_display_representation(self):
- a = Actor._from_string("Michael Trier <mtrier@example.com>")
- assert_equal('<git.Actor "Michael Trier <mtrier@example.com>">', repr(a))
+ def test_should_display_representation(self):
+ a = Actor._from_string("Michael Trier <mtrier@example.com>")
+ assert_equal('<git.Actor "Michael Trier <mtrier@example.com>">', repr(a))
- def test_str_should_alias_name(self):
- a = Actor._from_string("Michael Trier <mtrier@example.com>")
- assert_equal(a.name, str(a)) \ No newline at end of file
+ def test_str_should_alias_name(self):
+ a = Actor._from_string("Michael Trier <mtrier@example.com>")
+ assert_equal(a.name, str(a)) \ No newline at end of file
diff --git a/test/git/test_base.py b/test/git/test_base.py
index ec85c2a7..81931ad0 100644
--- a/test/git/test_base.py
+++ b/test/git/test_base.py
@@ -15,84 +15,84 @@ from git.objects.utils import get_object_type_by_name
import tempfile
class TestBase(TestBase):
-
- type_tuples = ( ("blob", "8741fc1d09d61f02ffd8cded15ff603eff1ec070", "blob.py"),
- ("tree", "3a6a5e3eeed3723c09f1ef0399f81ed6b8d82e79", "directory"),
- ("commit", "4251bd59fb8e11e40c40548cba38180a9536118c", None),
- ("tag", "e56a60e8e9cd333cfba0140a77cd12b0d9398f10", None) )
-
- def test_base_object(self):
- # test interface of base object classes
- types = (Blob, Tree, Commit, TagObject)
- assert len(types) == len(self.type_tuples)
-
- s = set()
- num_objs = 0
- num_index_objs = 0
- for obj_type, (typename, hexsha, path) in zip(types, self.type_tuples):
- item = None
- if path is None:
- item = obj_type(self.rorepo,hexsha)
- else:
- item = obj_type(self.rorepo,hexsha, 0, path)
- num_objs += 1
- assert item.sha == hexsha
- assert item.type == typename
- assert item.size
- assert item.data
- assert item == item
- assert not item != item
- assert str(item) == item.sha
- assert repr(item)
- s.add(item)
-
- if isinstance(item, base.IndexObject):
- num_index_objs += 1
- if hasattr(item,'path'): # never runs here
- assert not item.path.startswith("/") # must be relative
- assert isinstance(item.mode, int)
- # END index object check
-
- # read from stream
- data_stream = item.data_stream
- data = data_stream.read()
- assert data
-
- tmpfile = os.tmpfile()
- assert item == item.stream_data(tmpfile)
- tmpfile.seek(0)
- assert tmpfile.read() == data
- # END stream to file directly
- # END for each object type to create
-
- # each has a unique sha
- assert len(s) == num_objs
- assert len(s|s) == num_objs
- assert num_index_objs == 2
-
- def test_get_object_type_by_name(self):
- for tname in base.Object.TYPES:
- assert base.Object in get_object_type_by_name(tname).mro()
- # END for each known type
-
- assert_raises( ValueError, get_object_type_by_name, "doesntexist" )
+
+ type_tuples = ( ("blob", "8741fc1d09d61f02ffd8cded15ff603eff1ec070", "blob.py"),
+ ("tree", "3a6a5e3eeed3723c09f1ef0399f81ed6b8d82e79", "directory"),
+ ("commit", "4251bd59fb8e11e40c40548cba38180a9536118c", None),
+ ("tag", "e56a60e8e9cd333cfba0140a77cd12b0d9398f10", None) )
+
+ def test_base_object(self):
+ # test interface of base object classes
+ types = (Blob, Tree, Commit, TagObject)
+ assert len(types) == len(self.type_tuples)
+
+ s = set()
+ num_objs = 0
+ num_index_objs = 0
+ for obj_type, (typename, hexsha, path) in zip(types, self.type_tuples):
+ item = None
+ if path is None:
+ item = obj_type(self.rorepo,hexsha)
+ else:
+ item = obj_type(self.rorepo,hexsha, 0, path)
+ num_objs += 1
+ assert item.sha == hexsha
+ assert item.type == typename
+ assert item.size
+ assert item.data
+ assert item == item
+ assert not item != item
+ assert str(item) == item.sha
+ assert repr(item)
+ s.add(item)
+
+ if isinstance(item, base.IndexObject):
+ num_index_objs += 1
+ if hasattr(item,'path'): # never runs here
+ assert not item.path.startswith("/") # must be relative
+ assert isinstance(item.mode, int)
+ # END index object check
+
+ # read from stream
+ data_stream = item.data_stream
+ data = data_stream.read()
+ assert data
+
+ tmpfile = os.tmpfile()
+ assert item == item.stream_data(tmpfile)
+ tmpfile.seek(0)
+ assert tmpfile.read() == data
+ # END stream to file directly
+ # END for each object type to create
+
+ # each has a unique sha
+ assert len(s) == num_objs
+ assert len(s|s) == num_objs
+ assert num_index_objs == 2
+
+ def test_get_object_type_by_name(self):
+ for tname in base.Object.TYPES:
+ assert base.Object in get_object_type_by_name(tname).mro()
+ # END for each known type
+
+ assert_raises( ValueError, get_object_type_by_name, "doesntexist" )
- def test_object_resolution(self):
- # objects must be resolved to shas so they compare equal
- assert self.rorepo.head.reference.object == self.rorepo.active_branch.object
-
- @with_bare_rw_repo
- def test_with_bare_rw_repo(self, bare_rw_repo):
- assert bare_rw_repo.config_reader("repository").getboolean("core", "bare")
- assert os.path.isfile(os.path.join(bare_rw_repo.git_dir,'HEAD'))
-
- @with_rw_repo('0.1.6')
- def test_with_rw_repo(self, rw_repo):
- assert not rw_repo.config_reader("repository").getboolean("core", "bare")
- assert os.path.isdir(os.path.join(rw_repo.working_tree_dir,'lib'))
-
- @with_rw_and_rw_remote_repo('0.1.6')
- def test_with_rw_remote_and_rw_repo(self, rw_repo, rw_remote_repo):
- assert not rw_repo.config_reader("repository").getboolean("core", "bare")
- assert rw_remote_repo.config_reader("repository").getboolean("core", "bare")
- assert os.path.isdir(os.path.join(rw_repo.working_tree_dir,'lib'))
+ def test_object_resolution(self):
+ # objects must be resolved to shas so they compare equal
+ assert self.rorepo.head.reference.object == self.rorepo.active_branch.object
+
+ @with_bare_rw_repo
+ def test_with_bare_rw_repo(self, bare_rw_repo):
+ assert bare_rw_repo.config_reader("repository").getboolean("core", "bare")
+ assert os.path.isfile(os.path.join(bare_rw_repo.git_dir,'HEAD'))
+
+ @with_rw_repo('0.1.6')
+ def test_with_rw_repo(self, rw_repo):
+ assert not rw_repo.config_reader("repository").getboolean("core", "bare")
+ assert os.path.isdir(os.path.join(rw_repo.working_tree_dir,'lib'))
+
+ @with_rw_and_rw_remote_repo('0.1.6')
+ def test_with_rw_remote_and_rw_repo(self, rw_repo, rw_remote_repo):
+ assert not rw_repo.config_reader("repository").getboolean("core", "bare")
+ assert rw_remote_repo.config_reader("repository").getboolean("core", "bare")
+ assert os.path.isdir(os.path.join(rw_repo.working_tree_dir,'lib'))
diff --git a/test/git/test_blob.py b/test/git/test_blob.py
index 464cfd6b..cf70cb8c 100644
--- a/test/git/test_blob.py
+++ b/test/git/test_blob.py
@@ -8,23 +8,23 @@ from test.testlib import *
from git import *
class TestBlob(TestBase):
-
- def test_should_cache_data(self):
- bid = 'a802c139d4767c89dcad79d836d05f7004d39aac'
- blob = Blob(self.rorepo, bid)
- blob.data
- assert blob.data
- blob.size
- blob.size
-
- def test_mime_type_should_return_mime_type_for_known_types(self):
- blob = Blob(self.rorepo, **{'sha': 'abc', 'path': 'foo.png'})
- assert_equal("image/png", blob.mime_type)
+
+ def test_should_cache_data(self):
+ bid = 'a802c139d4767c89dcad79d836d05f7004d39aac'
+ blob = Blob(self.rorepo, bid)
+ blob.data
+ assert blob.data
+ blob.size
+ blob.size
+
+ def test_mime_type_should_return_mime_type_for_known_types(self):
+ blob = Blob(self.rorepo, **{'sha': 'abc', 'path': 'foo.png'})
+ assert_equal("image/png", blob.mime_type)
- def test_mime_type_should_return_text_plain_for_unknown_types(self):
- blob = Blob(self.rorepo, **{'sha': 'abc','path': 'something'})
- assert_equal("text/plain", blob.mime_type)
+ def test_mime_type_should_return_text_plain_for_unknown_types(self):
+ blob = Blob(self.rorepo, **{'sha': 'abc','path': 'something'})
+ assert_equal("text/plain", blob.mime_type)
- def test_should_return_appropriate_representation(self):
- blob = Blob(self.rorepo, **{'sha': 'abc'})
- assert_equal('<git.Blob "abc">', repr(blob))
+ def test_should_return_appropriate_representation(self):
+ blob = Blob(self.rorepo, **{'sha': 'abc'})
+ assert_equal('<git.Blob "abc">', repr(blob))
diff --git a/test/git/test_commit.py b/test/git/test_commit.py
index da18f275..0a8470ec 100644
--- a/test/git/test_commit.py
+++ b/test/git/test_commit.py
@@ -9,166 +9,166 @@ from git import *
class TestCommit(TestBase):
- def test_bake(self):
+ def test_bake(self):
- commit = Commit(self.rorepo, **{'sha': '2454ae89983a4496a445ce347d7a41c0bb0ea7ae'})
- commit.author # bake
+ commit = Commit(self.rorepo, **{'sha': '2454ae89983a4496a445ce347d7a41c0bb0ea7ae'})
+ commit.author # bake
- assert_equal("Sebastian Thiel", commit.author.name)
- assert_equal("byronimo@gmail.com", commit.author.email)
- assert commit.author == commit.committer
- assert isinstance(commit.authored_date, int) and isinstance(commit.committed_date, int)
- assert commit.message == "Added missing information to docstrings of commit and stats module"
+ assert_equal("Sebastian Thiel", commit.author.name)
+ assert_equal("byronimo@gmail.com", commit.author.email)
+ assert commit.author == commit.committer
+ assert isinstance(commit.authored_date, int) and isinstance(commit.committed_date, int)
+ assert commit.message == "Added missing information to docstrings of commit and stats module"
- def test_stats(self):
- commit = Commit(self.rorepo, '33ebe7acec14b25c5f84f35a664803fcab2f7781')
- stats = commit.stats
-
- def check_entries(d):
- assert isinstance(d, dict)
- for key in ("insertions", "deletions", "lines"):
- assert key in d
- # END assertion helper
- assert stats.files
- assert stats.total
-
- check_entries(stats.total)
- assert "files" in stats.total
-
- for filepath, d in stats.files.items():
- check_entries(d)
- # END for each stated file
-
- # assure data is parsed properly
- michael = Actor._from_string("Michael Trier <mtrier@gmail.com>")
- assert commit.author == michael
- assert commit.committer == michael
- assert commit.authored_date == 1210193388
- assert commit.committed_date == 1210193388
- assert commit.message == "initial project"
-
- def test_traversal(self):
- start = self.rorepo.commit("a4d06724202afccd2b5c54f81bcf2bf26dea7fff")
- first = self.rorepo.commit("33ebe7acec14b25c5f84f35a664803fcab2f7781")
- p0 = start.parents[0]
- p1 = start.parents[1]
- p00 = p0.parents[0]
- p10 = p1.parents[0]
-
- # basic branch first, depth first
- dfirst = start.traverse(branch_first=False)
- bfirst = start.traverse(branch_first=True)
- assert dfirst.next() == p0
- assert dfirst.next() == p00
-
- assert bfirst.next() == p0
- assert bfirst.next() == p1
- assert bfirst.next() == p00
- assert bfirst.next() == p10
-
- # at some point, both iterations should stop
- assert list(bfirst)[-1] == first
- stoptraverse = self.rorepo.commit("254d04aa3180eb8b8daf7b7ff25f010cd69b4e7d").traverse(as_edge=True)
- l = list(stoptraverse)
- assert len(l[0]) == 2
-
- # ignore self
- assert start.traverse(ignore_self=False).next() == start
-
- # depth
- assert len(list(start.traverse(ignore_self=False, depth=0))) == 1
-
- # prune
- assert start.traverse(branch_first=1, prune=lambda i,d: i==p0).next() == p1
-
- # predicate
- assert start.traverse(branch_first=1, predicate=lambda i,d: i==p1).next() == p1
-
- # traversal should stop when the beginning is reached
- self.failUnlessRaises(StopIteration, first.traverse().next)
-
- # parents of the first commit should be empty ( as the only parent has a null
- # sha )
- assert len(first.parents) == 0
-
- def test_iteration(self):
- # we can iterate commits
- all_commits = Commit.list_items(self.rorepo, 'master')
- assert all_commits
- assert all_commits == list(self.rorepo.iter_commits())
-
- # this includes merge commits
- mcomit = Commit(self.rorepo, 'd884adc80c80300b4cc05321494713904ef1df2d')
- assert mcomit in all_commits
-
- # we can limit the result to paths
- ltd_commits = list(self.rorepo.iter_commits(paths='CHANGES'))
- assert ltd_commits and len(ltd_commits) < len(all_commits)
-
- # show commits of multiple paths, resulting in a union of commits
- less_ltd_commits = list(Commit.iter_items(self.rorepo, 'master', paths=('CHANGES', 'AUTHORS')))
- assert len(ltd_commits) < len(less_ltd_commits)
-
-
- @patch_object(Git, '_call_process')
- def test_rev_list_bisect_all(self, git):
- """
- 'git rev-list --bisect-all' returns additional information
- in the commit header. This test ensures that we properly parse it.
- """
+ def test_stats(self):
+ commit = Commit(self.rorepo, '33ebe7acec14b25c5f84f35a664803fcab2f7781')
+ stats = commit.stats
+
+ def check_entries(d):
+ assert isinstance(d, dict)
+ for key in ("insertions", "deletions", "lines"):
+ assert key in d
+ # END assertion helper
+ assert stats.files
+ assert stats.total
+
+ check_entries(stats.total)
+ assert "files" in stats.total
+
+ for filepath, d in stats.files.items():
+ check_entries(d)
+ # END for each stated file
+
+ # assure data is parsed properly
+ michael = Actor._from_string("Michael Trier <mtrier@gmail.com>")
+ assert commit.author == michael
+ assert commit.committer == michael
+ assert commit.authored_date == 1210193388
+ assert commit.committed_date == 1210193388
+ assert commit.message == "initial project"
+
+ def test_traversal(self):
+ start = self.rorepo.commit("a4d06724202afccd2b5c54f81bcf2bf26dea7fff")
+ first = self.rorepo.commit("33ebe7acec14b25c5f84f35a664803fcab2f7781")
+ p0 = start.parents[0]
+ p1 = start.parents[1]
+ p00 = p0.parents[0]
+ p10 = p1.parents[0]
+
+ # basic branch first, depth first
+ dfirst = start.traverse(branch_first=False)
+ bfirst = start.traverse(branch_first=True)
+ assert dfirst.next() == p0
+ assert dfirst.next() == p00
+
+ assert bfirst.next() == p0
+ assert bfirst.next() == p1
+ assert bfirst.next() == p00
+ assert bfirst.next() == p10
+
+ # at some point, both iterations should stop
+ assert list(bfirst)[-1] == first
+ stoptraverse = self.rorepo.commit("254d04aa3180eb8b8daf7b7ff25f010cd69b4e7d").traverse(as_edge=True)
+ l = list(stoptraverse)
+ assert len(l[0]) == 2
+
+ # ignore self
+ assert start.traverse(ignore_self=False).next() == start
+
+ # depth
+ assert len(list(start.traverse(ignore_self=False, depth=0))) == 1
+
+ # prune
+ assert start.traverse(branch_first=1, prune=lambda i,d: i==p0).next() == p1
+
+ # predicate
+ assert start.traverse(branch_first=1, predicate=lambda i,d: i==p1).next() == p1
+
+ # traversal should stop when the beginning is reached
+ self.failUnlessRaises(StopIteration, first.traverse().next)
+
+ # parents of the first commit should be empty ( as the only parent has a null
+ # sha )
+ assert len(first.parents) == 0
+
+ def test_iteration(self):
+ # we can iterate commits
+ all_commits = Commit.list_items(self.rorepo, 'master')
+ assert all_commits
+ assert all_commits == list(self.rorepo.iter_commits())
+
+ # this includes merge commits
+ mcomit = Commit(self.rorepo, 'd884adc80c80300b4cc05321494713904ef1df2d')
+ assert mcomit in all_commits
+
+ # we can limit the result to paths
+ ltd_commits = list(self.rorepo.iter_commits(paths='CHANGES'))
+ assert ltd_commits and len(ltd_commits) < len(all_commits)
+
+ # show commits of multiple paths, resulting in a union of commits
+ less_ltd_commits = list(Commit.iter_items(self.rorepo, 'master', paths=('CHANGES', 'AUTHORS')))
+ assert len(ltd_commits) < len(less_ltd_commits)
+
+
+ @patch_object(Git, '_call_process')
+ def test_rev_list_bisect_all(self, git):
+ """
+ 'git rev-list --bisect-all' returns additional information
+ in the commit header. This test ensures that we properly parse it.
+ """
- git.return_value = fixture('rev_list_bisect_all')
+ git.return_value = fixture('rev_list_bisect_all')
- revs = self.rorepo.git.rev_list('HEAD',
- pretty='raw',
- first_parent=True,
- bisect_all=True)
- assert_true(git.called)
+ revs = self.rorepo.git.rev_list('HEAD',
+ pretty='raw',
+ first_parent=True,
+ bisect_all=True)
+ assert_true(git.called)
- commits = Commit._iter_from_process_or_stream(self.rorepo, ListProcessAdapter(revs), True)
- expected_ids = (
- 'cf37099ea8d1d8c7fbf9b6d12d7ec0249d3acb8b',
- '33ebe7acec14b25c5f84f35a664803fcab2f7781',
- 'a6604a00a652e754cb8b6b0b9f194f839fc38d7c',
- '8df638c22c75ddc9a43ecdde90c0c9939f5009e7',
- 'c231551328faa864848bde6ff8127f59c9566e90',
- )
- for sha1, commit in zip(expected_ids, commits):
- assert_equal(sha1, commit.sha)
+ commits = Commit._iter_from_process_or_stream(self.rorepo, ListProcessAdapter(revs), True)
+ expected_ids = (
+ 'cf37099ea8d1d8c7fbf9b6d12d7ec0249d3acb8b',
+ '33ebe7acec14b25c5f84f35a664803fcab2f7781',
+ 'a6604a00a652e754cb8b6b0b9f194f839fc38d7c',
+ '8df638c22c75ddc9a43ecdde90c0c9939f5009e7',
+ 'c231551328faa864848bde6ff8127f59c9566e90',
+ )
+ for sha1, commit in zip(expected_ids, commits):
+ assert_equal(sha1, commit.sha)
- def test_count(self):
- assert self.rorepo.tag('refs/tags/0.1.5').commit.count( ) == 143
-
- def test_list(self):
- assert isinstance(Commit.list_items(self.rorepo, '0.1.5', max_count=5)['5117c9c8a4d3af19a9958677e45cda9269de1541'], Commit)
+ def test_count(self):
+ assert self.rorepo.tag('refs/tags/0.1.5').commit.count( ) == 143
+
+ def test_list(self):
+ assert isinstance(Commit.list_items(self.rorepo, '0.1.5', max_count=5)['5117c9c8a4d3af19a9958677e45cda9269de1541'], Commit)
- def test_str(self):
- commit = Commit(self.rorepo, 'abc')
- assert_equal ("abc", str(commit))
+ def test_str(self):
+ commit = Commit(self.rorepo, 'abc')
+ assert_equal ("abc", str(commit))
- def test_repr(self):
- commit = Commit(self.rorepo, 'abc')
- assert_equal('<git.Commit "abc">', repr(commit))
+ def test_repr(self):
+ commit = Commit(self.rorepo, 'abc')
+ assert_equal('<git.Commit "abc">', repr(commit))
- def test_equality(self):
- commit1 = Commit(self.rorepo, 'abc')
- commit2 = Commit(self.rorepo, 'abc')
- commit3 = Commit(self.rorepo, 'zyx')
- assert_equal(commit1, commit2)
- assert_not_equal(commit2, commit3)
-
- def test_iter_parents(self):
- # should return all but ourselves, even if skip is defined
- c = self.rorepo.commit('0.1.5')
- for skip in (0, 1):
- piter = c.iter_parents(skip=skip)
- first_parent = piter.next()
- assert first_parent != c
- assert first_parent == c.parents[0]
- # END for each
-
- def test_base(self):
- name_rev = self.rorepo.head.commit.name_rev
- assert isinstance(name_rev, basestring)
-
+ def test_equality(self):
+ commit1 = Commit(self.rorepo, 'abc')
+ commit2 = Commit(self.rorepo, 'abc')
+ commit3 = Commit(self.rorepo, 'zyx')
+ assert_equal(commit1, commit2)
+ assert_not_equal(commit2, commit3)
+
+ def test_iter_parents(self):
+ # should return all but ourselves, even if skip is defined
+ c = self.rorepo.commit('0.1.5')
+ for skip in (0, 1):
+ piter = c.iter_parents(skip=skip)
+ first_parent = piter.next()
+ assert first_parent != c
+ assert first_parent == c.parents[0]
+ # END for each
+
+ def test_base(self):
+ name_rev = self.rorepo.head.commit.name_rev
+ assert isinstance(name_rev, basestring)
+
diff --git a/test/git/test_config.py b/test/git/test_config.py
index f51a4757..604a25f4 100644
--- a/test/git/test_config.py
+++ b/test/git/test_config.py
@@ -11,94 +11,94 @@ from copy import copy
from ConfigParser import NoSectionError
class TestBase(TestCase):
-
- def _to_memcache(self, file_path):
- fp = open(file_path, "r")
- sio = StringIO.StringIO()
- sio.write(fp.read())
- sio.seek(0)
- sio.name = file_path
- return sio
-
- def _parsers_equal_or_raise(self, lhs, rhs):
- pass
-
- def test_read_write(self):
- # writer must create the exact same file as the one read before
- for filename in ("git_config", "git_config_global"):
- file_obj = self._to_memcache(fixture_path(filename))
- file_obj_orig = copy(file_obj)
- w_config = GitConfigParser(file_obj, read_only = False)
- w_config.read() # enforce reading
- assert w_config._sections
- w_config.write() # enforce writing
- assert file_obj.getvalue() == file_obj_orig.getvalue()
-
- # creating an additional config writer must fail due to exclusive access
- self.failUnlessRaises(IOError, GitConfigParser, file_obj, read_only = False)
-
- # should still have a lock and be able to make changes
- assert w_config._lock._has_lock()
-
- # changes should be written right away
- sname = "my_section"
- oname = "mykey"
- val = "myvalue"
- w_config.add_section(sname)
- assert w_config.has_section(sname)
- w_config.set(sname, oname, val)
- assert w_config.has_option(sname,oname)
- assert w_config.get(sname, oname) == val
-
- sname_new = "new_section"
- oname_new = "new_key"
- ival = 10
- w_config.set_value(sname_new, oname_new, ival)
- assert w_config.get_value(sname_new, oname_new) == ival
-
- file_obj.seek(0)
- r_config = GitConfigParser(file_obj, read_only=True)
- assert r_config.has_section(sname)
- assert r_config.has_option(sname, oname)
- assert r_config.get(sname, oname) == val
-
- # END for each filename
-
- def test_base(self):
- path_repo = fixture_path("git_config")
- path_global = fixture_path("git_config_global")
- r_config = GitConfigParser([path_repo, path_global], read_only=True)
- assert r_config.read_only
- num_sections = 0
- num_options = 0
-
- # test reader methods
- assert r_config._is_initialized == False
- for section in r_config.sections():
- num_sections += 1
- for option in r_config.options(section):
- num_options += 1
- val = r_config.get(section, option)
- val_typed = r_config.get_value(section, option)
- assert isinstance(val_typed, (bool, long, float, basestring))
- assert val
- assert "\n" not in option
- assert "\n" not in val
-
- # writing must fail
- self.failUnlessRaises(IOError, r_config.set, section, option, None)
- self.failUnlessRaises(IOError, r_config.remove_option, section, option )
- # END for each option
- self.failUnlessRaises(IOError, r_config.remove_section, section)
- # END for each section
- assert num_sections and num_options
- assert r_config._is_initialized == True
-
- # get value which doesnt exist, with default
- default = "my default value"
- assert r_config.get_value("doesnt", "exist", default) == default
-
- # it raises if there is no default though
- self.failUnlessRaises(NoSectionError, r_config.get_value, "doesnt", "exist")
-
-
+
+ def _to_memcache(self, file_path):
+ fp = open(file_path, "r")
+ sio = StringIO.StringIO()
+ sio.write(fp.read())
+ sio.seek(0)
+ sio.name = file_path
+ return sio
+
+ def _parsers_equal_or_raise(self, lhs, rhs):
+ pass
+
+ def test_read_write(self):
+ # writer must create the exact same file as the one read before
+ for filename in ("git_config", "git_config_global"):
+ file_obj = self._to_memcache(fixture_path(filename))
+ file_obj_orig = copy(file_obj)
+ w_config = GitConfigParser(file_obj, read_only = False)
+ w_config.read() # enforce reading
+ assert w_config._sections
+ w_config.write() # enforce writing
+ assert file_obj.getvalue() == file_obj_orig.getvalue()
+
+ # creating an additional config writer must fail due to exclusive access
+ self.failUnlessRaises(IOError, GitConfigParser, file_obj, read_only = False)
+
+ # should still have a lock and be able to make changes
+ assert w_config._lock._has_lock()
+
+ # changes should be written right away
+ sname = "my_section"
+ oname = "mykey"
+ val = "myvalue"
+ w_config.add_section(sname)
+ assert w_config.has_section(sname)
+ w_config.set(sname, oname, val)
+ assert w_config.has_option(sname,oname)
+ assert w_config.get(sname, oname) == val
+
+ sname_new = "new_section"
+ oname_new = "new_key"
+ ival = 10
+ w_config.set_value(sname_new, oname_new, ival)
+ assert w_config.get_value(sname_new, oname_new) == ival
+
+ file_obj.seek(0)
+ r_config = GitConfigParser(file_obj, read_only=True)
+ assert r_config.has_section(sname)
+ assert r_config.has_option(sname, oname)
+ assert r_config.get(sname, oname) == val
+
+ # END for each filename
+
+ def test_base(self):
+ path_repo = fixture_path("git_config")
+ path_global = fixture_path("git_config_global")
+ r_config = GitConfigParser([path_repo, path_global], read_only=True)
+ assert r_config.read_only
+ num_sections = 0
+ num_options = 0
+
+ # test reader methods
+ assert r_config._is_initialized == False
+ for section in r_config.sections():
+ num_sections += 1
+ for option in r_config.options(section):
+ num_options += 1
+ val = r_config.get(section, option)
+ val_typed = r_config.get_value(section, option)
+ assert isinstance(val_typed, (bool, long, float, basestring))
+ assert val
+ assert "\n" not in option
+ assert "\n" not in val
+
+ # writing must fail
+ self.failUnlessRaises(IOError, r_config.set, section, option, None)
+ self.failUnlessRaises(IOError, r_config.remove_option, section, option )
+ # END for each option
+ self.failUnlessRaises(IOError, r_config.remove_section, section)
+ # END for each section
+ assert num_sections and num_options
+ assert r_config._is_initialized == True
+
+ # get value which doesnt exist, with default
+ default = "my default value"
+ assert r_config.get_value("doesnt", "exist", default) == default
+
+ # it raises if there is no default though
+ self.failUnlessRaises(NoSectionError, r_config.get_value, "doesnt", "exist")
+
+
diff --git a/test/git/test_diff.py b/test/git/test_diff.py
index e559e62d..5df39df1 100644
--- a/test/git/test_diff.py
+++ b/test/git/test_diff.py
@@ -8,82 +8,82 @@ from test.testlib import *
from git import *
class TestDiff(TestBase):
-
- def test_list_from_string_new_mode(self):
- output = ListProcessAdapter(fixture('diff_new_mode'))
- diffs = Diff._index_from_patch_format(self.rorepo, output.stdout)
- assert_equal(1, len(diffs))
- assert_equal(10, len(diffs[0].diff.splitlines()))
+
+ def test_list_from_string_new_mode(self):
+ output = ListProcessAdapter(fixture('diff_new_mode'))
+ diffs = Diff._index_from_patch_format(self.rorepo, output.stdout)
+ assert_equal(1, len(diffs))
+ assert_equal(10, len(diffs[0].diff.splitlines()))
- def test_diff_with_rename(self):
- output = ListProcessAdapter(fixture('diff_rename'))
- diffs = Diff._index_from_patch_format(self.rorepo, output.stdout)
+ def test_diff_with_rename(self):
+ output = ListProcessAdapter(fixture('diff_rename'))
+ diffs = Diff._index_from_patch_format(self.rorepo, output.stdout)
- assert_equal(1, len(diffs))
+ assert_equal(1, len(diffs))
- diff = diffs[0]
- assert_true(diff.renamed)
- assert_equal(diff.rename_from, 'AUTHORS')
- assert_equal(diff.rename_to, 'CONTRIBUTORS')
+ diff = diffs[0]
+ assert_true(diff.renamed)
+ assert_equal(diff.rename_from, 'AUTHORS')
+ assert_equal(diff.rename_to, 'CONTRIBUTORS')
- def test_diff_patch_format(self):
- # test all of the 'old' format diffs for completness - it should at least
- # be able to deal with it
- fixtures = ("diff_2", "diff_2f", "diff_f", "diff_i", "diff_mode_only",
- "diff_new_mode", "diff_numstat", "diff_p", "diff_rename",
- "diff_tree_numstat_root" )
-
- for fixture_name in fixtures:
- diff_proc = ListProcessAdapter(fixture(fixture_name))
- diffs = Diff._index_from_patch_format(self.rorepo, diff_proc.stdout)
- # END for each fixture
+ def test_diff_patch_format(self):
+ # test all of the 'old' format diffs for completness - it should at least
+ # be able to deal with it
+ fixtures = ("diff_2", "diff_2f", "diff_f", "diff_i", "diff_mode_only",
+ "diff_new_mode", "diff_numstat", "diff_p", "diff_rename",
+ "diff_tree_numstat_root" )
+
+ for fixture_name in fixtures:
+ diff_proc = ListProcessAdapter(fixture(fixture_name))
+ diffs = Diff._index_from_patch_format(self.rorepo, diff_proc.stdout)
+ # END for each fixture
- def test_diff_interface(self):
- # test a few variations of the main diff routine
- assertion_map = dict()
- for i, commit in enumerate(self.rorepo.iter_commits('0.1.6', max_count=2)):
- diff_item = commit
- if i%2 == 0:
- diff_item = commit.tree
- # END use tree every second item
-
- for other in (None, commit.Index, commit.parents[0]):
- for paths in (None, "CHANGES", ("CHANGES", "lib")):
- for create_patch in range(2):
- diff_index = diff_item.diff(other, paths, create_patch)
- assert isinstance(diff_index, DiffIndex)
-
- if diff_index:
- for ct in DiffIndex.change_type:
- key = 'ct_%s'%ct
- assertion_map.setdefault(key, 0)
- assertion_map[key] = assertion_map[key]+len(list(diff_index.iter_change_type(ct)))
- # END for each changetype
-
- # check entries
- diff_set = set()
- diff_set.add(diff_index[0])
- diff_set.add(diff_index[0])
- assert len(diff_set) == 1
- assert diff_index[0] == diff_index[0]
- assert not (diff_index[0] != diff_index[0])
- # END diff index checking
- # END for each patch option
- # END for each path option
- # END for each other side
- # END for each commit
-
- # assert we could always find at least one instance of the members we
- # can iterate in the diff index - if not this indicates its not working correctly
- # or our test does not span the whole range of possibilities
- for key,value in assertion_map.items():
- assert value, "Did not find diff for %s" % key
- # END for each iteration type
-
- # test path not existing in the index - should be ignored
- c = self.rorepo.head.commit
- cp = c.parents[0]
- diff_index = c.diff(cp, ["does/not/exist"])
- assert len(diff_index) == 0
-
-
+ def test_diff_interface(self):
+ # test a few variations of the main diff routine
+ assertion_map = dict()
+ for i, commit in enumerate(self.rorepo.iter_commits('0.1.6', max_count=2)):
+ diff_item = commit
+ if i%2 == 0:
+ diff_item = commit.tree
+ # END use tree every second item
+
+ for other in (None, commit.Index, commit.parents[0]):
+ for paths in (None, "CHANGES", ("CHANGES", "lib")):
+ for create_patch in range(2):
+ diff_index = diff_item.diff(other, paths, create_patch)
+ assert isinstance(diff_index, DiffIndex)
+
+ if diff_index:
+ for ct in DiffIndex.change_type:
+ key = 'ct_%s'%ct
+ assertion_map.setdefault(key, 0)
+ assertion_map[key] = assertion_map[key]+len(list(diff_index.iter_change_type(ct)))
+ # END for each changetype
+
+ # check entries
+ diff_set = set()
+ diff_set.add(diff_index[0])
+ diff_set.add(diff_index[0])
+ assert len(diff_set) == 1
+ assert diff_index[0] == diff_index[0]
+ assert not (diff_index[0] != diff_index[0])
+ # END diff index checking
+ # END for each patch option
+ # END for each path option
+ # END for each other side
+ # END for each commit
+
+ # assert we could always find at least one instance of the members we
+ # can iterate in the diff index - if not this indicates its not working correctly
+ # or our test does not span the whole range of possibilities
+ for key,value in assertion_map.items():
+ assert value, "Did not find diff for %s" % key
+ # END for each iteration type
+
+ # test path not existing in the index - should be ignored
+ c = self.rorepo.head.commit
+ cp = c.parents[0]
+ diff_index = c.diff(cp, ["does/not/exist"])
+ assert len(diff_index) == 0
+
+
diff --git a/test/git/test_git.py b/test/git/test_git.py
index 6e4ab394..518c464a 100644
--- a/test/git/test_git.py
+++ b/test/git/test_git.py
@@ -9,76 +9,76 @@ from test.testlib import *
from git import Git, GitCommandError
class TestGit(TestCase):
-
- @classmethod
- def setUpAll(cls):
- cls.git = Git(GIT_REPO)
+
+ @classmethod
+ def setUpAll(cls):
+ cls.git = Git(GIT_REPO)
- @patch_object(Git, 'execute')
- def test_call_process_calls_execute(self, git):
- git.return_value = ''
- self.git.version()
- assert_true(git.called)
- assert_equal(git.call_args, ((['git', 'version'],), {}))
+ @patch_object(Git, 'execute')
+ def test_call_process_calls_execute(self, git):
+ git.return_value = ''
+ self.git.version()
+ assert_true(git.called)
+ assert_equal(git.call_args, ((['git', 'version'],), {}))
- @raises(GitCommandError)
- def test_it_raises_errors(self):
- self.git.this_does_not_exist()
+ @raises(GitCommandError)
+ def test_it_raises_errors(self):
+ self.git.this_does_not_exist()
- def test_it_transforms_kwargs_into_git_command_arguments(self):
- assert_equal(["-s"], self.git.transform_kwargs(**{'s': True}))
- assert_equal(["-s5"], self.git.transform_kwargs(**{'s': 5}))
+ def test_it_transforms_kwargs_into_git_command_arguments(self):
+ assert_equal(["-s"], self.git.transform_kwargs(**{'s': True}))
+ assert_equal(["-s5"], self.git.transform_kwargs(**{'s': 5}))
- assert_equal(["--max-count"], self.git.transform_kwargs(**{'max_count': True}))
- assert_equal(["--max-count=5"], self.git.transform_kwargs(**{'max_count': 5}))
+ assert_equal(["--max-count"], self.git.transform_kwargs(**{'max_count': True}))
+ assert_equal(["--max-count=5"], self.git.transform_kwargs(**{'max_count': 5}))
- assert_equal(["-s", "-t"], self.git.transform_kwargs(**{'s': True, 't': True}))
+ assert_equal(["-s", "-t"], self.git.transform_kwargs(**{'s': True, 't': True}))
- def test_it_executes_git_to_shell_and_returns_result(self):
- assert_match('^git version [\d\.]{2}.*$', self.git.execute(["git","version"]))
+ def test_it_executes_git_to_shell_and_returns_result(self):
+ assert_match('^git version [\d\.]{2}.*$', self.git.execute(["git","version"]))
- def test_it_accepts_stdin(self):
- filename = fixture_path("cat_file_blob")
- fh = open(filename, 'r')
- assert_equal("70c379b63ffa0795fdbfbc128e5a2818397b7ef8",
- self.git.hash_object(istream=fh, stdin=True))
- fh.close()
+ def test_it_accepts_stdin(self):
+ filename = fixture_path("cat_file_blob")
+ fh = open(filename, 'r')
+ assert_equal("70c379b63ffa0795fdbfbc128e5a2818397b7ef8",
+ self.git.hash_object(istream=fh, stdin=True))
+ fh.close()
- @patch_object(Git, 'execute')
- def test_it_ignores_false_kwargs(self, git):
- # this_should_not_be_ignored=False implies it *should* be ignored
- output = self.git.version(pass_this_kwarg=False)
- assert_true("pass_this_kwarg" not in git.call_args[1])
-
- def test_persistent_cat_file_command(self):
- # read header only
- import subprocess as sp
- hexsha = "b2339455342180c7cc1e9bba3e9f181f7baa5167"
- g = self.git.cat_file(batch_check=True, istream=sp.PIPE,as_process=True)
- g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
- g.stdin.flush()
- obj_info = g.stdout.readline()
-
- # read header + data
- g = self.git.cat_file(batch=True, istream=sp.PIPE,as_process=True)
- g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
- g.stdin.flush()
- obj_info_two = g.stdout.readline()
- assert obj_info == obj_info_two
-
- # read data - have to read it in one large chunk
- size = int(obj_info.split()[2])
- data = g.stdout.read(size)
- terminating_newline = g.stdout.read(1)
-
- # now we should be able to read a new object
- g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
- g.stdin.flush()
- assert g.stdout.readline() == obj_info
-
-
- # same can be achived using the respective command functions
- hexsha, typename, size = self.git.get_object_header(hexsha)
- hexsha, typename_two, size_two, data = self.git.get_object_data(hexsha)
- assert typename == typename_two and size == size_two
+ @patch_object(Git, 'execute')
+ def test_it_ignores_false_kwargs(self, git):
+ # this_should_not_be_ignored=False implies it *should* be ignored
+ output = self.git.version(pass_this_kwarg=False)
+ assert_true("pass_this_kwarg" not in git.call_args[1])
+
+ def test_persistent_cat_file_command(self):
+ # read header only
+ import subprocess as sp
+ hexsha = "b2339455342180c7cc1e9bba3e9f181f7baa5167"
+ g = self.git.cat_file(batch_check=True, istream=sp.PIPE,as_process=True)
+ g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
+ g.stdin.flush()
+ obj_info = g.stdout.readline()
+
+ # read header + data
+ g = self.git.cat_file(batch=True, istream=sp.PIPE,as_process=True)
+ g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
+ g.stdin.flush()
+ obj_info_two = g.stdout.readline()
+ assert obj_info == obj_info_two
+
+ # read data - have to read it in one large chunk
+ size = int(obj_info.split()[2])
+ data = g.stdout.read(size)
+ terminating_newline = g.stdout.read(1)
+
+ # now we should be able to read a new object
+ g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
+ g.stdin.flush()
+ assert g.stdout.readline() == obj_info
+
+
+ # same can be achived using the respective command functions
+ hexsha, typename, size = self.git.get_object_header(hexsha)
+ hexsha, typename_two, size_two, data = self.git.get_object_data(hexsha)
+ assert typename == typename_two and size == size_two
diff --git a/test/git/test_index.py b/test/git/test_index.py
index 7d3f13cd..9eed5678 100644
--- a/test/git/test_index.py
+++ b/test/git/test_index.py
@@ -15,515 +15,515 @@ import shutil
from stat import *
class TestIndex(TestBase):
-
- def __init__(self, *args):
- super(TestIndex, self).__init__(*args)
- self._reset_progress()
-
- def _assert_fprogress(self, entries):
- assert len(entries) == len(self._fprogress_map)
- for path, call_count in self._fprogress_map.iteritems():
- assert call_count == 2
- self._reset_progress()
+
+ def __init__(self, *args):
+ super(TestIndex, self).__init__(*args)
+ self._reset_progress()
+
+ def _assert_fprogress(self, entries):
+ assert len(entries) == len(self._fprogress_map)
+ for path, call_count in self._fprogress_map.iteritems():
+ assert call_count == 2
+ self._reset_progress()
- def _fprogress(self, path, done, item):
- self._fprogress_map.setdefault(path, 0)
- curval = self._fprogress_map[path]
- if curval == 0:
- assert not done
- if curval == 1:
- assert done
- self._fprogress_map[path] = curval + 1
-
- def _fprogress_add(self, path, done, item):
- """Called as progress func - we keep track of the proper
- call order"""
- assert item is not None
- self._fprogress(path, done, item)
-
- def _reset_progress(self):
- # maps paths to the count of calls
- self._fprogress_map = dict()
-
- def test_index_file_base(self):
- # read from file
- index = IndexFile(self.rorepo, fixture_path("index"))
- assert index.entries
- assert index.version > 0
-
- # test entry
- last_val = None
- entry = index.entries.itervalues().next()
- for attr in ("path","ctime","mtime","dev","inode","mode","uid",
- "gid","size","sha","stage"):
- val = getattr(entry, attr)
- # END for each method
-
- # test update
- entries = index.entries
- assert isinstance(index.update(), IndexFile)
- assert entries is not index.entries
-
- # test stage
- index_merge = IndexFile(self.rorepo, fixture_path("index_merge"))
- assert len(index_merge.entries) == 106
- assert len(list(e for e in index_merge.entries.itervalues() if e.stage != 0 ))
-
- # write the data - it must match the original
- tmpfile = tempfile.mktemp()
- index_merge.write(tmpfile)
- fp = open(tmpfile, 'rb')
- assert fp.read() == fixture("index_merge")
- fp.close()
- os.remove(tmpfile)
-
- def _cmp_tree_index(self, tree, index):
- # fail unless both objects contain the same paths and blobs
- if isinstance(tree, str):
- tree = self.rorepo.commit(tree).tree
-
- num_blobs = 0
- blist = list()
- for blob in tree.traverse(predicate = lambda e,d: e.type == "blob", branch_first=False):
- assert (blob.path,0) in index.entries
- blist.append(blob)
- # END for each blob in tree
- if len(blist) != len(index.entries):
- iset = set(k[0] for k in index.entries.keys())
- bset = set(b.path for b in blist)
- raise AssertionError( "CMP Failed: Missing entries in index: %s, missing in tree: %s" % (bset-iset, iset-bset) )
- # END assertion message
-
- def test_index_file_from_tree(self):
- common_ancestor_sha = "5117c9c8a4d3af19a9958677e45cda9269de1541"
- cur_sha = "4b43ca7ff72d5f535134241e7c797ddc9c7a3573"
- other_sha = "39f85c4358b7346fee22169da9cad93901ea9eb9"
-
- # simple index from tree
- base_index = IndexFile.from_tree(self.rorepo, common_ancestor_sha)
- assert base_index.entries
- self._cmp_tree_index(common_ancestor_sha, base_index)
-
- # merge two trees - its like a fast-forward
- two_way_index = IndexFile.from_tree(self.rorepo, common_ancestor_sha, cur_sha)
- assert two_way_index.entries
- self._cmp_tree_index(cur_sha, two_way_index)
-
- # merge three trees - here we have a merge conflict
- three_way_index = IndexFile.from_tree(self.rorepo, common_ancestor_sha, cur_sha, other_sha)
- assert len(list(e for e in three_way_index.entries.values() if e.stage != 0))
-
-
- # ITERATE BLOBS
- merge_required = lambda t: t[0] != 0
- merge_blobs = list(three_way_index.iter_blobs(merge_required))
- assert merge_blobs
- assert merge_blobs[0][0] in (1,2,3)
- assert isinstance(merge_blobs[0][1], Blob)
-
- # test BlobFilter
- prefix = 'lib/git'
- for stage, blob in base_index.iter_blobs(BlobFilter([prefix])):
- assert blob.path.startswith(prefix)
-
-
- # writing a tree should fail with an unmerged index
- self.failUnlessRaises(GitCommandError, three_way_index.write_tree)
-
- # removed unmerged entries
- unmerged_blob_map = three_way_index.unmerged_blobs()
- assert unmerged_blob_map
-
- # pick the first blob at the first stage we find and use it as resolved version
- three_way_index.resolve_blobs( l[0][1] for l in unmerged_blob_map.itervalues() )
- tree = three_way_index.write_tree()
- assert isinstance(tree, Tree)
- num_blobs = 0
- for blob in tree.traverse(predicate=lambda item,d: item.type == "blob"):
- assert (blob.path,0) in three_way_index.entries
- num_blobs += 1
- # END for each blob
- assert num_blobs == len(three_way_index.entries)
-
- @with_rw_repo('0.1.6')
- def test_index_merge_tree(self, rw_repo):
- # SINGLE TREE MERGE
- # current index is at the (virtual) cur_commit
- next_commit = "4c39f9da792792d4e73fc3a5effde66576ae128c"
- parent_commit = rw_repo.head.commit.parents[0]
- manifest_key = IndexFile.get_entries_key('MANIFEST.in', 0)
- manifest_entry = rw_repo.index.entries[manifest_key]
- rw_repo.index.merge_tree(next_commit)
- # only one change should be recorded
- assert manifest_entry.sha != rw_repo.index.entries[manifest_key].sha
-
- rw_repo.index.reset(rw_repo.head)
- assert rw_repo.index.entries[manifest_key].sha == manifest_entry.sha
-
- # FAKE MERGE
- #############
- # Add a change with a NULL sha that should conflict with next_commit. We
- # pretend there was a change, but we do not even bother adding a proper
- # sha for it ( which makes things faster of course )
- manifest_fake_entry = BaseIndexEntry((manifest_entry[0], Diff.null_hex_sha, 0, manifest_entry[3]))
- rw_repo.index.add([manifest_fake_entry])
- # add actually resolves the null-hex-sha for us as a feature, but we can
- # edit the index manually
- assert rw_repo.index.entries[manifest_key].sha != Diff.null_hex_sha
- # must operate on the same index for this ! Its a bit problematic as
- # it might confuse people
- index = rw_repo.index
- index.entries[manifest_key] = IndexEntry.from_base(manifest_fake_entry)
- index.write()
- assert rw_repo.index.entries[manifest_key].sha == Diff.null_hex_sha
-
- # a three way merge would result in a conflict and fails as the command will
- # not overwrite any entries in our index and hence leave them unmerged. This is
- # mainly a protection feature as the current index is not yet in a tree
- self.failUnlessRaises(GitCommandError, index.merge_tree, next_commit, base=parent_commit)
-
- # the only way to get the merged entries is to safe the current index away into a tree,
- # which is like a temporary commit for us. This fails as well as the NULL sha deos not
- # have a corresponding object
- self.failUnlessRaises(GitCommandError, index.write_tree)
-
- # if missing objects are okay, this would work though
- tree = index.write_tree(missing_ok = True)
-
- # now make a proper three way merge with unmerged entries
- unmerged_tree = IndexFile.from_tree(rw_repo, parent_commit, tree, next_commit)
- unmerged_blobs = unmerged_tree.unmerged_blobs()
- assert len(unmerged_blobs) == 1 and unmerged_blobs.keys()[0] == manifest_key[0]
-
-
- @with_rw_repo('0.1.6')
- def test_index_file_diffing(self, rw_repo):
- # default Index instance points to our index
- index = IndexFile(rw_repo)
- assert index.path is not None
- assert len(index.entries)
-
- # write the file back
- index.write()
-
- # could sha it, or check stats
-
- # test diff
- # resetting the head will leave the index in a different state, and the
- # diff will yield a few changes
- cur_head_commit = rw_repo.head.reference.commit
- ref = rw_repo.head.reset('HEAD~6', index=True, working_tree=False)
-
- # diff against same index is 0
- diff = index.diff()
- assert len(diff) == 0
-
- # against HEAD as string, must be the same as it matches index
- diff = index.diff('HEAD')
- assert len(diff) == 0
-
- # against previous head, there must be a difference
- diff = index.diff(cur_head_commit)
- assert len(diff)
-
- # we reverse the result
- adiff = index.diff(str(cur_head_commit), R=True)
- odiff = index.diff(cur_head_commit, R=False) # now its not reversed anymore
- assert adiff != odiff
- assert odiff == diff # both unreversed diffs against HEAD
-
- # against working copy - its still at cur_commit
- wdiff = index.diff(None)
- assert wdiff != adiff
- assert wdiff != odiff
-
- # against something unusual
- self.failUnlessRaises(ValueError, index.diff, int)
-
- # adjust the index to match an old revision
- cur_branch = rw_repo.active_branch
- cur_commit = cur_branch.commit
- rev_head_parent = 'HEAD~1'
- assert index.reset(rev_head_parent) is index
-
- assert cur_branch == rw_repo.active_branch
- assert cur_commit == rw_repo.head.commit
-
- # there must be differences towards the working tree which is in the 'future'
- assert index.diff(None)
-
- # reset the working copy as well to current head,to pull 'back' as well
- new_data = "will be reverted"
- file_path = os.path.join(rw_repo.working_tree_dir, "CHANGES")
- fp = open(file_path, "wb")
- fp.write(new_data)
- fp.close()
- index.reset(rev_head_parent, working_tree=True)
- assert not index.diff(None)
- assert cur_branch == rw_repo.active_branch
- assert cur_commit == rw_repo.head.commit
- fp = open(file_path,'rb')
- try:
- assert fp.read() != new_data
- finally:
- fp.close()
-
- # test full checkout
- test_file = os.path.join(rw_repo.working_tree_dir, "CHANGES")
- open(test_file, 'ab').write("some data")
- rval = index.checkout(None, force=True, fprogress=self._fprogress)
- assert 'CHANGES' in list(rval)
- self._assert_fprogress([None])
- assert os.path.isfile(test_file)
-
- os.remove(test_file)
- rval = index.checkout(None, force=False, fprogress=self._fprogress)
- assert 'CHANGES' in list(rval)
- self._assert_fprogress([None])
- assert os.path.isfile(test_file)
-
- # individual file
- os.remove(test_file)
- rval = index.checkout(test_file, fprogress=self._fprogress)
- assert list(rval)[0] == 'CHANGES'
- self._assert_fprogress([test_file])
- assert os.path.exists(test_file)
-
- # checking out non-existing file throws
- self.failUnlessRaises(CheckoutError, index.checkout, "doesnt_exist_ever.txt.that")
- self.failUnlessRaises(CheckoutError, index.checkout, paths=["doesnt/exist"])
-
- # checkout file with modifications
- append_data = "hello"
- fp = open(test_file, "ab")
- fp.write(append_data)
- fp.close()
- try:
- index.checkout(test_file)
- except CheckoutError, e:
- assert len(e.failed_files) == 1 and e.failed_files[0] == os.path.basename(test_file)
- assert (len(e.failed_files) == len(e.failed_reasons)) and isinstance(e.failed_reasons[0], basestring)
- assert len(e.valid_files) == 0
- assert open(test_file).read().endswith(append_data)
- else:
- raise AssertionError("Exception CheckoutError not thrown")
-
- # if we force it it should work
- index.checkout(test_file, force=True)
- assert not open(test_file).read().endswith(append_data)
-
- # checkout directory
- shutil.rmtree(os.path.join(rw_repo.working_tree_dir, "lib"))
- rval = index.checkout('lib')
- assert len(list(rval)) > 1
-
- def _count_existing(self, repo, files):
- """
- Returns count of files that actually exist in the repository directory.
- """
- existing = 0
- basedir = repo.working_tree_dir
- for f in files:
- existing += os.path.isfile(os.path.join(basedir, f))
- # END for each deleted file
- return existing
- # END num existing helper
-
- @with_rw_repo('0.1.6')
- def test_index_mutation(self, rw_repo):
- index = rw_repo.index
- num_entries = len(index.entries)
- cur_head = rw_repo.head
-
- # remove all of the files, provide a wild mix of paths, BaseIndexEntries,
- # IndexEntries
- def mixed_iterator():
- count = 0
- for entry in index.entries.itervalues():
- type_id = count % 4
- if type_id == 0: # path
- yield entry.path
- elif type_id == 1: # blob
- yield Blob(rw_repo, entry.sha, entry.mode, entry.path)
- elif type_id == 2: # BaseIndexEntry
- yield BaseIndexEntry(entry[:4])
- elif type_id == 3: # IndexEntry
- yield entry
- else:
- raise AssertionError("Invalid Type")
- count += 1
- # END for each entry
- # END mixed iterator
- deleted_files = index.remove(mixed_iterator(), working_tree=False)
- assert deleted_files
- assert self._count_existing(rw_repo, deleted_files) == len(deleted_files)
- assert len(index.entries) == 0
-
- # reset the index to undo our changes
- index.reset()
- assert len(index.entries) == num_entries
-
- # remove with working copy
- deleted_files = index.remove(mixed_iterator(), working_tree=True)
- assert deleted_files
- assert self._count_existing(rw_repo, deleted_files) == 0
-
- # reset everything
- index.reset(working_tree=True)
- assert self._count_existing(rw_repo, deleted_files) == len(deleted_files)
-
- # invalid type
- self.failUnlessRaises(TypeError, index.remove, [1])
-
- # absolute path
- deleted_files = index.remove([os.path.join(rw_repo.working_tree_dir,"lib")], r=True)
- assert len(deleted_files) > 1
- self.failUnlessRaises(ValueError, index.remove, ["/doesnt/exists"])
-
- # TEST COMMITTING
- # commit changed index
- cur_commit = cur_head.commit
- commit_message = "commit default head"
-
- new_commit = index.commit(commit_message, head=False)
- assert new_commit.message == commit_message
- assert new_commit.parents[0] == cur_commit
- assert len(new_commit.parents) == 1
- assert cur_head.commit == cur_commit
-
- # same index, no parents
- commit_message = "index without parents"
- commit_no_parents = index.commit(commit_message, parent_commits=list(), head=True)
- assert commit_no_parents.message == commit_message
- assert len(commit_no_parents.parents) == 0
- assert cur_head.commit == commit_no_parents
-
- # same index, multiple parents
- commit_message = "Index with multiple parents\n commit with another line"
- commit_multi_parent = index.commit(commit_message,parent_commits=(commit_no_parents, new_commit))
- assert commit_multi_parent.message == commit_message
- assert len(commit_multi_parent.parents) == 2
- assert commit_multi_parent.parents[0] == commit_no_parents
- assert commit_multi_parent.parents[1] == new_commit
- assert cur_head.commit == commit_multi_parent
-
- # re-add all files in lib
- # get the lib folder back on disk, but get an index without it
- index.reset(new_commit.parents[0], working_tree=True).reset(new_commit, working_tree=False)
- lib_file_path = "lib/git/__init__.py"
- assert (lib_file_path, 0) not in index.entries
- assert os.path.isfile(os.path.join(rw_repo.working_tree_dir, lib_file_path))
-
- # directory
- entries = index.add(['lib'], fprogress=self._fprogress_add)
- self._assert_fprogress(entries)
- assert len(entries)>1
-
- # glob
- entries = index.reset(new_commit).add(['lib/git/*.py'], fprogress=self._fprogress_add)
- self._assert_fprogress(entries)
- assert len(entries) == 14
-
- # same file
- entries = index.reset(new_commit).add(['lib/git/head.py']*2, fprogress=self._fprogress_add)
- # would fail, test is too primitive to handle this case
- # self._assert_fprogress(entries)
- self._reset_progress()
- assert len(entries) == 2
-
- # missing path
- self.failUnlessRaises(GitCommandError, index.reset(new_commit).add, ['doesnt/exist/must/raise'])
-
- # blob from older revision overrides current index revision
- old_blob = new_commit.parents[0].tree.blobs[0]
- entries = index.reset(new_commit).add([old_blob], fprogress=self._fprogress_add)
- self._assert_fprogress(entries)
- assert index.entries[(old_blob.path,0)].sha == old_blob.sha and len(entries) == 1
-
- # mode 0 not allowed
- null_sha = "0"*40
- self.failUnlessRaises(ValueError, index.reset(new_commit).add, [BaseIndexEntry((0, null_sha,0,"doesntmatter"))])
-
- # add new file
- new_file_relapath = "my_new_file"
- new_file_path = self._make_file(new_file_relapath, "hello world", rw_repo)
- entries = index.reset(new_commit).add([BaseIndexEntry((010644, null_sha, 0, new_file_relapath))], fprogress=self._fprogress_add)
- self._assert_fprogress(entries)
- assert len(entries) == 1 and entries[0].sha != null_sha
-
- # add symlink
- if sys.platform != "win32":
- link_file = os.path.join(rw_repo.working_tree_dir, "my_real_symlink")
- os.symlink("/etc/that", link_file)
- entries = index.reset(new_commit).add([link_file], fprogress=self._fprogress_add)
- self._assert_fprogress(entries)
- assert len(entries) == 1 and S_ISLNK(entries[0].mode)
- # END real symlink test
-
- # add fake symlink and assure it checks-our as symlink
- fake_symlink_relapath = "my_fake_symlink"
- link_target = "/etc/that"
- fake_symlink_path = self._make_file(fake_symlink_relapath, link_target, rw_repo)
- fake_entry = BaseIndexEntry((0120000, null_sha, 0, fake_symlink_relapath))
- entries = index.reset(new_commit).add([fake_entry], fprogress=self._fprogress_add)
- self._assert_fprogress(entries)
- assert entries[0].sha != null_sha
- assert len(entries) == 1 and S_ISLNK(entries[0].mode)
-
- # assure this also works with an alternate method
- full_index_entry = IndexEntry.from_base(BaseIndexEntry((0120000, entries[0].sha, 0, entries[0].path)))
- entry_key = index.get_entries_key(full_index_entry)
- index.reset(new_commit)
-
- assert entry_key not in index.entries
- index.entries[entry_key] = full_index_entry
- index.write()
- index.update() # force reread of entries
- new_entry = index.entries[entry_key]
- assert S_ISLNK(new_entry.mode)
-
- # a tree created from this should contain the symlink
- tree = index.write_tree(True)
- assert fake_symlink_relapath in tree
-
- # checkout the fakelink, should be a link then
- assert not S_ISLNK(os.stat(fake_symlink_path)[ST_MODE])
- os.remove(fake_symlink_path)
- index.checkout(fake_symlink_path)
-
- # on windows we will never get symlinks
- if os.name == 'nt':
- # simlinks should contain the link as text ( which is what a
- # symlink actually is )
- open(fake_symlink_path,'rb').read() == link_target
- else:
- assert S_ISLNK(os.lstat(fake_symlink_path)[ST_MODE])
-
- # TEST RENAMING
- def assert_mv_rval(rval):
- for source, dest in rval:
- assert not os.path.exists(source) and os.path.exists(dest)
- # END for each renamed item
- # END move assertion utility
-
- self.failUnlessRaises(ValueError, index.move, ['just_one_path'])
- # file onto existing file
- files = ['AUTHORS', 'LICENSE']
- self.failUnlessRaises(GitCommandError, index.move, files)
-
- # again, with force
- assert_mv_rval(index.move(files, force=True))
-
- # files into directory - dry run
- paths = ['LICENSE', 'VERSION', 'doc']
- rval = index.move(paths, dry_run=True)
- assert len(rval) == 2
- assert os.path.exists(paths[0])
-
- # again, no dry run
- rval = index.move(paths)
- assert_mv_rval(rval)
-
- # dir into dir
- rval = index.move(['doc', 'test'])
- assert_mv_rval(rval)
+ def _fprogress(self, path, done, item):
+ self._fprogress_map.setdefault(path, 0)
+ curval = self._fprogress_map[path]
+ if curval == 0:
+ assert not done
+ if curval == 1:
+ assert done
+ self._fprogress_map[path] = curval + 1
+
+ def _fprogress_add(self, path, done, item):
+ """Called as progress func - we keep track of the proper
+ call order"""
+ assert item is not None
+ self._fprogress(path, done, item)
+
+ def _reset_progress(self):
+ # maps paths to the count of calls
+ self._fprogress_map = dict()
+
+ def test_index_file_base(self):
+ # read from file
+ index = IndexFile(self.rorepo, fixture_path("index"))
+ assert index.entries
+ assert index.version > 0
+
+ # test entry
+ last_val = None
+ entry = index.entries.itervalues().next()
+ for attr in ("path","ctime","mtime","dev","inode","mode","uid",
+ "gid","size","sha","stage"):
+ val = getattr(entry, attr)
+ # END for each method
+
+ # test update
+ entries = index.entries
+ assert isinstance(index.update(), IndexFile)
+ assert entries is not index.entries
+
+ # test stage
+ index_merge = IndexFile(self.rorepo, fixture_path("index_merge"))
+ assert len(index_merge.entries) == 106
+ assert len(list(e for e in index_merge.entries.itervalues() if e.stage != 0 ))
+
+ # write the data - it must match the original
+ tmpfile = tempfile.mktemp()
+ index_merge.write(tmpfile)
+ fp = open(tmpfile, 'rb')
+ assert fp.read() == fixture("index_merge")
+ fp.close()
+ os.remove(tmpfile)
+
+ def _cmp_tree_index(self, tree, index):
+ # fail unless both objects contain the same paths and blobs
+ if isinstance(tree, str):
+ tree = self.rorepo.commit(tree).tree
+
+ num_blobs = 0
+ blist = list()
+ for blob in tree.traverse(predicate = lambda e,d: e.type == "blob", branch_first=False):
+ assert (blob.path,0) in index.entries
+ blist.append(blob)
+ # END for each blob in tree
+ if len(blist) != len(index.entries):
+ iset = set(k[0] for k in index.entries.keys())
+ bset = set(b.path for b in blist)
+ raise AssertionError( "CMP Failed: Missing entries in index: %s, missing in tree: %s" % (bset-iset, iset-bset) )
+ # END assertion message
+
+ def test_index_file_from_tree(self):
+ common_ancestor_sha = "5117c9c8a4d3af19a9958677e45cda9269de1541"
+ cur_sha = "4b43ca7ff72d5f535134241e7c797ddc9c7a3573"
+ other_sha = "39f85c4358b7346fee22169da9cad93901ea9eb9"
+
+ # simple index from tree
+ base_index = IndexFile.from_tree(self.rorepo, common_ancestor_sha)
+ assert base_index.entries
+ self._cmp_tree_index(common_ancestor_sha, base_index)
+
+ # merge two trees - its like a fast-forward
+ two_way_index = IndexFile.from_tree(self.rorepo, common_ancestor_sha, cur_sha)
+ assert two_way_index.entries
+ self._cmp_tree_index(cur_sha, two_way_index)
+
+ # merge three trees - here we have a merge conflict
+ three_way_index = IndexFile.from_tree(self.rorepo, common_ancestor_sha, cur_sha, other_sha)
+ assert len(list(e for e in three_way_index.entries.values() if e.stage != 0))
+
+
+ # ITERATE BLOBS
+ merge_required = lambda t: t[0] != 0
+ merge_blobs = list(three_way_index.iter_blobs(merge_required))
+ assert merge_blobs
+ assert merge_blobs[0][0] in (1,2,3)
+ assert isinstance(merge_blobs[0][1], Blob)
+
+ # test BlobFilter
+ prefix = 'lib/git'
+ for stage, blob in base_index.iter_blobs(BlobFilter([prefix])):
+ assert blob.path.startswith(prefix)
+
+
+ # writing a tree should fail with an unmerged index
+ self.failUnlessRaises(GitCommandError, three_way_index.write_tree)
+
+ # removed unmerged entries
+ unmerged_blob_map = three_way_index.unmerged_blobs()
+ assert unmerged_blob_map
+
+ # pick the first blob at the first stage we find and use it as resolved version
+ three_way_index.resolve_blobs( l[0][1] for l in unmerged_blob_map.itervalues() )
+ tree = three_way_index.write_tree()
+ assert isinstance(tree, Tree)
+ num_blobs = 0
+ for blob in tree.traverse(predicate=lambda item,d: item.type == "blob"):
+ assert (blob.path,0) in three_way_index.entries
+ num_blobs += 1
+ # END for each blob
+ assert num_blobs == len(three_way_index.entries)
+
+ @with_rw_repo('0.1.6')
+ def test_index_merge_tree(self, rw_repo):
+ # SINGLE TREE MERGE
+ # current index is at the (virtual) cur_commit
+ next_commit = "4c39f9da792792d4e73fc3a5effde66576ae128c"
+ parent_commit = rw_repo.head.commit.parents[0]
+ manifest_key = IndexFile.get_entries_key('MANIFEST.in', 0)
+ manifest_entry = rw_repo.index.entries[manifest_key]
+ rw_repo.index.merge_tree(next_commit)
+ # only one change should be recorded
+ assert manifest_entry.sha != rw_repo.index.entries[manifest_key].sha
+
+ rw_repo.index.reset(rw_repo.head)
+ assert rw_repo.index.entries[manifest_key].sha == manifest_entry.sha
+
+ # FAKE MERGE
+ #############
+ # Add a change with a NULL sha that should conflict with next_commit. We
+ # pretend there was a change, but we do not even bother adding a proper
+ # sha for it ( which makes things faster of course )
+ manifest_fake_entry = BaseIndexEntry((manifest_entry[0], Diff.null_hex_sha, 0, manifest_entry[3]))
+ rw_repo.index.add([manifest_fake_entry])
+ # add actually resolves the null-hex-sha for us as a feature, but we can
+ # edit the index manually
+ assert rw_repo.index.entries[manifest_key].sha != Diff.null_hex_sha
+ # must operate on the same index for this ! Its a bit problematic as
+ # it might confuse people
+ index = rw_repo.index
+ index.entries[manifest_key] = IndexEntry.from_base(manifest_fake_entry)
+ index.write()
+ assert rw_repo.index.entries[manifest_key].sha == Diff.null_hex_sha
+
+ # a three way merge would result in a conflict and fails as the command will
+ # not overwrite any entries in our index and hence leave them unmerged. This is
+ # mainly a protection feature as the current index is not yet in a tree
+ self.failUnlessRaises(GitCommandError, index.merge_tree, next_commit, base=parent_commit)
+
+ # the only way to get the merged entries is to safe the current index away into a tree,
+ # which is like a temporary commit for us. This fails as well as the NULL sha deos not
+ # have a corresponding object
+ self.failUnlessRaises(GitCommandError, index.write_tree)
+
+ # if missing objects are okay, this would work though
+ tree = index.write_tree(missing_ok = True)
+
+ # now make a proper three way merge with unmerged entries
+ unmerged_tree = IndexFile.from_tree(rw_repo, parent_commit, tree, next_commit)
+ unmerged_blobs = unmerged_tree.unmerged_blobs()
+ assert len(unmerged_blobs) == 1 and unmerged_blobs.keys()[0] == manifest_key[0]
+
+
+ @with_rw_repo('0.1.6')
+ def test_index_file_diffing(self, rw_repo):
+ # default Index instance points to our index
+ index = IndexFile(rw_repo)
+ assert index.path is not None
+ assert len(index.entries)
+
+ # write the file back
+ index.write()
+
+ # could sha it, or check stats
+
+ # test diff
+ # resetting the head will leave the index in a different state, and the
+ # diff will yield a few changes
+ cur_head_commit = rw_repo.head.reference.commit
+ ref = rw_repo.head.reset('HEAD~6', index=True, working_tree=False)
+
+ # diff against same index is 0
+ diff = index.diff()
+ assert len(diff) == 0
+
+ # against HEAD as string, must be the same as it matches index
+ diff = index.diff('HEAD')
+ assert len(diff) == 0
+
+ # against previous head, there must be a difference
+ diff = index.diff(cur_head_commit)
+ assert len(diff)
+
+ # we reverse the result
+ adiff = index.diff(str(cur_head_commit), R=True)
+ odiff = index.diff(cur_head_commit, R=False) # now its not reversed anymore
+ assert adiff != odiff
+ assert odiff == diff # both unreversed diffs against HEAD
+
+ # against working copy - its still at cur_commit
+ wdiff = index.diff(None)
+ assert wdiff != adiff
+ assert wdiff != odiff
+
+ # against something unusual
+ self.failUnlessRaises(ValueError, index.diff, int)
+
+ # adjust the index to match an old revision
+ cur_branch = rw_repo.active_branch
+ cur_commit = cur_branch.commit
+ rev_head_parent = 'HEAD~1'
+ assert index.reset(rev_head_parent) is index
+
+ assert cur_branch == rw_repo.active_branch
+ assert cur_commit == rw_repo.head.commit
+
+ # there must be differences towards the working tree which is in the 'future'
+ assert index.diff(None)
+
+ # reset the working copy as well to current head,to pull 'back' as well
+ new_data = "will be reverted"
+ file_path = os.path.join(rw_repo.working_tree_dir, "CHANGES")
+ fp = open(file_path, "wb")
+ fp.write(new_data)
+ fp.close()
+ index.reset(rev_head_parent, working_tree=True)
+ assert not index.diff(None)
+ assert cur_branch == rw_repo.active_branch
+ assert cur_commit == rw_repo.head.commit
+ fp = open(file_path,'rb')
+ try:
+ assert fp.read() != new_data
+ finally:
+ fp.close()
+
+ # test full checkout
+ test_file = os.path.join(rw_repo.working_tree_dir, "CHANGES")
+ open(test_file, 'ab').write("some data")
+ rval = index.checkout(None, force=True, fprogress=self._fprogress)
+ assert 'CHANGES' in list(rval)
+ self._assert_fprogress([None])
+ assert os.path.isfile(test_file)
+
+ os.remove(test_file)
+ rval = index.checkout(None, force=False, fprogress=self._fprogress)
+ assert 'CHANGES' in list(rval)
+ self._assert_fprogress([None])
+ assert os.path.isfile(test_file)
+
+ # individual file
+ os.remove(test_file)
+ rval = index.checkout(test_file, fprogress=self._fprogress)
+ assert list(rval)[0] == 'CHANGES'
+ self._assert_fprogress([test_file])
+ assert os.path.exists(test_file)
+
+ # checking out non-existing file throws
+ self.failUnlessRaises(CheckoutError, index.checkout, "doesnt_exist_ever.txt.that")
+ self.failUnlessRaises(CheckoutError, index.checkout, paths=["doesnt/exist"])
+
+ # checkout file with modifications
+ append_data = "hello"
+ fp = open(test_file, "ab")
+ fp.write(append_data)
+ fp.close()
+ try:
+ index.checkout(test_file)
+ except CheckoutError, e:
+ assert len(e.failed_files) == 1 and e.failed_files[0] == os.path.basename(test_file)
+ assert (len(e.failed_files) == len(e.failed_reasons)) and isinstance(e.failed_reasons[0], basestring)
+ assert len(e.valid_files) == 0
+ assert open(test_file).read().endswith(append_data)
+ else:
+ raise AssertionError("Exception CheckoutError not thrown")
+
+ # if we force it it should work
+ index.checkout(test_file, force=True)
+ assert not open(test_file).read().endswith(append_data)
+
+ # checkout directory
+ shutil.rmtree(os.path.join(rw_repo.working_tree_dir, "lib"))
+ rval = index.checkout('lib')
+ assert len(list(rval)) > 1
+
+ def _count_existing(self, repo, files):
+ """
+ Returns count of files that actually exist in the repository directory.
+ """
+ existing = 0
+ basedir = repo.working_tree_dir
+ for f in files:
+ existing += os.path.isfile(os.path.join(basedir, f))
+ # END for each deleted file
+ return existing
+ # END num existing helper
+
+ @with_rw_repo('0.1.6')
+ def test_index_mutation(self, rw_repo):
+ index = rw_repo.index
+ num_entries = len(index.entries)
+ cur_head = rw_repo.head
+
+ # remove all of the files, provide a wild mix of paths, BaseIndexEntries,
+ # IndexEntries
+ def mixed_iterator():
+ count = 0
+ for entry in index.entries.itervalues():
+ type_id = count % 4
+ if type_id == 0: # path
+ yield entry.path
+ elif type_id == 1: # blob
+ yield Blob(rw_repo, entry.sha, entry.mode, entry.path)
+ elif type_id == 2: # BaseIndexEntry
+ yield BaseIndexEntry(entry[:4])
+ elif type_id == 3: # IndexEntry
+ yield entry
+ else:
+ raise AssertionError("Invalid Type")
+ count += 1
+ # END for each entry
+ # END mixed iterator
+ deleted_files = index.remove(mixed_iterator(), working_tree=False)
+ assert deleted_files
+ assert self._count_existing(rw_repo, deleted_files) == len(deleted_files)
+ assert len(index.entries) == 0
+
+ # reset the index to undo our changes
+ index.reset()
+ assert len(index.entries) == num_entries
+
+ # remove with working copy
+ deleted_files = index.remove(mixed_iterator(), working_tree=True)
+ assert deleted_files
+ assert self._count_existing(rw_repo, deleted_files) == 0
+
+ # reset everything
+ index.reset(working_tree=True)
+ assert self._count_existing(rw_repo, deleted_files) == len(deleted_files)
+
+ # invalid type
+ self.failUnlessRaises(TypeError, index.remove, [1])
+
+ # absolute path
+ deleted_files = index.remove([os.path.join(rw_repo.working_tree_dir,"lib")], r=True)
+ assert len(deleted_files) > 1
+ self.failUnlessRaises(ValueError, index.remove, ["/doesnt/exists"])
+
+ # TEST COMMITTING
+ # commit changed index
+ cur_commit = cur_head.commit
+ commit_message = "commit default head"
+
+ new_commit = index.commit(commit_message, head=False)
+ assert new_commit.message == commit_message
+ assert new_commit.parents[0] == cur_commit
+ assert len(new_commit.parents) == 1
+ assert cur_head.commit == cur_commit
+
+ # same index, no parents
+ commit_message = "index without parents"
+ commit_no_parents = index.commit(commit_message, parent_commits=list(), head=True)
+ assert commit_no_parents.message == commit_message
+ assert len(commit_no_parents.parents) == 0
+ assert cur_head.commit == commit_no_parents
+
+ # same index, multiple parents
+ commit_message = "Index with multiple parents\n commit with another line"
+ commit_multi_parent = index.commit(commit_message,parent_commits=(commit_no_parents, new_commit))
+ assert commit_multi_parent.message == commit_message
+ assert len(commit_multi_parent.parents) == 2
+ assert commit_multi_parent.parents[0] == commit_no_parents
+ assert commit_multi_parent.parents[1] == new_commit
+ assert cur_head.commit == commit_multi_parent
+
+ # re-add all files in lib
+ # get the lib folder back on disk, but get an index without it
+ index.reset(new_commit.parents[0], working_tree=True).reset(new_commit, working_tree=False)
+ lib_file_path = "lib/git/__init__.py"
+ assert (lib_file_path, 0) not in index.entries
+ assert os.path.isfile(os.path.join(rw_repo.working_tree_dir, lib_file_path))
+
+ # directory
+ entries = index.add(['lib'], fprogress=self._fprogress_add)
+ self._assert_fprogress(entries)
+ assert len(entries)>1
+
+ # glob
+ entries = index.reset(new_commit).add(['lib/git/*.py'], fprogress=self._fprogress_add)
+ self._assert_fprogress(entries)
+ assert len(entries) == 14
+
+ # same file
+ entries = index.reset(new_commit).add(['lib/git/head.py']*2, fprogress=self._fprogress_add)
+ # would fail, test is too primitive to handle this case
+ # self._assert_fprogress(entries)
+ self._reset_progress()
+ assert len(entries) == 2
+
+ # missing path
+ self.failUnlessRaises(GitCommandError, index.reset(new_commit).add, ['doesnt/exist/must/raise'])
+
+ # blob from older revision overrides current index revision
+ old_blob = new_commit.parents[0].tree.blobs[0]
+ entries = index.reset(new_commit).add([old_blob], fprogress=self._fprogress_add)
+ self._assert_fprogress(entries)
+ assert index.entries[(old_blob.path,0)].sha == old_blob.sha and len(entries) == 1
+
+ # mode 0 not allowed
+ null_sha = "0"*40
+ self.failUnlessRaises(ValueError, index.reset(new_commit).add, [BaseIndexEntry((0, null_sha,0,"doesntmatter"))])
+
+ # add new file
+ new_file_relapath = "my_new_file"
+ new_file_path = self._make_file(new_file_relapath, "hello world", rw_repo)
+ entries = index.reset(new_commit).add([BaseIndexEntry((010644, null_sha, 0, new_file_relapath))], fprogress=self._fprogress_add)
+ self._assert_fprogress(entries)
+ assert len(entries) == 1 and entries[0].sha != null_sha
+
+ # add symlink
+ if sys.platform != "win32":
+ link_file = os.path.join(rw_repo.working_tree_dir, "my_real_symlink")
+ os.symlink("/etc/that", link_file)
+ entries = index.reset(new_commit).add([link_file], fprogress=self._fprogress_add)
+ self._assert_fprogress(entries)
+ assert len(entries) == 1 and S_ISLNK(entries[0].mode)
+ # END real symlink test
+
+ # add fake symlink and assure it checks-our as symlink
+ fake_symlink_relapath = "my_fake_symlink"
+ link_target = "/etc/that"
+ fake_symlink_path = self._make_file(fake_symlink_relapath, link_target, rw_repo)
+ fake_entry = BaseIndexEntry((0120000, null_sha, 0, fake_symlink_relapath))
+ entries = index.reset(new_commit).add([fake_entry], fprogress=self._fprogress_add)
+ self._assert_fprogress(entries)
+ assert entries[0].sha != null_sha
+ assert len(entries) == 1 and S_ISLNK(entries[0].mode)
+
+ # assure this also works with an alternate method
+ full_index_entry = IndexEntry.from_base(BaseIndexEntry((0120000, entries[0].sha, 0, entries[0].path)))
+ entry_key = index.get_entries_key(full_index_entry)
+ index.reset(new_commit)
+
+ assert entry_key not in index.entries
+ index.entries[entry_key] = full_index_entry
+ index.write()
+ index.update() # force reread of entries
+ new_entry = index.entries[entry_key]
+ assert S_ISLNK(new_entry.mode)
+
+ # a tree created from this should contain the symlink
+ tree = index.write_tree(True)
+ assert fake_symlink_relapath in tree
+
+ # checkout the fakelink, should be a link then
+ assert not S_ISLNK(os.stat(fake_symlink_path)[ST_MODE])
+ os.remove(fake_symlink_path)
+ index.checkout(fake_symlink_path)
+
+ # on windows we will never get symlinks
+ if os.name == 'nt':
+ # simlinks should contain the link as text ( which is what a
+ # symlink actually is )
+ open(fake_symlink_path,'rb').read() == link_target
+ else:
+ assert S_ISLNK(os.lstat(fake_symlink_path)[ST_MODE])
+
+ # TEST RENAMING
+ def assert_mv_rval(rval):
+ for source, dest in rval:
+ assert not os.path.exists(source) and os.path.exists(dest)
+ # END for each renamed item
+ # END move assertion utility
+
+ self.failUnlessRaises(ValueError, index.move, ['just_one_path'])
+ # file onto existing file
+ files = ['AUTHORS', 'LICENSE']
+ self.failUnlessRaises(GitCommandError, index.move, files)
+
+ # again, with force
+ assert_mv_rval(index.move(files, force=True))
+
+ # files into directory - dry run
+ paths = ['LICENSE', 'VERSION', 'doc']
+ rval = index.move(paths, dry_run=True)
+ assert len(rval) == 2
+ assert os.path.exists(paths[0])
+
+ # again, no dry run
+ rval = index.move(paths)
+ assert_mv_rval(rval)
+
+ # dir into dir
+ rval = index.move(['doc', 'test'])
+ assert_mv_rval(rval)
diff --git a/test/git/test_performance.py b/test/git/test_performance.py
index e1dfc42c..588f1e14 100644
--- a/test/git/test_performance.py
+++ b/test/git/test_performance.py
@@ -10,40 +10,40 @@ from time import time
class TestPerformance(TestBase):
- def _query_commit_info(self, c):
- c.author
- c.authored_date
- c.committer
- c.committed_date
- c.message
-
- def test_iteration(self):
- num_objs = 0
- num_commits = 0
-
- # find the first commit containing the given path - always do a full
- # iteration ( restricted to the path in question ), but in fact it should
- # return quite a lot of commits, we just take one and hence abort the operation
-
- st = time()
- for c in self.rorepo.iter_commits('0.1.6'):
- num_commits += 1
- self._query_commit_info(c)
- for obj in c.tree.traverse():
- obj.size
- num_objs += 1
- # END for each object
- # END for each commit
- elapsed_time = time() - st
- print "Traversed %i Trees and a total of %i unchached objects in %s [s] ( %f objs/s )" % (num_commits, num_objs, elapsed_time, num_objs/elapsed_time)
-
- def test_commit_traversal(self):
- num_commits = 0
-
- st = time()
- for c in self.rorepo.commit('0.1.6').traverse(branch_first=False):
- num_commits += 1
- self._query_commit_info(c)
- # END for each traversed commit
- elapsed_time = time() - st
- print "Traversed %i Commits in %s [s] ( %f commits/s )" % (num_commits, elapsed_time, num_commits/elapsed_time)
+ def _query_commit_info(self, c):
+ c.author
+ c.authored_date
+ c.committer
+ c.committed_date
+ c.message
+
+ def test_iteration(self):
+ num_objs = 0
+ num_commits = 0
+
+ # find the first commit containing the given path - always do a full
+ # iteration ( restricted to the path in question ), but in fact it should
+ # return quite a lot of commits, we just take one and hence abort the operation
+
+ st = time()
+ for c in self.rorepo.iter_commits('0.1.6'):
+ num_commits += 1
+ self._query_commit_info(c)
+ for obj in c.tree.traverse():
+ obj.size
+ num_objs += 1
+ # END for each object
+ # END for each commit
+ elapsed_time = time() - st
+ print "Traversed %i Trees and a total of %i unchached objects in %s [s] ( %f objs/s )" % (num_commits, num_objs, elapsed_time, num_objs/elapsed_time)
+
+ def test_commit_traversal(self):
+ num_commits = 0
+
+ st = time()
+ for c in self.rorepo.commit('0.1.6').traverse(branch_first=False):
+ num_commits += 1
+ self._query_commit_info(c)
+ # END for each traversed commit
+ elapsed_time = time() - st
+ print "Traversed %i Commits in %s [s] ( %f commits/s )" % (num_commits, elapsed_time, num_commits/elapsed_time)
diff --git a/test/git/test_refs.py b/test/git/test_refs.py
index 61c421fd..3d85356f 100644
--- a/test/git/test_refs.py
+++ b/test/git/test_refs.py
@@ -14,364 +14,364 @@ import os
class TestRefs(TestBase):
- def test_tag_base(self):
- tag_object_refs = list()
- for tag in self.rorepo.tags:
- assert "refs/tags" in tag.path
- assert tag.name
- assert isinstance( tag.commit, Commit )
- if tag.tag is not None:
- tag_object_refs.append( tag )
- tagobj = tag.tag
- assert isinstance( tagobj, TagObject )
- assert tagobj.tag == tag.name
- assert isinstance( tagobj.tagger, Actor )
- assert isinstance( tagobj.tagged_date, int )
- assert tagobj.message
- # END if we have a tag object
- # END for tag in repo-tags
- assert tag_object_refs
- assert isinstance(self.rorepo.tags['0.1.5'], TagReference)
-
- def test_tags(self):
- # tag refs can point to tag objects or to commits
- s = set()
- ref_count = 0
- for ref in chain(self.rorepo.tags, self.rorepo.heads):
- ref_count += 1
- assert isinstance(ref, refs.Reference)
- assert str(ref) == ref.name
- assert repr(ref)
- assert ref == ref
- assert not ref != ref
- s.add(ref)
- # END for each ref
- assert len(s) == ref_count
- assert len(s|s) == ref_count
-
- def test_heads(self):
- for head in self.rorepo.heads:
- assert head.name
- assert head.path
- assert "refs/heads" in head.path
- prev_object = head.object
- cur_object = head.object
- assert prev_object == cur_object # represent the same git object
- assert prev_object is not cur_object # but are different instances
- # END for each head
-
- def test_refs(self):
- types_found = set()
- for ref in self.rorepo.refs:
- types_found.add(type(ref))
- assert len(types_found) == 3
-
- def test_is_valid(self):
- assert Reference(self.rorepo, 'refs/doesnt/exist').is_valid() == False
- assert self.rorepo.head.is_valid()
- assert self.rorepo.head.reference.is_valid()
- assert SymbolicReference(self.rorepo, 'hellothere').is_valid() == False
-
- @with_rw_repo('0.1.6')
- def test_head_reset(self, rw_repo):
- cur_head = rw_repo.head
- new_head_commit = cur_head.ref.commit.parents[0]
- cur_head.reset(new_head_commit, index=True) # index only
- assert cur_head.reference.commit == new_head_commit
-
- self.failUnlessRaises(ValueError, cur_head.reset, new_head_commit, index=False, working_tree=True)
- new_head_commit = new_head_commit.parents[0]
- cur_head.reset(new_head_commit, index=True, working_tree=True) # index + wt
- assert cur_head.reference.commit == new_head_commit
-
- # paths
- cur_head.reset(new_head_commit, paths = "lib")
-
-
- # now that we have a write write repo, change the HEAD reference - its
- # like git-reset --soft
- heads = rw_repo.heads
- assert heads
- for head in heads:
- cur_head.reference = head
- assert cur_head.reference == head
- assert isinstance(cur_head.reference, Head)
- assert cur_head.commit == head.commit
- assert not cur_head.is_detached
- # END for each head
-
- # detach
- active_head = heads[0]
- curhead_commit = active_head.commit
- cur_head.reference = curhead_commit
- assert cur_head.commit == curhead_commit
- assert cur_head.is_detached
- self.failUnlessRaises(TypeError, getattr, cur_head, "reference")
-
- # tags are references, hence we can point to them
- some_tag = rw_repo.tags[0]
- cur_head.reference = some_tag
- assert not cur_head.is_detached
- assert cur_head.commit == some_tag.commit
- assert isinstance(cur_head.reference, TagReference)
-
- # put HEAD back to a real head, otherwise everything else fails
- cur_head.reference = active_head
-
- # type check
- self.failUnlessRaises(ValueError, setattr, cur_head, "reference", "that")
-
- # head handling
- commit = 'HEAD'
- prev_head_commit = cur_head.commit
- for count, new_name in enumerate(("my_new_head", "feature/feature1")):
- actual_commit = commit+"^"*count
- new_head = Head.create(rw_repo, new_name, actual_commit)
- assert cur_head.commit == prev_head_commit
- assert isinstance(new_head, Head)
- # already exists
- self.failUnlessRaises(GitCommandError, Head.create, rw_repo, new_name)
-
- # force it
- new_head = Head.create(rw_repo, new_name, actual_commit, force=True)
- old_path = new_head.path
- old_name = new_head.name
-
- assert new_head.rename("hello").name == "hello"
- assert new_head.rename("hello/world").name == "hello/world"
- assert new_head.rename(old_name).name == old_name and new_head.path == old_path
-
- # rename with force
- tmp_head = Head.create(rw_repo, "tmphead")
- self.failUnlessRaises(GitCommandError, tmp_head.rename, new_head)
- tmp_head.rename(new_head, force=True)
- assert tmp_head == new_head and tmp_head.object == new_head.object
-
- Head.delete(rw_repo, tmp_head)
- heads = rw_repo.heads
- assert tmp_head not in heads and new_head not in heads
- # force on deletion testing would be missing here, code looks okay though ;)
- # END for each new head name
- self.failUnlessRaises(TypeError, RemoteReference.create, rw_repo, "some_name")
-
- # tag ref
- tag_name = "1.0.2"
- light_tag = TagReference.create(rw_repo, tag_name)
- self.failUnlessRaises(GitCommandError, TagReference.create, rw_repo, tag_name)
- light_tag = TagReference.create(rw_repo, tag_name, "HEAD~1", force = True)
- assert isinstance(light_tag, TagReference)
- assert light_tag.name == tag_name
- assert light_tag.commit == cur_head.commit.parents[0]
- assert light_tag.tag is None
-
- # tag with tag object
- other_tag_name = "releases/1.0.2RC"
- msg = "my mighty tag\nsecond line"
- obj_tag = TagReference.create(rw_repo, other_tag_name, message=msg)
- assert isinstance(obj_tag, TagReference)
- assert obj_tag.name == other_tag_name
- assert obj_tag.commit == cur_head.commit
- assert obj_tag.tag is not None
-
- TagReference.delete(rw_repo, light_tag, obj_tag)
- tags = rw_repo.tags
- assert light_tag not in tags and obj_tag not in tags
-
- # remote deletion
- remote_refs_so_far = 0
- remotes = rw_repo.remotes
- assert remotes
- for remote in remotes:
- refs = remote.refs
- RemoteReference.delete(rw_repo, *refs)
- remote_refs_so_far += len(refs)
- # END for each ref to delete
- assert remote_refs_so_far
-
- for remote in remotes:
- # remotes without references throw
- self.failUnlessRaises(AssertionError, getattr, remote, 'refs')
- # END for each remote
-
- # change where the active head points to
- if cur_head.is_detached:
- cur_head.reference = rw_repo.heads[0]
-
- head = cur_head.reference
- old_commit = head.commit
- head.commit = old_commit.parents[0]
- assert head.commit == old_commit.parents[0]
- assert head.commit == cur_head.commit
- head.commit = old_commit
-
- # setting a non-commit as commit fails, but succeeds as object
- head_tree = head.commit.tree
- self.failUnlessRaises(ValueError, setattr, head, 'commit', head_tree)
- assert head.commit == old_commit # and the ref did not change
- self.failUnlessRaises(GitCommandError, setattr, head, 'object', head_tree)
-
- # set the commit directly using the head. This would never detach the head
- assert not cur_head.is_detached
- head.object = old_commit
- cur_head.reference = head.commit
- assert cur_head.is_detached
- parent_commit = head.commit.parents[0]
- assert cur_head.is_detached
- cur_head.commit = parent_commit
- assert cur_head.is_detached and cur_head.commit == parent_commit
-
- cur_head.reference = head
- assert not cur_head.is_detached
- cur_head.commit = parent_commit
- assert not cur_head.is_detached
- assert head.commit == parent_commit
-
- # test checkout
- active_branch = rw_repo.active_branch
- for head in rw_repo.heads:
- checked_out_head = head.checkout()
- assert checked_out_head == head
- # END for each head to checkout
-
- # checkout with branch creation
- new_head = active_branch.checkout(b="new_head")
- assert active_branch != rw_repo.active_branch
- assert new_head == rw_repo.active_branch
-
- # checkout with force as we have a changed a file
- # clear file
- open(new_head.commit.tree.blobs[-1].abspath,'w').close()
- assert len(new_head.commit.diff(None))
-
- # create a new branch that is likely to touch the file we changed
- far_away_head = rw_repo.create_head("far_head",'HEAD~100')
- self.failUnlessRaises(GitCommandError, far_away_head.checkout)
- assert active_branch == active_branch.checkout(force=True)
- assert rw_repo.head.reference != far_away_head
-
- # test reference creation
- partial_ref = 'sub/ref'
- full_ref = 'refs/%s' % partial_ref
- ref = Reference.create(rw_repo, partial_ref)
- assert ref.path == full_ref
- assert ref.object == rw_repo.head.commit
-
- self.failUnlessRaises(OSError, Reference.create, rw_repo, full_ref, 'HEAD~20')
- # it works if it is at the same spot though and points to the same reference
- assert Reference.create(rw_repo, full_ref, 'HEAD').path == full_ref
- Reference.delete(rw_repo, full_ref)
-
- # recreate the reference using a full_ref
- ref = Reference.create(rw_repo, full_ref)
- assert ref.path == full_ref
- assert ref.object == rw_repo.head.commit
-
- # recreate using force
- ref = Reference.create(rw_repo, partial_ref, 'HEAD~1', force=True)
- assert ref.path == full_ref
- assert ref.object == rw_repo.head.commit.parents[0]
-
- # rename it
- orig_obj = ref.object
- for name in ('refs/absname', 'rela_name', 'feature/rela_name'):
- ref_new_name = ref.rename(name)
- assert isinstance(ref_new_name, Reference)
- assert name in ref_new_name.path
- assert ref_new_name.object == orig_obj
- assert ref_new_name == ref
- # END for each name type
-
- # References that don't exist trigger an error if we want to access them
- self.failUnlessRaises(ValueError, getattr, Reference(rw_repo, "refs/doesntexist"), 'commit')
-
- # exists, fail unless we force
- ex_ref_path = far_away_head.path
- self.failUnlessRaises(OSError, ref.rename, ex_ref_path)
- # if it points to the same commit it works
- far_away_head.commit = ref.commit
- ref.rename(ex_ref_path)
- assert ref.path == ex_ref_path and ref.object == orig_obj
- assert ref.rename(ref.path).path == ex_ref_path # rename to same name
-
- # create symbolic refs
- symref_path = "symrefs/sym"
- symref = SymbolicReference.create(rw_repo, symref_path, cur_head.reference)
- assert symref.path == symref_path
- assert symref.reference == cur_head.reference
-
- self.failUnlessRaises(OSError, SymbolicReference.create, rw_repo, symref_path, cur_head.reference.commit)
- # it works if the new ref points to the same reference
- SymbolicReference.create(rw_repo, symref.path, symref.reference).path == symref.path
- SymbolicReference.delete(rw_repo, symref)
- # would raise if the symref wouldn't have been deletedpbl
- symref = SymbolicReference.create(rw_repo, symref_path, cur_head.reference)
-
- # test symbolic references which are not at default locations like HEAD
- # or FETCH_HEAD - they may also be at spots in refs of course
- symbol_ref_path = "refs/symbol_ref"
- symref = SymbolicReference(rw_repo, symbol_ref_path)
- assert symref.path == symbol_ref_path
- symbol_ref_abspath = os.path.join(rw_repo.git_dir, symref.path)
-
- # set it
- symref.reference = new_head
- assert symref.reference == new_head
- assert os.path.isfile(symbol_ref_abspath)
- assert symref.commit == new_head.commit
-
- for name in ('absname','folder/rela_name'):
- symref_new_name = symref.rename(name)
- assert isinstance(symref_new_name, SymbolicReference)
- assert name in symref_new_name.path
- assert symref_new_name.reference == new_head
- assert symref_new_name == symref
- assert not symref.is_detached
- # END for each ref
-
- # create a new non-head ref just to be sure we handle it even if packed
- Reference.create(rw_repo, full_ref)
-
- # test ref listing - assure we have packed refs
- rw_repo.git.pack_refs(all=True, prune=True)
- heads = rw_repo.heads
- assert heads
- assert new_head in heads
- assert active_branch in heads
- assert rw_repo.tags
-
- # we should be able to iterate all symbolic refs as well - in that case
- # we should expect only symbolic references to be returned
- for symref in SymbolicReference.iter_items(rw_repo):
- assert not symref.is_detached
-
- # when iterating references, we can get references and symrefs
- # when deleting all refs, I'd expect them to be gone ! Even from
- # the packed ones
- # For this to work, we must not be on any branch
- rw_repo.head.reference = rw_repo.head.commit
- deleted_refs = set()
- for ref in Reference.iter_items(rw_repo):
- if ref.is_detached:
- ref.delete(rw_repo, ref)
- deleted_refs.add(ref)
- # END delete ref
- # END for each ref to iterate and to delete
- assert deleted_refs
-
- for ref in Reference.iter_items(rw_repo):
- if ref.is_detached:
- assert ref not in deleted_refs
- # END for each ref
-
- # reattach head - head will not be returned if it is not a symbolic
- # ref
- rw_repo.head.reference = Head.create(rw_repo, "master")
-
- # At least the head should still exist
- assert os.path.isfile(os.path.join(rw_repo.git_dir, 'HEAD'))
- refs = list(SymbolicReference.iter_items(rw_repo))
- assert len(refs) == 1
-
-
-
-
+ def test_tag_base(self):
+ tag_object_refs = list()
+ for tag in self.rorepo.tags:
+ assert "refs/tags" in tag.path
+ assert tag.name
+ assert isinstance( tag.commit, Commit )
+ if tag.tag is not None:
+ tag_object_refs.append( tag )
+ tagobj = tag.tag
+ assert isinstance( tagobj, TagObject )
+ assert tagobj.tag == tag.name
+ assert isinstance( tagobj.tagger, Actor )
+ assert isinstance( tagobj.tagged_date, int )
+ assert tagobj.message
+ # END if we have a tag object
+ # END for tag in repo-tags
+ assert tag_object_refs
+ assert isinstance(self.rorepo.tags['0.1.5'], TagReference)
+
+ def test_tags(self):
+ # tag refs can point to tag objects or to commits
+ s = set()
+ ref_count = 0
+ for ref in chain(self.rorepo.tags, self.rorepo.heads):
+ ref_count += 1
+ assert isinstance(ref, refs.Reference)
+ assert str(ref) == ref.name
+ assert repr(ref)
+ assert ref == ref
+ assert not ref != ref
+ s.add(ref)
+ # END for each ref
+ assert len(s) == ref_count
+ assert len(s|s) == ref_count
+
+ def test_heads(self):
+ for head in self.rorepo.heads:
+ assert head.name
+ assert head.path
+ assert "refs/heads" in head.path
+ prev_object = head.object
+ cur_object = head.object
+ assert prev_object == cur_object # represent the same git object
+ assert prev_object is not cur_object # but are different instances
+ # END for each head
+
+ def test_refs(self):
+ types_found = set()
+ for ref in self.rorepo.refs:
+ types_found.add(type(ref))
+ assert len(types_found) == 3
+
+ def test_is_valid(self):
+ assert Reference(self.rorepo, 'refs/doesnt/exist').is_valid() == False
+ assert self.rorepo.head.is_valid()
+ assert self.rorepo.head.reference.is_valid()
+ assert SymbolicReference(self.rorepo, 'hellothere').is_valid() == False
+
+ @with_rw_repo('0.1.6')
+ def test_head_reset(self, rw_repo):
+ cur_head = rw_repo.head
+ new_head_commit = cur_head.ref.commit.parents[0]
+ cur_head.reset(new_head_commit, index=True) # index only
+ assert cur_head.reference.commit == new_head_commit
+
+ self.failUnlessRaises(ValueError, cur_head.reset, new_head_commit, index=False, working_tree=True)
+ new_head_commit = new_head_commit.parents[0]
+ cur_head.reset(new_head_commit, index=True, working_tree=True) # index + wt
+ assert cur_head.reference.commit == new_head_commit
+
+ # paths
+ cur_head.reset(new_head_commit, paths = "lib")
+
+
+ # now that we have a write write repo, change the HEAD reference - its
+ # like git-reset --soft
+ heads = rw_repo.heads
+ assert heads
+ for head in heads:
+ cur_head.reference = head
+ assert cur_head.reference == head
+ assert isinstance(cur_head.reference, Head)
+ assert cur_head.commit == head.commit
+ assert not cur_head.is_detached
+ # END for each head
+
+ # detach
+ active_head = heads[0]
+ curhead_commit = active_head.commit
+ cur_head.reference = curhead_commit
+ assert cur_head.commit == curhead_commit
+ assert cur_head.is_detached
+ self.failUnlessRaises(TypeError, getattr, cur_head, "reference")
+
+ # tags are references, hence we can point to them
+ some_tag = rw_repo.tags[0]
+ cur_head.reference = some_tag
+ assert not cur_head.is_detached
+ assert cur_head.commit == some_tag.commit
+ assert isinstance(cur_head.reference, TagReference)
+
+ # put HEAD back to a real head, otherwise everything else fails
+ cur_head.reference = active_head
+
+ # type check
+ self.failUnlessRaises(ValueError, setattr, cur_head, "reference", "that")
+
+ # head handling
+ commit = 'HEAD'
+ prev_head_commit = cur_head.commit
+ for count, new_name in enumerate(("my_new_head", "feature/feature1")):
+ actual_commit = commit+"^"*count
+ new_head = Head.create(rw_repo, new_name, actual_commit)
+ assert cur_head.commit == prev_head_commit
+ assert isinstance(new_head, Head)
+ # already exists
+ self.failUnlessRaises(GitCommandError, Head.create, rw_repo, new_name)
+
+ # force it
+ new_head = Head.create(rw_repo, new_name, actual_commit, force=True)
+ old_path = new_head.path
+ old_name = new_head.name
+
+ assert new_head.rename("hello").name == "hello"
+ assert new_head.rename("hello/world").name == "hello/world"
+ assert new_head.rename(old_name).name == old_name and new_head.path == old_path
+
+ # rename with force
+ tmp_head = Head.create(rw_repo, "tmphead")
+ self.failUnlessRaises(GitCommandError, tmp_head.rename, new_head)
+ tmp_head.rename(new_head, force=True)
+ assert tmp_head == new_head and tmp_head.object == new_head.object
+
+ Head.delete(rw_repo, tmp_head)
+ heads = rw_repo.heads
+ assert tmp_head not in heads and new_head not in heads
+ # force on deletion testing would be missing here, code looks okay though ;)
+ # END for each new head name
+ self.failUnlessRaises(TypeError, RemoteReference.create, rw_repo, "some_name")
+
+ # tag ref
+ tag_name = "1.0.2"
+ light_tag = TagReference.create(rw_repo, tag_name)
+ self.failUnlessRaises(GitCommandError, TagReference.create, rw_repo, tag_name)
+ light_tag = TagReference.create(rw_repo, tag_name, "HEAD~1", force = True)
+ assert isinstance(light_tag, TagReference)
+ assert light_tag.name == tag_name
+ assert light_tag.commit == cur_head.commit.parents[0]
+ assert light_tag.tag is None
+
+ # tag with tag object
+ other_tag_name = "releases/1.0.2RC"
+ msg = "my mighty tag\nsecond line"
+ obj_tag = TagReference.create(rw_repo, other_tag_name, message=msg)
+ assert isinstance(obj_tag, TagReference)
+ assert obj_tag.name == other_tag_name
+ assert obj_tag.commit == cur_head.commit
+ assert obj_tag.tag is not None
+
+ TagReference.delete(rw_repo, light_tag, obj_tag)
+ tags = rw_repo.tags
+ assert light_tag not in tags and obj_tag not in tags
+
+ # remote deletion
+ remote_refs_so_far = 0
+ remotes = rw_repo.remotes
+ assert remotes
+ for remote in remotes:
+ refs = remote.refs
+ RemoteReference.delete(rw_repo, *refs)
+ remote_refs_so_far += len(refs)
+ # END for each ref to delete
+ assert remote_refs_so_far
+
+ for remote in remotes:
+ # remotes without references throw
+ self.failUnlessRaises(AssertionError, getattr, remote, 'refs')
+ # END for each remote
+
+ # change where the active head points to
+ if cur_head.is_detached:
+ cur_head.reference = rw_repo.heads[0]
+
+ head = cur_head.reference
+ old_commit = head.commit
+ head.commit = old_commit.parents[0]
+ assert head.commit == old_commit.parents[0]
+ assert head.commit == cur_head.commit
+ head.commit = old_commit
+
+ # setting a non-commit as commit fails, but succeeds as object
+ head_tree = head.commit.tree
+ self.failUnlessRaises(ValueError, setattr, head, 'commit', head_tree)
+ assert head.commit == old_commit # and the ref did not change
+ self.failUnlessRaises(GitCommandError, setattr, head, 'object', head_tree)
+
+ # set the commit directly using the head. This would never detach the head
+ assert not cur_head.is_detached
+ head.object = old_commit
+ cur_head.reference = head.commit
+ assert cur_head.is_detached
+ parent_commit = head.commit.parents[0]
+ assert cur_head.is_detached
+ cur_head.commit = parent_commit
+ assert cur_head.is_detached and cur_head.commit == parent_commit
+
+ cur_head.reference = head
+ assert not cur_head.is_detached
+ cur_head.commit = parent_commit
+ assert not cur_head.is_detached
+ assert head.commit == parent_commit
+
+ # test checkout
+ active_branch = rw_repo.active_branch
+ for head in rw_repo.heads:
+ checked_out_head = head.checkout()
+ assert checked_out_head == head
+ # END for each head to checkout
+
+ # checkout with branch creation
+ new_head = active_branch.checkout(b="new_head")
+ assert active_branch != rw_repo.active_branch
+ assert new_head == rw_repo.active_branch
+
+ # checkout with force as we have a changed a file
+ # clear file
+ open(new_head.commit.tree.blobs[-1].abspath,'w').close()
+ assert len(new_head.commit.diff(None))
+
+ # create a new branch that is likely to touch the file we changed
+ far_away_head = rw_repo.create_head("far_head",'HEAD~100')
+ self.failUnlessRaises(GitCommandError, far_away_head.checkout)
+ assert active_branch == active_branch.checkout(force=True)
+ assert rw_repo.head.reference != far_away_head
+
+ # test reference creation
+ partial_ref = 'sub/ref'
+ full_ref = 'refs/%s' % partial_ref
+ ref = Reference.create(rw_repo, partial_ref)
+ assert ref.path == full_ref
+ assert ref.object == rw_repo.head.commit
+
+ self.failUnlessRaises(OSError, Reference.create, rw_repo, full_ref, 'HEAD~20')
+ # it works if it is at the same spot though and points to the same reference
+ assert Reference.create(rw_repo, full_ref, 'HEAD').path == full_ref
+ Reference.delete(rw_repo, full_ref)
+
+ # recreate the reference using a full_ref
+ ref = Reference.create(rw_repo, full_ref)
+ assert ref.path == full_ref
+ assert ref.object == rw_repo.head.commit
+
+ # recreate using force
+ ref = Reference.create(rw_repo, partial_ref, 'HEAD~1', force=True)
+ assert ref.path == full_ref
+ assert ref.object == rw_repo.head.commit.parents[0]
+
+ # rename it
+ orig_obj = ref.object
+ for name in ('refs/absname', 'rela_name', 'feature/rela_name'):
+ ref_new_name = ref.rename(name)
+ assert isinstance(ref_new_name, Reference)
+ assert name in ref_new_name.path
+ assert ref_new_name.object == orig_obj
+ assert ref_new_name == ref
+ # END for each name type
+
+ # References that don't exist trigger an error if we want to access them
+ self.failUnlessRaises(ValueError, getattr, Reference(rw_repo, "refs/doesntexist"), 'commit')
+
+ # exists, fail unless we force
+ ex_ref_path = far_away_head.path
+ self.failUnlessRaises(OSError, ref.rename, ex_ref_path)
+ # if it points to the same commit it works
+ far_away_head.commit = ref.commit
+ ref.rename(ex_ref_path)
+ assert ref.path == ex_ref_path and ref.object == orig_obj
+ assert ref.rename(ref.path).path == ex_ref_path # rename to same name
+
+ # create symbolic refs
+ symref_path = "symrefs/sym"
+ symref = SymbolicReference.create(rw_repo, symref_path, cur_head.reference)
+ assert symref.path == symref_path
+ assert symref.reference == cur_head.reference
+
+ self.failUnlessRaises(OSError, SymbolicReference.create, rw_repo, symref_path, cur_head.reference.commit)
+ # it works if the new ref points to the same reference
+ SymbolicReference.create(rw_repo, symref.path, symref.reference).path == symref.path
+ SymbolicReference.delete(rw_repo, symref)
+ # would raise if the symref wouldn't have been deletedpbl
+ symref = SymbolicReference.create(rw_repo, symref_path, cur_head.reference)
+
+ # test symbolic references which are not at default locations like HEAD
+ # or FETCH_HEAD - they may also be at spots in refs of course
+ symbol_ref_path = "refs/symbol_ref"
+ symref = SymbolicReference(rw_repo, symbol_ref_path)
+ assert symref.path == symbol_ref_path
+ symbol_ref_abspath = os.path.join(rw_repo.git_dir, symref.path)
+
+ # set it
+ symref.reference = new_head
+ assert symref.reference == new_head
+ assert os.path.isfile(symbol_ref_abspath)
+ assert symref.commit == new_head.commit
+
+ for name in ('absname','folder/rela_name'):
+ symref_new_name = symref.rename(name)
+ assert isinstance(symref_new_name, SymbolicReference)
+ assert name in symref_new_name.path
+ assert symref_new_name.reference == new_head
+ assert symref_new_name == symref
+ assert not symref.is_detached
+ # END for each ref
+
+ # create a new non-head ref just to be sure we handle it even if packed
+ Reference.create(rw_repo, full_ref)
+
+ # test ref listing - assure we have packed refs
+ rw_repo.git.pack_refs(all=True, prune=True)
+ heads = rw_repo.heads
+ assert heads
+ assert new_head in heads
+ assert active_branch in heads
+ assert rw_repo.tags
+
+ # we should be able to iterate all symbolic refs as well - in that case
+ # we should expect only symbolic references to be returned
+ for symref in SymbolicReference.iter_items(rw_repo):
+ assert not symref.is_detached
+
+ # when iterating references, we can get references and symrefs
+ # when deleting all refs, I'd expect them to be gone ! Even from
+ # the packed ones
+ # For this to work, we must not be on any branch
+ rw_repo.head.reference = rw_repo.head.commit
+ deleted_refs = set()
+ for ref in Reference.iter_items(rw_repo):
+ if ref.is_detached:
+ ref.delete(rw_repo, ref)
+ deleted_refs.add(ref)
+ # END delete ref
+ # END for each ref to iterate and to delete
+ assert deleted_refs
+
+ for ref in Reference.iter_items(rw_repo):
+ if ref.is_detached:
+ assert ref not in deleted_refs
+ # END for each ref
+
+ # reattach head - head will not be returned if it is not a symbolic
+ # ref
+ rw_repo.head.reference = Head.create(rw_repo, "master")
+
+ # At least the head should still exist
+ assert os.path.isfile(os.path.join(rw_repo.git_dir, 'HEAD'))
+ refs = list(SymbolicReference.iter_items(rw_repo))
+ assert len(refs) == 1
+
+
+
+
diff --git a/test/git/test_remote.py b/test/git/test_remote.py
index c240202f..747bb527 100644
--- a/test/git/test_remote.py
+++ b/test/git/test_remote.py
@@ -15,428 +15,428 @@ import random
random.seed(0)
class TestRemoteProgress(RemoteProgress):
- __slots__ = ( "_seen_lines", "_stages_per_op", '_num_progress_messages' )
- def __init__(self):
- super(TestRemoteProgress, self).__init__()
- self._seen_lines = list()
- self._stages_per_op = dict()
- self._num_progress_messages = 0
-
- def _parse_progress_line(self, line):
- # we may remove the line later if it is dropped
- # Keep it for debugging
- self._seen_lines.append(line)
- rval = super(TestRemoteProgress, self)._parse_progress_line(line)
- assert len(line) > 1, "line %r too short" % line
- return rval
-
- def line_dropped(self, line):
- try:
- self._seen_lines.remove(line)
- except ValueError:
- pass
-
- def update(self, op_code, cur_count, max_count=None, message=''):
- # check each stage only comes once
- op_id = op_code & self.OP_MASK
- assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING)
-
- self._stages_per_op.setdefault(op_id, 0)
- self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK)
-
- if op_code & (self.WRITING|self.END) == (self.WRITING|self.END):
- assert message
- # END check we get message
-
- self._num_progress_messages += 1
-
-
- def make_assertion(self):
- # we don't always receive messages
- if not self._seen_lines:
- return
-
- # sometimes objects are not compressed which is okay
- assert len(self._seen_ops) in (2,3)
- assert self._stages_per_op
-
- # must have seen all stages
- for op, stages in self._stages_per_op.items():
- assert stages & self.STAGE_MASK == self.STAGE_MASK
- # END for each op/stage
+ __slots__ = ( "_seen_lines", "_stages_per_op", '_num_progress_messages' )
+ def __init__(self):
+ super(TestRemoteProgress, self).__init__()
+ self._seen_lines = list()
+ self._stages_per_op = dict()
+ self._num_progress_messages = 0
+
+ def _parse_progress_line(self, line):
+ # we may remove the line later if it is dropped
+ # Keep it for debugging
+ self._seen_lines.append(line)
+ rval = super(TestRemoteProgress, self)._parse_progress_line(line)
+ assert len(line) > 1, "line %r too short" % line
+ return rval
+
+ def line_dropped(self, line):
+ try:
+ self._seen_lines.remove(line)
+ except ValueError:
+ pass
+
+ def update(self, op_code, cur_count, max_count=None, message=''):
+ # check each stage only comes once
+ op_id = op_code & self.OP_MASK
+ assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING)
+
+ self._stages_per_op.setdefault(op_id, 0)
+ self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK)
+
+ if op_code & (self.WRITING|self.END) == (self.WRITING|self.END):
+ assert message
+ # END check we get message
+
+ self._num_progress_messages += 1
+
+
+ def make_assertion(self):
+ # we don't always receive messages
+ if not self._seen_lines:
+ return
+
+ # sometimes objects are not compressed which is okay
+ assert len(self._seen_ops) in (2,3)
+ assert self._stages_per_op
+
+ # must have seen all stages
+ for op, stages in self._stages_per_op.items():
+ assert stages & self.STAGE_MASK == self.STAGE_MASK
+ # END for each op/stage
- def assert_received_message(self):
- assert self._num_progress_messages
-
+ def assert_received_message(self):
+ assert self._num_progress_messages
+
class TestRemote(TestBase):
-
- def _print_fetchhead(self, repo):
- fp = open(os.path.join(repo.git_dir, "FETCH_HEAD"))
- fp.close()
-
-
- def _test_fetch_result(self, results, remote):
- # self._print_fetchhead(remote.repo)
- assert len(results) > 0 and isinstance(results[0], FetchInfo)
- for info in results:
- assert isinstance(info.note, basestring)
- if isinstance(info.ref, Reference):
- assert info.flags != 0
- # END reference type flags handling
- assert isinstance(info.ref, (SymbolicReference, Reference))
- if info.flags & (info.FORCED_UPDATE|info.FAST_FORWARD):
- assert isinstance(info.old_commit, Commit)
- else:
- assert info.old_commit is None
- # END forced update checking
- # END for each info
-
- def _test_push_result(self, results, remote):
- assert len(results) > 0 and isinstance(results[0], PushInfo)
- for info in results:
- assert info.flags
- assert isinstance(info.summary, basestring)
- if info.old_commit is not None:
- assert isinstance(info.old_commit, Commit)
- if info.flags & info.ERROR:
- has_one = False
- for bitflag in (info.REJECTED, info.REMOTE_REJECTED, info.REMOTE_FAILURE):
- has_one |= bool(info.flags & bitflag)
- # END for each bitflag
- assert has_one
- else:
- # there must be a remote commit
- if info.flags & info.DELETED == 0:
- assert isinstance(info.local_ref, Reference)
- else:
- assert info.local_ref is None
- assert type(info.remote_ref) in (TagReference, RemoteReference)
- # END error checking
- # END for each info
-
-
- def _test_fetch_info(self, repo):
- self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '')
- self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '')
-
- def _commit_random_file(self, repo):
- #Create a file with a random name and random data and commit it to repo.
- # Return the commited absolute file path
- index = repo.index
- new_file = self._make_file(os.path.basename(tempfile.mktemp()),str(random.random()), repo)
- index.add([new_file])
- index.commit("Committing %s" % new_file)
- return new_file
-
- def _test_fetch(self,remote, rw_repo, remote_repo):
- # specialized fetch testing to de-clutter the main test
- self._test_fetch_info(rw_repo)
-
- def fetch_and_test(remote, **kwargs):
- progress = TestRemoteProgress()
- kwargs['progress'] = progress
- res = remote.fetch(**kwargs)
- progress.make_assertion()
- self._test_fetch_result(res, remote)
- return res
- # END fetch and check
-
- def get_info(res, remote, name):
- return res["%s/%s"%(remote,name)]
-
- # put remote head to master as it is garantueed to exist
- remote_repo.head.reference = remote_repo.heads.master
-
- res = fetch_and_test(remote)
- # all uptodate
- for info in res:
- assert info.flags & info.HEAD_UPTODATE
-
- # rewind remote head to trigger rejection
- # index must be false as remote is a bare repo
- rhead = remote_repo.head
- remote_commit = rhead.commit
- rhead.reset("HEAD~2", index=False)
- res = fetch_and_test(remote)
- mkey = "%s/%s"%(remote,'master')
- master_info = res[mkey]
- assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None
-
- # normal fast forward - set head back to previous one
- rhead.commit = remote_commit
- res = fetch_and_test(remote)
- assert res[mkey].flags & FetchInfo.FAST_FORWARD
-
- # new remote branch
- new_remote_branch = Head.create(remote_repo, "new_branch")
- res = fetch_and_test(remote)
- new_branch_info = get_info(res, remote, new_remote_branch)
- assert new_branch_info.flags & FetchInfo.NEW_HEAD
-
- # remote branch rename ( causes creation of a new one locally )
- new_remote_branch.rename("other_branch_name")
- res = fetch_and_test(remote)
- other_branch_info = get_info(res, remote, new_remote_branch)
- assert other_branch_info.ref.commit == new_branch_info.ref.commit
-
- # remove new branch
- Head.delete(new_remote_branch.repo, new_remote_branch)
- res = fetch_and_test(remote)
- # deleted remote will not be fetched
- self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch)
-
- # prune stale tracking branches
- stale_refs = remote.stale_refs
- assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference)
- RemoteReference.delete(rw_repo, *stale_refs)
-
- # test single branch fetch with refspec including target remote
- res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote)
- assert len(res) == 1 and get_info(res, remote, 'master')
-
- # ... with respec and no target
- res = fetch_and_test(remote, refspec='master')
- assert len(res) == 1
-
- # add new tag reference
- rtag = TagReference.create(remote_repo, "1.0-RV_hello.there")
- res = fetch_and_test(remote, tags=True)
- tinfo = res[str(rtag)]
- assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit
- assert tinfo.flags & tinfo.NEW_TAG
-
- # adjust tag commit
- rtag.object = rhead.commit.parents[0].parents[0]
- res = fetch_and_test(remote, tags=True)
- tinfo = res[str(rtag)]
- assert tinfo.commit == rtag.commit
- assert tinfo.flags & tinfo.TAG_UPDATE
-
- # delete remote tag - local one will stay
- TagReference.delete(remote_repo, rtag)
- res = fetch_and_test(remote, tags=True)
- self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag))
-
- # provoke to receive actual objects to see what kind of output we have to
- # expect. For that we need a remote transport protocol
- # Create a new UN-shared repo and fetch into it after we pushed a change
- # to the shared repo
- other_repo_dir = tempfile.mktemp("other_repo")
- # must clone with a local path for the repo implementation not to freak out
- # as it wants local paths only ( which I can understand )
- other_repo = remote_repo.clone(other_repo_dir, shared=False)
- remote_repo_url = "git://localhost%s"%remote_repo.git_dir
-
- # put origin to git-url
- other_origin = other_repo.remotes.origin
- other_origin.config_writer.set("url", remote_repo_url)
- # it automatically creates alternates as remote_repo is shared as well.
- # It will use the transport though and ignore alternates when fetching
- # assert not other_repo.alternates # this would fail
-
- # assure we are in the right state
- rw_repo.head.reset(remote.refs.master, working_tree=True)
- try:
- self._commit_random_file(rw_repo)
- remote.push(rw_repo.head.reference)
-
- # here I would expect to see remote-information about packing
- # objects and so on. Unfortunately, this does not happen
- # if we are redirecting the output - git explicitly checks for this
- # and only provides progress information to ttys
- res = fetch_and_test(other_origin)
- finally:
- shutil.rmtree(other_repo_dir)
- # END test and cleanup
-
- def _test_push_and_pull(self,remote, rw_repo, remote_repo):
- # push our changes
- lhead = rw_repo.head
- lindex = rw_repo.index
- # assure we are on master and it is checked out where the remote is
- try:
- lhead.reference = rw_repo.heads.master
- except AttributeError:
- # if the author is on a non-master branch, the clones might not have
- # a local master yet. We simply create it
- lhead.reference = rw_repo.create_head('master')
- # END master handling
- lhead.reset(remote.refs.master, working_tree=True)
-
- # push without spec should fail ( without further configuration )
- # well, works nicely
- # self.failUnlessRaises(GitCommandError, remote.push)
-
- # simple file push
- self._commit_random_file(rw_repo)
- progress = TestRemoteProgress()
- res = remote.push(lhead.reference, progress)
- assert isinstance(res, IterableList)
- self._test_push_result(res, remote)
- progress.make_assertion()
-
- # rejected - undo last commit
- lhead.reset("HEAD~1")
- res = remote.push(lhead.reference)
- assert res[0].flags & PushInfo.ERROR
- assert res[0].flags & PushInfo.REJECTED
- self._test_push_result(res, remote)
-
- # force rejected pull
- res = remote.push('+%s' % lhead.reference)
- assert res[0].flags & PushInfo.ERROR == 0
- assert res[0].flags & PushInfo.FORCED_UPDATE
- self._test_push_result(res, remote)
-
- # invalid refspec
- res = remote.push("hellothere")
- assert len(res) == 0
-
- # push new tags
- progress = TestRemoteProgress()
- to_be_updated = "my_tag.1.0RV"
- new_tag = TagReference.create(rw_repo, to_be_updated)
- other_tag = TagReference.create(rw_repo, "my_obj_tag.2.1aRV", message="my message")
- res = remote.push(progress=progress, tags=True)
- assert res[-1].flags & PushInfo.NEW_TAG
- progress.make_assertion()
- self._test_push_result(res, remote)
-
- # update push new tags
- # Rejection is default
- new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True)
- res = remote.push(tags=True)
- self._test_push_result(res, remote)
- assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR
-
- # push force this tag
- res = remote.push("+%s" % new_tag.path)
- assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE
-
- # delete tag - have to do it using refspec
- res = remote.push(":%s" % new_tag.path)
- self._test_push_result(res, remote)
- assert res[0].flags & PushInfo.DELETED
- progress.assert_received_message()
-
- # push new branch
- new_head = Head.create(rw_repo, "my_new_branch")
- progress = TestRemoteProgress()
- res = remote.push(new_head, progress)
- assert res[0].flags & PushInfo.NEW_HEAD
- progress.make_assertion()
- self._test_push_result(res, remote)
-
- # delete new branch on the remote end and locally
- res = remote.push(":%s" % new_head.path)
- self._test_push_result(res, remote)
- Head.delete(rw_repo, new_head)
- assert res[-1].flags & PushInfo.DELETED
-
- # --all
- res = remote.push(all=True)
- self._test_push_result(res, remote)
-
- remote.pull('master')
-
- # cleanup - delete created tags and branches as we are in an innerloop on
- # the same repository
- TagReference.delete(rw_repo, new_tag, other_tag)
- remote.push(":%s" % other_tag.path)
-
- @with_rw_and_rw_remote_repo('0.1.6')
- def test_base(self, rw_repo, remote_repo):
- num_remotes = 0
- remote_set = set()
- ran_fetch_test = False
-
- for remote in rw_repo.remotes:
- num_remotes += 1
- assert remote == remote
- assert str(remote) != repr(remote)
- remote_set.add(remote)
- remote_set.add(remote) # should already exist
-
- # REFS
- refs = remote.refs
- assert refs
- for ref in refs:
- assert ref.remote_name == remote.name
- assert ref.remote_head
- # END for each ref
-
- # OPTIONS
- # cannot use 'fetch' key anymore as it is now a method
- for opt in ("url", ):
- val = getattr(remote, opt)
- reader = remote.config_reader
- assert reader.get(opt) == val
- assert reader.get_value(opt, None) == val
-
- # unable to write with a reader
- self.failUnlessRaises(IOError, reader.set, opt, "test")
-
- # change value
- writer = remote.config_writer
- new_val = "myval"
- writer.set(opt, new_val)
- assert writer.get(opt) == new_val
- writer.set(opt, val)
- assert writer.get(opt) == val
- del(writer)
- assert getattr(remote, opt) == val
- # END for each default option key
-
- # RENAME
- other_name = "totally_other_name"
- prev_name = remote.name
- assert remote.rename(other_name) == remote
- assert prev_name != remote.name
- # multiple times
- for time in range(2):
- assert remote.rename(prev_name).name == prev_name
- # END for each rename ( back to prev_name )
-
- # PUSH/PULL TESTING
- self._test_push_and_pull(remote, rw_repo, remote_repo)
-
- # FETCH TESTING
- # Only for remotes - local cases are the same or less complicated
- # as additional progress information will never be emitted
- if remote.name == "daemon_origin":
- self._test_fetch(remote, rw_repo, remote_repo)
- ran_fetch_test = True
- # END fetch test
-
- remote.update()
- # END for each remote
-
- assert ran_fetch_test
- assert num_remotes
- assert num_remotes == len(remote_set)
-
- origin = rw_repo.remote('origin')
- assert origin == rw_repo.remotes.origin
-
- @with_bare_rw_repo
- def test_creation_and_removal(self, bare_rw_repo):
- new_name = "test_new_one"
- arg_list = (new_name, "git@server:hello.git")
- remote = Remote.create(bare_rw_repo, *arg_list )
- assert remote.name == "test_new_one"
- assert remote in bare_rw_repo.remotes
-
- # create same one again
- self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list)
-
- Remote.remove(bare_rw_repo, new_name)
-
- for remote in bare_rw_repo.remotes:
- if remote.name == new_name:
- raise AssertionError("Remote removal failed")
- # END if deleted remote matches existing remote's name
- # END for each remote
-
-
-
+
+ def _print_fetchhead(self, repo):
+ fp = open(os.path.join(repo.git_dir, "FETCH_HEAD"))
+ fp.close()
+
+
+ def _test_fetch_result(self, results, remote):
+ # self._print_fetchhead(remote.repo)
+ assert len(results) > 0 and isinstance(results[0], FetchInfo)
+ for info in results:
+ assert isinstance(info.note, basestring)
+ if isinstance(info.ref, Reference):
+ assert info.flags != 0
+ # END reference type flags handling
+ assert isinstance(info.ref, (SymbolicReference, Reference))
+ if info.flags & (info.FORCED_UPDATE|info.FAST_FORWARD):
+ assert isinstance(info.old_commit, Commit)
+ else:
+ assert info.old_commit is None
+ # END forced update checking
+ # END for each info
+
+ def _test_push_result(self, results, remote):
+ assert len(results) > 0 and isinstance(results[0], PushInfo)
+ for info in results:
+ assert info.flags
+ assert isinstance(info.summary, basestring)
+ if info.old_commit is not None:
+ assert isinstance(info.old_commit, Commit)
+ if info.flags & info.ERROR:
+ has_one = False
+ for bitflag in (info.REJECTED, info.REMOTE_REJECTED, info.REMOTE_FAILURE):
+ has_one |= bool(info.flags & bitflag)
+ # END for each bitflag
+ assert has_one
+ else:
+ # there must be a remote commit
+ if info.flags & info.DELETED == 0:
+ assert isinstance(info.local_ref, Reference)
+ else:
+ assert info.local_ref is None
+ assert type(info.remote_ref) in (TagReference, RemoteReference)
+ # END error checking
+ # END for each info
+
+
+ def _test_fetch_info(self, repo):
+ self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '')
+ self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '')
+
+ def _commit_random_file(self, repo):
+ #Create a file with a random name and random data and commit it to repo.
+ # Return the commited absolute file path
+ index = repo.index
+ new_file = self._make_file(os.path.basename(tempfile.mktemp()),str(random.random()), repo)
+ index.add([new_file])
+ index.commit("Committing %s" % new_file)
+ return new_file
+
+ def _test_fetch(self,remote, rw_repo, remote_repo):
+ # specialized fetch testing to de-clutter the main test
+ self._test_fetch_info(rw_repo)
+
+ def fetch_and_test(remote, **kwargs):
+ progress = TestRemoteProgress()
+ kwargs['progress'] = progress
+ res = remote.fetch(**kwargs)
+ progress.make_assertion()
+ self._test_fetch_result(res, remote)
+ return res
+ # END fetch and check
+
+ def get_info(res, remote, name):
+ return res["%s/%s"%(remote,name)]
+
+ # put remote head to master as it is garantueed to exist
+ remote_repo.head.reference = remote_repo.heads.master
+
+ res = fetch_and_test(remote)
+ # all uptodate
+ for info in res:
+ assert info.flags & info.HEAD_UPTODATE
+
+ # rewind remote head to trigger rejection
+ # index must be false as remote is a bare repo
+ rhead = remote_repo.head
+ remote_commit = rhead.commit
+ rhead.reset("HEAD~2", index=False)
+ res = fetch_and_test(remote)
+ mkey = "%s/%s"%(remote,'master')
+ master_info = res[mkey]
+ assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None
+
+ # normal fast forward - set head back to previous one
+ rhead.commit = remote_commit
+ res = fetch_and_test(remote)
+ assert res[mkey].flags & FetchInfo.FAST_FORWARD
+
+ # new remote branch
+ new_remote_branch = Head.create(remote_repo, "new_branch")
+ res = fetch_and_test(remote)
+ new_branch_info = get_info(res, remote, new_remote_branch)
+ assert new_branch_info.flags & FetchInfo.NEW_HEAD
+
+ # remote branch rename ( causes creation of a new one locally )
+ new_remote_branch.rename("other_branch_name")
+ res = fetch_and_test(remote)
+ other_branch_info = get_info(res, remote, new_remote_branch)
+ assert other_branch_info.ref.commit == new_branch_info.ref.commit
+
+ # remove new branch
+ Head.delete(new_remote_branch.repo, new_remote_branch)
+ res = fetch_and_test(remote)
+ # deleted remote will not be fetched
+ self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch)
+
+ # prune stale tracking branches
+ stale_refs = remote.stale_refs
+ assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference)
+ RemoteReference.delete(rw_repo, *stale_refs)
+
+ # test single branch fetch with refspec including target remote
+ res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote)
+ assert len(res) == 1 and get_info(res, remote, 'master')
+
+ # ... with respec and no target
+ res = fetch_and_test(remote, refspec='master')
+ assert len(res) == 1
+
+ # add new tag reference
+ rtag = TagReference.create(remote_repo, "1.0-RV_hello.there")
+ res = fetch_and_test(remote, tags=True)
+ tinfo = res[str(rtag)]
+ assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit
+ assert tinfo.flags & tinfo.NEW_TAG
+
+ # adjust tag commit
+ rtag.object = rhead.commit.parents[0].parents[0]
+ res = fetch_and_test(remote, tags=True)
+ tinfo = res[str(rtag)]
+ assert tinfo.commit == rtag.commit
+ assert tinfo.flags & tinfo.TAG_UPDATE
+
+ # delete remote tag - local one will stay
+ TagReference.delete(remote_repo, rtag)
+ res = fetch_and_test(remote, tags=True)
+ self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag))
+
+ # provoke to receive actual objects to see what kind of output we have to
+ # expect. For that we need a remote transport protocol
+ # Create a new UN-shared repo and fetch into it after we pushed a change
+ # to the shared repo
+ other_repo_dir = tempfile.mktemp("other_repo")
+ # must clone with a local path for the repo implementation not to freak out
+ # as it wants local paths only ( which I can understand )
+ other_repo = remote_repo.clone(other_repo_dir, shared=False)
+ remote_repo_url = "git://localhost%s"%remote_repo.git_dir
+
+ # put origin to git-url
+ other_origin = other_repo.remotes.origin
+ other_origin.config_writer.set("url", remote_repo_url)
+ # it automatically creates alternates as remote_repo is shared as well.
+ # It will use the transport though and ignore alternates when fetching
+ # assert not other_repo.alternates # this would fail
+
+ # assure we are in the right state
+ rw_repo.head.reset(remote.refs.master, working_tree=True)
+ try:
+ self._commit_random_file(rw_repo)
+ remote.push(rw_repo.head.reference)
+
+ # here I would expect to see remote-information about packing
+ # objects and so on. Unfortunately, this does not happen
+ # if we are redirecting the output - git explicitly checks for this
+ # and only provides progress information to ttys
+ res = fetch_and_test(other_origin)
+ finally:
+ shutil.rmtree(other_repo_dir)
+ # END test and cleanup
+
+ def _test_push_and_pull(self,remote, rw_repo, remote_repo):
+ # push our changes
+ lhead = rw_repo.head
+ lindex = rw_repo.index
+ # assure we are on master and it is checked out where the remote is
+ try:
+ lhead.reference = rw_repo.heads.master
+ except AttributeError:
+ # if the author is on a non-master branch, the clones might not have
+ # a local master yet. We simply create it
+ lhead.reference = rw_repo.create_head('master')
+ # END master handling
+ lhead.reset(remote.refs.master, working_tree=True)
+
+ # push without spec should fail ( without further configuration )
+ # well, works nicely
+ # self.failUnlessRaises(GitCommandError, remote.push)
+
+ # simple file push
+ self._commit_random_file(rw_repo)
+ progress = TestRemoteProgress()
+ res = remote.push(lhead.reference, progress)
+ assert isinstance(res, IterableList)
+ self._test_push_result(res, remote)
+ progress.make_assertion()
+
+ # rejected - undo last commit
+ lhead.reset("HEAD~1")
+ res = remote.push(lhead.reference)
+ assert res[0].flags & PushInfo.ERROR
+ assert res[0].flags & PushInfo.REJECTED
+ self._test_push_result(res, remote)
+
+ # force rejected pull
+ res = remote.push('+%s' % lhead.reference)
+ assert res[0].flags & PushInfo.ERROR == 0
+ assert res[0].flags & PushInfo.FORCED_UPDATE
+ self._test_push_result(res, remote)
+
+ # invalid refspec
+ res = remote.push("hellothere")
+ assert len(res) == 0
+
+ # push new tags
+ progress = TestRemoteProgress()
+ to_be_updated = "my_tag.1.0RV"
+ new_tag = TagReference.create(rw_repo, to_be_updated)
+ other_tag = TagReference.create(rw_repo, "my_obj_tag.2.1aRV", message="my message")
+ res = remote.push(progress=progress, tags=True)
+ assert res[-1].flags & PushInfo.NEW_TAG
+ progress.make_assertion()
+ self._test_push_result(res, remote)
+
+ # update push new tags
+ # Rejection is default
+ new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True)
+ res = remote.push(tags=True)
+ self._test_push_result(res, remote)
+ assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR
+
+ # push force this tag
+ res = remote.push("+%s" % new_tag.path)
+ assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE
+
+ # delete tag - have to do it using refspec
+ res = remote.push(":%s" % new_tag.path)
+ self._test_push_result(res, remote)
+ assert res[0].flags & PushInfo.DELETED
+ progress.assert_received_message()
+
+ # push new branch
+ new_head = Head.create(rw_repo, "my_new_branch")
+ progress = TestRemoteProgress()
+ res = remote.push(new_head, progress)
+ assert res[0].flags & PushInfo.NEW_HEAD
+ progress.make_assertion()
+ self._test_push_result(res, remote)
+
+ # delete new branch on the remote end and locally
+ res = remote.push(":%s" % new_head.path)
+ self._test_push_result(res, remote)
+ Head.delete(rw_repo, new_head)
+ assert res[-1].flags & PushInfo.DELETED
+
+ # --all
+ res = remote.push(all=True)
+ self._test_push_result(res, remote)
+
+ remote.pull('master')
+
+ # cleanup - delete created tags and branches as we are in an innerloop on
+ # the same repository
+ TagReference.delete(rw_repo, new_tag, other_tag)
+ remote.push(":%s" % other_tag.path)
+
+ @with_rw_and_rw_remote_repo('0.1.6')
+ def test_base(self, rw_repo, remote_repo):
+ num_remotes = 0
+ remote_set = set()
+ ran_fetch_test = False
+
+ for remote in rw_repo.remotes:
+ num_remotes += 1
+ assert remote == remote
+ assert str(remote) != repr(remote)
+ remote_set.add(remote)
+ remote_set.add(remote) # should already exist
+
+ # REFS
+ refs = remote.refs
+ assert refs
+ for ref in refs:
+ assert ref.remote_name == remote.name
+ assert ref.remote_head
+ # END for each ref
+
+ # OPTIONS
+ # cannot use 'fetch' key anymore as it is now a method
+ for opt in ("url", ):
+ val = getattr(remote, opt)
+ reader = remote.config_reader
+ assert reader.get(opt) == val
+ assert reader.get_value(opt, None) == val
+
+ # unable to write with a reader
+ self.failUnlessRaises(IOError, reader.set, opt, "test")
+
+ # change value
+ writer = remote.config_writer
+ new_val = "myval"
+ writer.set(opt, new_val)
+ assert writer.get(opt) == new_val
+ writer.set(opt, val)
+ assert writer.get(opt) == val
+ del(writer)
+ assert getattr(remote, opt) == val
+ # END for each default option key
+
+ # RENAME
+ other_name = "totally_other_name"
+ prev_name = remote.name
+ assert remote.rename(other_name) == remote
+ assert prev_name != remote.name
+ # multiple times
+ for time in range(2):
+ assert remote.rename(prev_name).name == prev_name
+ # END for each rename ( back to prev_name )
+
+ # PUSH/PULL TESTING
+ self._test_push_and_pull(remote, rw_repo, remote_repo)
+
+ # FETCH TESTING
+ # Only for remotes - local cases are the same or less complicated
+ # as additional progress information will never be emitted
+ if remote.name == "daemon_origin":
+ self._test_fetch(remote, rw_repo, remote_repo)
+ ran_fetch_test = True
+ # END fetch test
+
+ remote.update()
+ # END for each remote
+
+ assert ran_fetch_test
+ assert num_remotes
+ assert num_remotes == len(remote_set)
+
+ origin = rw_repo.remote('origin')
+ assert origin == rw_repo.remotes.origin
+
+ @with_bare_rw_repo
+ def test_creation_and_removal(self, bare_rw_repo):
+ new_name = "test_new_one"
+ arg_list = (new_name, "git@server:hello.git")
+ remote = Remote.create(bare_rw_repo, *arg_list )
+ assert remote.name == "test_new_one"
+ assert remote in bare_rw_repo.remotes
+
+ # create same one again
+ self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list)
+
+ Remote.remove(bare_rw_repo, new_name)
+
+ for remote in bare_rw_repo.remotes:
+ if remote.name == new_name:
+ raise AssertionError("Remote removal failed")
+ # END if deleted remote matches existing remote's name
+ # END for each remote
+
+
+
diff --git a/test/git/test_repo.py b/test/git/test_repo.py
index 4af9161c..b3ce74cb 100644
--- a/test/git/test_repo.py
+++ b/test/git/test_repo.py
@@ -12,286 +12,286 @@ import tempfile
import shutil
class TestRepo(TestBase):
-
- @raises(InvalidGitRepositoryError)
- def test_new_should_raise_on_invalid_repo_location(self):
- Repo(tempfile.gettempdir())
+
+ @raises(InvalidGitRepositoryError)
+ def test_new_should_raise_on_invalid_repo_location(self):
+ Repo(tempfile.gettempdir())
- @raises(NoSuchPathError)
- def test_new_should_raise_on_non_existant_path(self):
- Repo("repos/foobar")
+ @raises(NoSuchPathError)
+ def test_new_should_raise_on_non_existant_path(self):
+ Repo("repos/foobar")
- def test_repo_creation_from_different_paths(self):
- r_from_gitdir = Repo(self.rorepo.git_dir)
- assert r_from_gitdir.git_dir == self.rorepo.git_dir
- assert r_from_gitdir.git_dir.endswith('.git')
- assert not self.rorepo.git.working_dir.endswith('.git')
- assert r_from_gitdir.git.working_dir == self.rorepo.git.working_dir
+ def test_repo_creation_from_different_paths(self):
+ r_from_gitdir = Repo(self.rorepo.git_dir)
+ assert r_from_gitdir.git_dir == self.rorepo.git_dir
+ assert r_from_gitdir.git_dir.endswith('.git')
+ assert not self.rorepo.git.working_dir.endswith('.git')
+ assert r_from_gitdir.git.working_dir == self.rorepo.git.working_dir
- def test_description(self):
- txt = "Test repository"
- self.rorepo.description = txt
- assert_equal(self.rorepo.description, txt)
+ def test_description(self):
+ txt = "Test repository"
+ self.rorepo.description = txt
+ assert_equal(self.rorepo.description, txt)
- def test_heads_should_return_array_of_head_objects(self):
- for head in self.rorepo.heads:
- assert_equal(Head, head.__class__)
+ def test_heads_should_return_array_of_head_objects(self):
+ for head in self.rorepo.heads:
+ assert_equal(Head, head.__class__)
- def test_heads_should_populate_head_data(self):
- for head in self.rorepo.heads:
- assert head.name
- assert isinstance(head.commit,Commit)
- # END for each head
-
- assert isinstance(self.rorepo.heads.master, Head)
- assert isinstance(self.rorepo.heads['master'], Head)
-
- def test_tree_from_revision(self):
- tree = self.rorepo.tree('0.1.6')
- assert tree.type == "tree"
- assert self.rorepo.tree(tree) == tree
-
- # try from invalid revision that does not exist
- self.failUnlessRaises(ValueError, self.rorepo.tree, 'hello world')
+ def test_heads_should_populate_head_data(self):
+ for head in self.rorepo.heads:
+ assert head.name
+ assert isinstance(head.commit,Commit)
+ # END for each head
+
+ assert isinstance(self.rorepo.heads.master, Head)
+ assert isinstance(self.rorepo.heads['master'], Head)
+
+ def test_tree_from_revision(self):
+ tree = self.rorepo.tree('0.1.6')
+ assert tree.type == "tree"
+ assert self.rorepo.tree(tree) == tree
+
+ # try from invalid revision that does not exist
+ self.failUnlessRaises(ValueError, self.rorepo.tree, 'hello world')
- @patch_object(Git, '_call_process')
- def test_commits(self, git):
- git.return_value = ListProcessAdapter(fixture('rev_list'))
+ @patch_object(Git, '_call_process')
+ def test_commits(self, git):
+ git.return_value = ListProcessAdapter(fixture('rev_list'))
- commits = list( self.rorepo.iter_commits('master', max_count=10) )
-
- c = commits[0]
- assert_equal('4c8124ffcf4039d292442eeccabdeca5af5c5017', c.sha)
- assert_equal(["634396b2f541a9f2d58b00be1a07f0c358b999b3"], [p.sha for p in c.parents])
- assert_equal("672eca9b7f9e09c22dcb128c283e8c3c8d7697a4", c.tree.sha)
- assert_equal("Tom Preston-Werner", c.author.name)
- assert_equal("tom@mojombo.com", c.author.email)
- assert_equal(1191999972, c.authored_date)
- assert_equal("Tom Preston-Werner", c.committer.name)
- assert_equal("tom@mojombo.com", c.committer.email)
- assert_equal(1191999972, c.committed_date)
- assert_equal("implement Grit#heads", c.message)
+ commits = list( self.rorepo.iter_commits('master', max_count=10) )
+
+ c = commits[0]
+ assert_equal('4c8124ffcf4039d292442eeccabdeca5af5c5017', c.sha)
+ assert_equal(["634396b2f541a9f2d58b00be1a07f0c358b999b3"], [p.sha for p in c.parents])
+ assert_equal("672eca9b7f9e09c22dcb128c283e8c3c8d7697a4", c.tree.sha)
+ assert_equal("Tom Preston-Werner", c.author.name)
+ assert_equal("tom@mojombo.com", c.author.email)
+ assert_equal(1191999972, c.authored_date)
+ assert_equal("Tom Preston-Werner", c.committer.name)
+ assert_equal("tom@mojombo.com", c.committer.email)
+ assert_equal(1191999972, c.committed_date)
+ assert_equal("implement Grit#heads", c.message)
- c = commits[1]
- assert_equal(tuple(), c.parents)
+ c = commits[1]
+ assert_equal(tuple(), c.parents)
- c = commits[2]
- assert_equal(["6e64c55896aabb9a7d8e9f8f296f426d21a78c2c", "7f874954efb9ba35210445be456c74e037ba6af2"], map(lambda p: p.sha, c.parents))
- assert_equal("Merge branch 'site'", c.summary)
+ c = commits[2]
+ assert_equal(["6e64c55896aabb9a7d8e9f8f296f426d21a78c2c", "7f874954efb9ba35210445be456c74e037ba6af2"], map(lambda p: p.sha, c.parents))
+ assert_equal("Merge branch 'site'", c.summary)
- assert_true(git.called)
+ assert_true(git.called)
- def test_trees(self):
- mc = 30
- num_trees = 0
- for tree in self.rorepo.iter_trees('0.1.5', max_count=mc):
- num_trees += 1
- assert isinstance(tree, Tree)
- # END for each tree
- assert num_trees == mc
-
+ def test_trees(self):
+ mc = 30
+ num_trees = 0
+ for tree in self.rorepo.iter_trees('0.1.5', max_count=mc):
+ num_trees += 1
+ assert isinstance(tree, Tree)
+ # END for each tree
+ assert num_trees == mc
+
- def test_init(self):
- prev_cwd = os.getcwd()
- os.chdir(tempfile.gettempdir())
- git_dir_rela = "repos/foo/bar.git"
- del_dir_abs = os.path.abspath("repos")
- git_dir_abs = os.path.abspath(git_dir_rela)
- try:
- # with specific path
- for path in (git_dir_rela, git_dir_abs):
- r = Repo.init(path=path, bare=True)
- assert isinstance(r, Repo)
- assert r.bare == True
- assert os.path.isdir(r.git_dir)
- shutil.rmtree(git_dir_abs)
- # END for each path
-
- os.makedirs(git_dir_rela)
- os.chdir(git_dir_rela)
- r = Repo.init(bare=False)
- r.bare == False
- finally:
- try:
- shutil.rmtree(del_dir_abs)
- except OSError:
- pass
- os.chdir(prev_cwd)
- # END restore previous state
-
- def test_bare_property(self):
- self.rorepo.bare
+ def test_init(self):
+ prev_cwd = os.getcwd()
+ os.chdir(tempfile.gettempdir())
+ git_dir_rela = "repos/foo/bar.git"
+ del_dir_abs = os.path.abspath("repos")
+ git_dir_abs = os.path.abspath(git_dir_rela)
+ try:
+ # with specific path
+ for path in (git_dir_rela, git_dir_abs):
+ r = Repo.init(path=path, bare=True)
+ assert isinstance(r, Repo)
+ assert r.bare == True
+ assert os.path.isdir(r.git_dir)
+ shutil.rmtree(git_dir_abs)
+ # END for each path
+
+ os.makedirs(git_dir_rela)
+ os.chdir(git_dir_rela)
+ r = Repo.init(bare=False)
+ r.bare == False
+ finally:
+ try:
+ shutil.rmtree(del_dir_abs)
+ except OSError:
+ pass
+ os.chdir(prev_cwd)
+ # END restore previous state
+
+ def test_bare_property(self):
+ self.rorepo.bare
- @patch_object(Repo, '__init__')
- @patch_object(Git, '_call_process')
- def test_init_with_options(self, git, repo):
- git.return_value = True
- repo.return_value = None
+ @patch_object(Repo, '__init__')
+ @patch_object(Git, '_call_process')
+ def test_init_with_options(self, git, repo):
+ git.return_value = True
+ repo.return_value = None
- r = Repo.init("repos/foo/bar.git", **{'bare' : True,'template': "/baz/sweet"})
- assert isinstance(r, Repo)
+ r = Repo.init("repos/foo/bar.git", **{'bare' : True,'template': "/baz/sweet"})
+ assert isinstance(r, Repo)
- assert_true(git.called)
- assert_true(repo.called)
+ assert_true(git.called)
+ assert_true(repo.called)
- @patch_object(Repo, '__init__')
- @patch_object(Git, '_call_process')
- def test_clone(self, git, repo):
- git.return_value = None
- repo.return_value = None
+ @patch_object(Repo, '__init__')
+ @patch_object(Git, '_call_process')
+ def test_clone(self, git, repo):
+ git.return_value = None
+ repo.return_value = None
- self.rorepo.clone("repos/foo/bar.git")
+ self.rorepo.clone("repos/foo/bar.git")
- assert_true(git.called)
- path = os.path.join(absolute_project_path(), '.git')
- assert_equal(git.call_args, (('clone', path, 'repos/foo/bar.git'), {}))
- assert_true(repo.called)
+ assert_true(git.called)
+ path = os.path.join(absolute_project_path(), '.git')
+ assert_equal(git.call_args, (('clone', path, 'repos/foo/bar.git'), {}))
+ assert_true(repo.called)
- @patch_object(Repo, '__init__')
- @patch_object(Git, '_call_process')
- def test_clone_with_options(self, git, repo):
- git.return_value = None
- repo.return_value = None
+ @patch_object(Repo, '__init__')
+ @patch_object(Git, '_call_process')
+ def test_clone_with_options(self, git, repo):
+ git.return_value = None
+ repo.return_value = None
- self.rorepo.clone("repos/foo/bar.git", **{'template': '/awesome'})
+ self.rorepo.clone("repos/foo/bar.git", **{'template': '/awesome'})
- assert_true(git.called)
- path = os.path.join(absolute_project_path(), '.git')
- assert_equal(git.call_args, (('clone', path, 'repos/foo/bar.git'),
- { 'template': '/awesome'}))
- assert_true(repo.called)
+ assert_true(git.called)
+ path = os.path.join(absolute_project_path(), '.git')
+ assert_equal(git.call_args, (('clone', path, 'repos/foo/bar.git'),
+ { 'template': '/awesome'}))
+ assert_true(repo.called)
- def test_daemon_export(self):
- orig_val = self.rorepo.daemon_export
- self.rorepo.daemon_export = not orig_val
- assert self.rorepo.daemon_export == ( not orig_val )
- self.rorepo.daemon_export = orig_val
- assert self.rorepo.daemon_export == orig_val
+ def test_daemon_export(self):
+ orig_val = self.rorepo.daemon_export
+ self.rorepo.daemon_export = not orig_val
+ assert self.rorepo.daemon_export == ( not orig_val )
+ self.rorepo.daemon_export = orig_val
+ assert self.rorepo.daemon_export == orig_val
- def test_alternates(self):
- cur_alternates = self.rorepo.alternates
- # empty alternates
- self.rorepo.alternates = []
- assert self.rorepo.alternates == []
- alts = [ "other/location", "this/location" ]
- self.rorepo.alternates = alts
- assert alts == self.rorepo.alternates
- self.rorepo.alternates = cur_alternates
+ def test_alternates(self):
+ cur_alternates = self.rorepo.alternates
+ # empty alternates
+ self.rorepo.alternates = []
+ assert self.rorepo.alternates == []
+ alts = [ "other/location", "this/location" ]
+ self.rorepo.alternates = alts
+ assert alts == self.rorepo.alternates
+ self.rorepo.alternates = cur_alternates
- def test_repr(self):
- path = os.path.join(os.path.abspath(GIT_REPO), '.git')
- assert_equal('<git.Repo "%s">' % path, repr(self.rorepo))
+ def test_repr(self):
+ path = os.path.join(os.path.abspath(GIT_REPO), '.git')
+ assert_equal('<git.Repo "%s">' % path, repr(self.rorepo))
- def test_is_dirty_with_bare_repository(self):
- self.rorepo._bare = True
- assert_false(self.rorepo.is_dirty())
+ def test_is_dirty_with_bare_repository(self):
+ self.rorepo._bare = True
+ assert_false(self.rorepo.is_dirty())
- def test_is_dirty(self):
- self.rorepo._bare = False
- for index in (0,1):
- for working_tree in (0,1):
- for untracked_files in (0,1):
- assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False)
- # END untracked files
- # END working tree
- # END index
- self.rorepo._bare = True
- assert self.rorepo.is_dirty() == False
+ def test_is_dirty(self):
+ self.rorepo._bare = False
+ for index in (0,1):
+ for working_tree in (0,1):
+ for untracked_files in (0,1):
+ assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False)
+ # END untracked files
+ # END working tree
+ # END index
+ self.rorepo._bare = True
+ assert self.rorepo.is_dirty() == False
- def test_head(self):
- assert self.rorepo.head.reference.object == self.rorepo.active_branch.object
+ def test_head(self):
+ assert self.rorepo.head.reference.object == self.rorepo.active_branch.object
- def test_index(self):
- index = self.rorepo.index
- assert isinstance(index, IndexFile)
-
- def test_tag(self):
- assert self.rorepo.tag('refs/tags/0.1.5').commit
-
- def test_archive(self):
- tmpfile = os.tmpfile()
- self.rorepo.archive(tmpfile, '0.1.5')
- assert tmpfile.tell()
-
- @patch_object(Git, '_call_process')
- def test_should_display_blame_information(self, git):
- git.return_value = fixture('blame')
- b = self.rorepo.blame( 'master', 'lib/git.py')
- assert_equal(13, len(b))
- assert_equal( 2, len(b[0]) )
- # assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b))
- assert_equal(hash(b[0][0]), hash(b[9][0]))
- c = b[0][0]
- assert_true(git.called)
- assert_equal(git.call_args, (('blame', 'master', '--', 'lib/git.py'), {'p': True}))
-
- assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.sha)
- assert_equal('Tom Preston-Werner', c.author.name)
- assert_equal('tom@mojombo.com', c.author.email)
- assert_equal(1191997100, c.authored_date)
- assert_equal('Tom Preston-Werner', c.committer.name)
- assert_equal('tom@mojombo.com', c.committer.email)
- assert_equal(1191997100, c.committed_date)
- assert_equal('initial grit setup', c.message)
-
- # test the 'lines per commit' entries
- tlist = b[0][1]
- assert_true( tlist )
- assert_true( isinstance( tlist[0], basestring ) )
- assert_true( len( tlist ) < sum( len(t) for t in tlist ) ) # test for single-char bug
-
- def test_untracked_files(self):
- base = self.rorepo.working_tree_dir
- files = ( join_path_native(base, "__test_myfile"),
- join_path_native(base, "__test_other_file") )
- num_recently_untracked = 0
- try:
- for fpath in files:
- fd = open(fpath,"wb")
- fd.close()
- # END for each filename
- untracked_files = self.rorepo.untracked_files
- num_recently_untracked = len(untracked_files)
-
- # assure we have all names - they are relative to the git-dir
- num_test_untracked = 0
- for utfile in untracked_files:
- num_test_untracked += join_path_native(base, utfile) in files
- assert len(files) == num_test_untracked
- finally:
- for fpath in files:
- if os.path.isfile(fpath):
- os.remove(fpath)
- # END handle files
-
- assert len(self.rorepo.untracked_files) == (num_recently_untracked - len(files))
-
- def test_config_reader(self):
- reader = self.rorepo.config_reader() # all config files
- assert reader.read_only
- reader = self.rorepo.config_reader("repository") # single config file
- assert reader.read_only
-
- def test_config_writer(self):
- for config_level in self.rorepo.config_level:
- try:
- writer = self.rorepo.config_writer(config_level)
- assert not writer.read_only
- except IOError:
- # its okay not to get a writer for some configuration files if we
- # have no permissions
- pass
- # END for each config level
-
- def test_creation_deletion(self):
- # just a very quick test to assure it generally works. There are
- # specialized cases in the test_refs module
- head = self.rorepo.create_head("new_head", "HEAD~1")
- self.rorepo.delete_head(head)
-
- tag = self.rorepo.create_tag("new_tag", "HEAD~2")
- self.rorepo.delete_tag(tag)
-
- remote = self.rorepo.create_remote("new_remote", "git@server:repo.git")
- self.rorepo.delete_remote(remote)
+ def test_index(self):
+ index = self.rorepo.index
+ assert isinstance(index, IndexFile)
+
+ def test_tag(self):
+ assert self.rorepo.tag('refs/tags/0.1.5').commit
+
+ def test_archive(self):
+ tmpfile = os.tmpfile()
+ self.rorepo.archive(tmpfile, '0.1.5')
+ assert tmpfile.tell()
+
+ @patch_object(Git, '_call_process')
+ def test_should_display_blame_information(self, git):
+ git.return_value = fixture('blame')
+ b = self.rorepo.blame( 'master', 'lib/git.py')
+ assert_equal(13, len(b))
+ assert_equal( 2, len(b[0]) )
+ # assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b))
+ assert_equal(hash(b[0][0]), hash(b[9][0]))
+ c = b[0][0]
+ assert_true(git.called)
+ assert_equal(git.call_args, (('blame', 'master', '--', 'lib/git.py'), {'p': True}))
+
+ assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.sha)
+ assert_equal('Tom Preston-Werner', c.author.name)
+ assert_equal('tom@mojombo.com', c.author.email)
+ assert_equal(1191997100, c.authored_date)
+ assert_equal('Tom Preston-Werner', c.committer.name)
+ assert_equal('tom@mojombo.com', c.committer.email)
+ assert_equal(1191997100, c.committed_date)
+ assert_equal('initial grit setup', c.message)
+
+ # test the 'lines per commit' entries
+ tlist = b[0][1]
+ assert_true( tlist )
+ assert_true( isinstance( tlist[0], basestring ) )
+ assert_true( len( tlist ) < sum( len(t) for t in tlist ) ) # test for single-char bug
+
+ def test_untracked_files(self):
+ base = self.rorepo.working_tree_dir
+ files = ( join_path_native(base, "__test_myfile"),
+ join_path_native(base, "__test_other_file") )
+ num_recently_untracked = 0
+ try:
+ for fpath in files:
+ fd = open(fpath,"wb")
+ fd.close()
+ # END for each filename
+ untracked_files = self.rorepo.untracked_files
+ num_recently_untracked = len(untracked_files)
+
+ # assure we have all names - they are relative to the git-dir
+ num_test_untracked = 0
+ for utfile in untracked_files:
+ num_test_untracked += join_path_native(base, utfile) in files
+ assert len(files) == num_test_untracked
+ finally:
+ for fpath in files:
+ if os.path.isfile(fpath):
+ os.remove(fpath)
+ # END handle files
+
+ assert len(self.rorepo.untracked_files) == (num_recently_untracked - len(files))
+
+ def test_config_reader(self):
+ reader = self.rorepo.config_reader() # all config files
+ assert reader.read_only
+ reader = self.rorepo.config_reader("repository") # single config file
+ assert reader.read_only
+
+ def test_config_writer(self):
+ for config_level in self.rorepo.config_level:
+ try:
+ writer = self.rorepo.config_writer(config_level)
+ assert not writer.read_only
+ except IOError:
+ # its okay not to get a writer for some configuration files if we
+ # have no permissions
+ pass
+ # END for each config level
+
+ def test_creation_deletion(self):
+ # just a very quick test to assure it generally works. There are
+ # specialized cases in the test_refs module
+ head = self.rorepo.create_head("new_head", "HEAD~1")
+ self.rorepo.delete_head(head)
+
+ tag = self.rorepo.create_tag("new_tag", "HEAD~2")
+ self.rorepo.delete_tag(tag)
+
+ remote = self.rorepo.create_remote("new_remote", "git@server:repo.git")
+ self.rorepo.delete_remote(remote)
diff --git a/test/git/test_stats.py b/test/git/test_stats.py
index 7392a96e..cc30bf92 100644
--- a/test/git/test_stats.py
+++ b/test/git/test_stats.py
@@ -8,18 +8,18 @@ from test.testlib import *
from git import *
class TestStats(TestBase):
-
- def test__list_from_string(self):
- output = fixture('diff_numstat')
- stats = Stats._list_from_string(self.rorepo, output)
-
- assert_equal(2, stats.total['files'])
- assert_equal(52, stats.total['lines'])
- assert_equal(29, stats.total['insertions'])
- assert_equal(23, stats.total['deletions'])
-
- assert_equal(29, stats.files["a.txt"]['insertions'])
- assert_equal(18, stats.files["a.txt"]['deletions'])
-
- assert_equal(0, stats.files["b.txt"]['insertions'])
- assert_equal(5, stats.files["b.txt"]['deletions'])
+
+ def test__list_from_string(self):
+ output = fixture('diff_numstat')
+ stats = Stats._list_from_string(self.rorepo, output)
+
+ assert_equal(2, stats.total['files'])
+ assert_equal(52, stats.total['lines'])
+ assert_equal(29, stats.total['insertions'])
+ assert_equal(23, stats.total['deletions'])
+
+ assert_equal(29, stats.files["a.txt"]['insertions'])
+ assert_equal(18, stats.files["a.txt"]['deletions'])
+
+ assert_equal(0, stats.files["b.txt"]['insertions'])
+ assert_equal(5, stats.files["b.txt"]['deletions'])
diff --git a/test/git/test_tree.py b/test/git/test_tree.py
index 54a3352b..04b8c41a 100644
--- a/test/git/test_tree.py
+++ b/test/git/test_tree.py
@@ -9,58 +9,58 @@ from test.testlib import *
from git import *
class TestTree(TestCase):
-
- def setUp(self):
- self.repo = Repo(GIT_REPO)
+
+ def setUp(self):
+ self.repo = Repo(GIT_REPO)
-
-
- def test_traverse(self):
- root = self.repo.tree('0.1.6')
- num_recursive = 0
- all_items = list()
- for obj in root.traverse():
- if "/" in obj.path:
- num_recursive += 1
-
- assert isinstance(obj, (Blob, Tree))
- all_items.append(obj)
- # END for each object
- # limit recursion level to 0 - should be same as default iteration
- assert all_items
- assert 'CHANGES' in root
- assert len(list(root)) == len(list(root.traverse(depth=1)))
-
- # only choose trees
- trees_only = lambda i,d: i.type == "tree"
- trees = list(root.traverse(predicate = trees_only))
- assert len(trees) == len(list( i for i in root.traverse() if trees_only(i,0) ))
-
- # test prune
- lib_folder = lambda t,d: t.path == "lib"
- pruned_trees = list(root.traverse(predicate = trees_only,prune = lib_folder))
- assert len(pruned_trees) < len(trees)
-
- # trees and blobs
- assert len(set(trees)|set(root.trees)) == len(trees)
- assert len(set(b for b in root if isinstance(b, Blob)) | set(root.blobs)) == len( root.blobs )
- subitem = trees[0][0]
- assert "/" in subitem.path
- assert subitem.name == os.path.basename(subitem.path)
-
- # assure that at some point the traversed paths have a slash in them
- found_slash = False
- for item in root.traverse():
- assert os.path.isabs(item.abspath)
- if '/' in item.path:
- found_slash = True
- # END check for slash
-
- # slashes in paths are supported as well
- assert root[item.path] == item == root/item.path
- # END for each item
- assert found_slash
+
+
+ def test_traverse(self):
+ root = self.repo.tree('0.1.6')
+ num_recursive = 0
+ all_items = list()
+ for obj in root.traverse():
+ if "/" in obj.path:
+ num_recursive += 1
+
+ assert isinstance(obj, (Blob, Tree))
+ all_items.append(obj)
+ # END for each object
+ # limit recursion level to 0 - should be same as default iteration
+ assert all_items
+ assert 'CHANGES' in root
+ assert len(list(root)) == len(list(root.traverse(depth=1)))
+
+ # only choose trees
+ trees_only = lambda i,d: i.type == "tree"
+ trees = list(root.traverse(predicate = trees_only))
+ assert len(trees) == len(list( i for i in root.traverse() if trees_only(i,0) ))
+
+ # test prune
+ lib_folder = lambda t,d: t.path == "lib"
+ pruned_trees = list(root.traverse(predicate = trees_only,prune = lib_folder))
+ assert len(pruned_trees) < len(trees)
+
+ # trees and blobs
+ assert len(set(trees)|set(root.trees)) == len(trees)
+ assert len(set(b for b in root if isinstance(b, Blob)) | set(root.blobs)) == len( root.blobs )
+ subitem = trees[0][0]
+ assert "/" in subitem.path
+ assert subitem.name == os.path.basename(subitem.path)
+
+ # assure that at some point the traversed paths have a slash in them
+ found_slash = False
+ for item in root.traverse():
+ assert os.path.isabs(item.abspath)
+ if '/' in item.path:
+ found_slash = True
+ # END check for slash
+
+ # slashes in paths are supported as well
+ assert root[item.path] == item == root/item.path
+ # END for each item
+ assert found_slash
- def test_repr(self):
- tree = Tree(self.repo, 'abc')
- assert_equal('<git.Tree "abc">', repr(tree))
+ def test_repr(self):
+ tree = Tree(self.repo, 'abc')
+ assert_equal('<git.Tree "abc">', repr(tree))
diff --git a/test/git/test_utils.py b/test/git/test_utils.py
index ade2bffa..f843c12e 100644
--- a/test/git/test_utils.py
+++ b/test/git/test_utils.py
@@ -15,106 +15,106 @@ import time
class TestUtils(TestCase):
- def setup(self):
- self.testdict = {
- "string": "42",
- "int": 42,
- "array": [ 42 ],
- }
+ def setup(self):
+ self.testdict = {
+ "string": "42",
+ "int": 42,
+ "array": [ 42 ],
+ }
- def test_it_should_dashify(self):
- assert_equal('this-is-my-argument', dashify('this_is_my_argument'))
- assert_equal('foo', dashify('foo'))
-
-
- def test_lock_file(self):
- my_file = tempfile.mktemp()
- lock_file = LockFile(my_file)
- assert not lock_file._has_lock()
- # release lock we don't have - fine
- lock_file._release_lock()
-
- # get lock
- lock_file._obtain_lock_or_raise()
- assert lock_file._has_lock()
-
- # concurrent access
- other_lock_file = LockFile(my_file)
- assert not other_lock_file._has_lock()
- self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise)
-
- lock_file._release_lock()
- assert not lock_file._has_lock()
-
- other_lock_file._obtain_lock_or_raise()
- self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise)
-
- # auto-release on destruction
- del(other_lock_file)
- lock_file._obtain_lock_or_raise()
- lock_file._release_lock()
-
- def test_blocking_lock_file(self):
- my_file = tempfile.mktemp()
- lock_file = BlockingLockFile(my_file)
- lock_file._obtain_lock()
-
- # next one waits for the lock
- start = time.time()
- wait_time = 0.1
- wait_lock = BlockingLockFile(my_file, 0.05, wait_time)
- self.failUnlessRaises(IOError, wait_lock._obtain_lock)
- elapsed = time.time() - start
- assert elapsed <= wait_time + 0.02 # some extra time it may cost
-
- def _cmp_contents(self, file_path, data):
- # raise if data from file at file_path
- # does not match data string
- fp = open(file_path, "rb")
- try:
- assert fp.read() == data
- finally:
- fp.close()
-
- def test_safe_operation(self):
- my_file = tempfile.mktemp()
- orig_data = "hello"
- new_data = "world"
- my_file_fp = open(my_file, "wb")
- my_file_fp.write(orig_data)
- my_file_fp.close()
-
- try:
- cwrite = ConcurrentWriteOperation(my_file)
-
- # didn't start writing, doesnt matter
- cwrite._end_writing(False)
- cwrite._end_writing(True)
- assert not cwrite._is_writing()
-
- # write data and fail
- stream = cwrite._begin_writing()
- assert cwrite._is_writing()
- stream.write(new_data)
- cwrite._end_writing(successful=False)
- self._cmp_contents(my_file, orig_data)
- assert not os.path.exists(stream.name)
-
- # write data - concurrently
- ocwrite = ConcurrentWriteOperation(my_file)
- stream = cwrite._begin_writing()
- self.failUnlessRaises(IOError, ocwrite._begin_writing)
-
- stream.write("world")
- cwrite._end_writing(successful=True)
- self._cmp_contents(my_file, new_data)
- assert not os.path.exists(stream.name)
-
- # could test automatic _end_writing on destruction
- finally:
- os.remove(my_file)
- # END final cleanup
-
-
-
-
+ def test_it_should_dashify(self):
+ assert_equal('this-is-my-argument', dashify('this_is_my_argument'))
+ assert_equal('foo', dashify('foo'))
+
+
+ def test_lock_file(self):
+ my_file = tempfile.mktemp()
+ lock_file = LockFile(my_file)
+ assert not lock_file._has_lock()
+ # release lock we don't have - fine
+ lock_file._release_lock()
+
+ # get lock
+ lock_file._obtain_lock_or_raise()
+ assert lock_file._has_lock()
+
+ # concurrent access
+ other_lock_file = LockFile(my_file)
+ assert not other_lock_file._has_lock()
+ self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise)
+
+ lock_file._release_lock()
+ assert not lock_file._has_lock()
+
+ other_lock_file._obtain_lock_or_raise()
+ self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise)
+
+ # auto-release on destruction
+ del(other_lock_file)
+ lock_file._obtain_lock_or_raise()
+ lock_file._release_lock()
+
+ def test_blocking_lock_file(self):
+ my_file = tempfile.mktemp()
+ lock_file = BlockingLockFile(my_file)
+ lock_file._obtain_lock()
+
+ # next one waits for the lock
+ start = time.time()
+ wait_time = 0.1
+ wait_lock = BlockingLockFile(my_file, 0.05, wait_time)
+ self.failUnlessRaises(IOError, wait_lock._obtain_lock)
+ elapsed = time.time() - start
+ assert elapsed <= wait_time + 0.02 # some extra time it may cost
+
+ def _cmp_contents(self, file_path, data):
+ # raise if data from file at file_path
+ # does not match data string
+ fp = open(file_path, "rb")
+ try:
+ assert fp.read() == data
+ finally:
+ fp.close()
+
+ def test_safe_operation(self):
+ my_file = tempfile.mktemp()
+ orig_data = "hello"
+ new_data = "world"
+ my_file_fp = open(my_file, "wb")
+ my_file_fp.write(orig_data)
+ my_file_fp.close()
+
+ try:
+ cwrite = ConcurrentWriteOperation(my_file)
+
+ # didn't start writing, doesnt matter
+ cwrite._end_writing(False)
+ cwrite._end_writing(True)
+ assert not cwrite._is_writing()
+
+ # write data and fail
+ stream = cwrite._begin_writing()
+ assert cwrite._is_writing()
+ stream.write(new_data)
+ cwrite._end_writing(successful=False)
+ self._cmp_contents(my_file, orig_data)
+ assert not os.path.exists(stream.name)
+
+ # write data - concurrently
+ ocwrite = ConcurrentWriteOperation(my_file)
+ stream = cwrite._begin_writing()
+ self.failUnlessRaises(IOError, ocwrite._begin_writing)
+
+ stream.write("world")
+ cwrite._end_writing(successful=True)
+ self._cmp_contents(my_file, new_data)
+ assert not os.path.exists(stream.name)
+
+ # could test automatic _end_writing on destruction
+ finally:
+ os.remove(my_file)
+ # END final cleanup
+
+
+
+
diff --git a/test/testlib/__init__.py b/test/testlib/__init__.py
index 2133eb8c..77512794 100644
--- a/test/testlib/__init__.py
+++ b/test/testlib/__init__.py
@@ -10,4 +10,4 @@ from asserts import *
from helper import *
__all__ = [ name for name, obj in locals().items()
- if not (name.startswith('_') or inspect.ismodule(obj)) ]
+ if not (name.startswith('_') or inspect.ismodule(obj)) ]
diff --git a/test/testlib/asserts.py b/test/testlib/asserts.py
index 46fcf20e..fa754b92 100644
--- a/test/testlib/asserts.py
+++ b/test/testlib/asserts.py
@@ -11,40 +11,40 @@ from nose.tools import *
import stat
__all__ = ['assert_instance_of', 'assert_not_instance_of',
- 'assert_none', 'assert_not_none',
- 'assert_match', 'assert_not_match', 'assert_mode_644',
- 'assert_mode_755'] + tools.__all__
+ 'assert_none', 'assert_not_none',
+ 'assert_match', 'assert_not_match', 'assert_mode_644',
+ 'assert_mode_755'] + tools.__all__
def assert_instance_of(expected, actual, msg=None):
- """Verify that object is an instance of expected """
- assert isinstance(actual, expected), msg
+ """Verify that object is an instance of expected """
+ assert isinstance(actual, expected), msg
def assert_not_instance_of(expected, actual, msg=None):
- """Verify that object is not an instance of expected """
- assert not isinstance(actual, expected, msg)
-
+ """Verify that object is not an instance of expected """
+ assert not isinstance(actual, expected, msg)
+
def assert_none(actual, msg=None):
- """verify that item is None"""
- assert actual is None, msg
+ """verify that item is None"""
+ assert actual is None, msg
def assert_not_none(actual, msg=None):
- """verify that item is None"""
- assert actual is not None, msg
+ """verify that item is None"""
+ assert actual is not None, msg
def assert_match(pattern, string, msg=None):
- """verify that the pattern matches the string"""
- assert_not_none(re.search(pattern, string), msg)
+ """verify that the pattern matches the string"""
+ assert_not_none(re.search(pattern, string), msg)
def assert_not_match(pattern, string, msg=None):
- """verify that the pattern does not match the string"""
- assert_none(re.search(pattern, string), msg)
-
+ """verify that the pattern does not match the string"""
+ assert_none(re.search(pattern, string), msg)
+
def assert_mode_644(mode):
- """Verify given mode is 644"""
- assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP)
- assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and not (mode & stat.S_IXUSR)
+ """Verify given mode is 644"""
+ assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP)
+ assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and not (mode & stat.S_IXUSR)
def assert_mode_755(mode):
- """Verify given mode is 755"""
- assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP) and (mode & stat.S_IXOTH) and (mode & stat.S_IXGRP)
- assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and (mode & stat.S_IXUSR) \ No newline at end of file
+ """Verify given mode is 755"""
+ assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP) and (mode & stat.S_IXOTH) and (mode & stat.S_IXGRP)
+ assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and (mode & stat.S_IXUSR) \ No newline at end of file
diff --git a/test/testlib/helper.py b/test/testlib/helper.py
index ba748a15..9c38ffd5 100644
--- a/test/testlib/helper.py
+++ b/test/testlib/helper.py
@@ -13,249 +13,249 @@ import shutil
GIT_REPO = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
def fixture_path(name):
- test_dir = os.path.dirname(os.path.dirname(__file__))
- return os.path.join(test_dir, "fixtures", name)
+ test_dir = os.path.dirname(os.path.dirname(__file__))
+ return os.path.join(test_dir, "fixtures", name)
def fixture(name):
- return open(fixture_path(name), 'rb').read()
+ return open(fixture_path(name), 'rb').read()
def absolute_project_path():
- return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
-
-
+ return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+
+
class ListProcessAdapter(object):
- """Allows to use lists as Process object as returned by SubProcess.Popen.
- Its tailored to work with the test system only"""
-
- class Stream(object):
- """Simple stream emulater meant to work only with tests"""
- def __init__(self, data):
- self.data = data
- self.cur_iter = None
-
- def __iter__(self):
- dat = self.data
- if isinstance(dat, basestring):
- dat = dat.splitlines()
- if self.cur_iter is None:
- self.cur_iter = iter(dat)
- return self.cur_iter
-
- def read(self):
- dat = self.data
- if isinstance(dat, (tuple,list)):
- dat = "\n".join(dat)
- return dat
-
- def next(self):
- if self.cur_iter is None:
- self.cur_iter = iter(self)
- return self.cur_iter.next()
-
- # END stream
-
- def __init__(self, input_list_or_string):
- self.stdout = self.Stream(input_list_or_string)
- self.stderr = self.Stream('')
-
- def wait(self):
- return 0
-
- poll = wait
-
+ """Allows to use lists as Process object as returned by SubProcess.Popen.
+ Its tailored to work with the test system only"""
+
+ class Stream(object):
+ """Simple stream emulater meant to work only with tests"""
+ def __init__(self, data):
+ self.data = data
+ self.cur_iter = None
+
+ def __iter__(self):
+ dat = self.data
+ if isinstance(dat, basestring):
+ dat = dat.splitlines()
+ if self.cur_iter is None:
+ self.cur_iter = iter(dat)
+ return self.cur_iter
+
+ def read(self):
+ dat = self.data
+ if isinstance(dat, (tuple,list)):
+ dat = "\n".join(dat)
+ return dat
+
+ def next(self):
+ if self.cur_iter is None:
+ self.cur_iter = iter(self)
+ return self.cur_iter.next()
+
+ # END stream
+
+ def __init__(self, input_list_or_string):
+ self.stdout = self.Stream(input_list_or_string)
+ self.stderr = self.Stream('')
+
+ def wait(self):
+ return 0
+
+ poll = wait
+
def _rmtree_onerror(osremove, fullpath, exec_info):
- """
- Handle the case on windows that read-only files cannot be deleted by
- os.remove by setting it to mode 777, then retry deletion.
- """
- if os.name != 'nt' or osremove is not os.remove:
- raise
-
- os.chmod(fullpath, 0777)
- os.remove(fullpath)
+ """
+ Handle the case on windows that read-only files cannot be deleted by
+ os.remove by setting it to mode 777, then retry deletion.
+ """
+ if os.name != 'nt' or osremove is not os.remove:
+ raise
+
+ os.chmod(fullpath, 0777)
+ os.remove(fullpath)
def with_bare_rw_repo(func):
- """
- Decorator providing a specially made read-write repository to the test case
- decorated with it. The test case requires the following signature::
- def case(self, rw_repo)
-
- The rwrepo will be a bare clone or the types rorepo. Once the method finishes,
- it will be removed completely.
-
- Use this if you want to make purely index based adjustments, change refs, create
- heads, generally operations that do not need a working tree.
- """
- def bare_repo_creator(self):
- repo_dir = tempfile.mktemp("bare_repo")
- rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=True)
- prev_cwd = os.getcwd()
- try:
- return func(self, rw_repo)
- finally:
- rw_repo.git.clear_cache()
- shutil.rmtree(repo_dir, onerror=_rmtree_onerror)
- # END cleanup
- # END bare repo creator
- bare_repo_creator.__name__ = func.__name__
- return bare_repo_creator
-
+ """
+ Decorator providing a specially made read-write repository to the test case
+ decorated with it. The test case requires the following signature::
+ def case(self, rw_repo)
+
+ The rwrepo will be a bare clone or the types rorepo. Once the method finishes,
+ it will be removed completely.
+
+ Use this if you want to make purely index based adjustments, change refs, create
+ heads, generally operations that do not need a working tree.
+ """
+ def bare_repo_creator(self):
+ repo_dir = tempfile.mktemp("bare_repo")
+ rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=True)
+ prev_cwd = os.getcwd()
+ try:
+ return func(self, rw_repo)
+ finally:
+ rw_repo.git.clear_cache()
+ shutil.rmtree(repo_dir, onerror=_rmtree_onerror)
+ # END cleanup
+ # END bare repo creator
+ bare_repo_creator.__name__ = func.__name__
+ return bare_repo_creator
+
def with_rw_repo(working_tree_ref):
- """
- Same as with_bare_repo, but clones the rorepo as non-bare repository, checking
- out the working tree at the given working_tree_ref.
-
- This repository type is more costly due to the working copy checkout.
-
- To make working with relative paths easier, the cwd will be set to the working
- dir of the repository.
- """
- assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout"
- def argument_passer(func):
- def repo_creator(self):
- repo_dir = tempfile.mktemp("non_bare_repo")
- rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=False, n=True)
-
- rw_repo.head.commit = working_tree_ref
- rw_repo.head.reference.checkout()
-
- prev_cwd = os.getcwd()
- os.chdir(rw_repo.working_dir)
- try:
- return func(self, rw_repo)
- finally:
- os.chdir(prev_cwd)
- rw_repo.git.clear_cache()
- shutil.rmtree(repo_dir, onerror=_rmtree_onerror)
- # END cleanup
- # END rw repo creator
- repo_creator.__name__ = func.__name__
- return repo_creator
- # END argument passer
- return argument_passer
-
+ """
+ Same as with_bare_repo, but clones the rorepo as non-bare repository, checking
+ out the working tree at the given working_tree_ref.
+
+ This repository type is more costly due to the working copy checkout.
+
+ To make working with relative paths easier, the cwd will be set to the working
+ dir of the repository.
+ """
+ assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout"
+ def argument_passer(func):
+ def repo_creator(self):
+ repo_dir = tempfile.mktemp("non_bare_repo")
+ rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=False, n=True)
+
+ rw_repo.head.commit = working_tree_ref
+ rw_repo.head.reference.checkout()
+
+ prev_cwd = os.getcwd()
+ os.chdir(rw_repo.working_dir)
+ try:
+ return func(self, rw_repo)
+ finally:
+ os.chdir(prev_cwd)
+ rw_repo.git.clear_cache()
+ shutil.rmtree(repo_dir, onerror=_rmtree_onerror)
+ # END cleanup
+ # END rw repo creator
+ repo_creator.__name__ = func.__name__
+ return repo_creator
+ # END argument passer
+ return argument_passer
+
def with_rw_and_rw_remote_repo(working_tree_ref):
- """
- Same as with_rw_repo, but also provides a writable remote repository from which the
- rw_repo has been forked as well as a handle for a git-daemon that may be started to
- run the remote_repo.
- The remote repository was cloned as bare repository from the rorepo, wheras
- the rw repo has a working tree and was cloned from the remote repository.
-
- remote_repo has two remotes: origin and daemon_origin. One uses a local url,
- the other uses a server url. The daemon setup must be done on system level
- and should be an inetd service that serves tempdir.gettempdir() and all
- directories in it.
-
- The following scetch demonstrates this::
- rorepo ---<bare clone>---> rw_remote_repo ---<clone>---> rw_repo
-
- The test case needs to support the following signature::
- def case(self, rw_repo, rw_remote_repo)
-
- This setup allows you to test push and pull scenarios and hooks nicely.
-
- See working dir info in with_rw_repo
- """
- assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout"
- def argument_passer(func):
- def remote_repo_creator(self):
- remote_repo_dir = tempfile.mktemp("remote_repo")
- repo_dir = tempfile.mktemp("remote_clone_non_bare_repo")
-
- rw_remote_repo = self.rorepo.clone(remote_repo_dir, shared=True, bare=True)
- rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True) # recursive alternates info ?
- rw_repo.head.commit = working_tree_ref
- rw_repo.head.reference.checkout()
-
- # prepare for git-daemon
- rw_remote_repo.daemon_export = True
-
- # this thing is just annoying !
- crw = rw_remote_repo.config_writer()
- section = "daemon"
- try:
- crw.add_section(section)
- except Exception:
- pass
- crw.set(section, "receivepack", True)
- # release lock
- del(crw)
-
- # initialize the remote - first do it as local remote and pull, then
- # we change the url to point to the daemon. The daemon should be started
- # by the user, not by us
- d_remote = Remote.create(rw_repo, "daemon_origin", remote_repo_dir)
- d_remote.fetch()
- remote_repo_url = "git://localhost%s" % remote_repo_dir
- d_remote.config_writer.set('url', remote_repo_url)
-
- # try to list remotes to diagnoes whether the server is up
- try:
- rw_repo.git.ls_remote(d_remote)
- except GitCommandError,e:
- print str(e)
- if os.name == 'nt':
- raise AssertionError('git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"'%tempfile.gettempdir())
- else:
- raise AssertionError('Please start a git-daemon to run this test, execute: git-daemon "%s"'%tempfile.gettempdir())
-
- # adjust working dir
- prev_cwd = os.getcwd()
- os.chdir(rw_repo.working_dir)
- try:
- return func(self, rw_repo, rw_remote_repo)
- finally:
- os.chdir(prev_cwd)
- rw_repo.git.clear_cache()
- rw_remote_repo.git.clear_cache()
- shutil.rmtree(repo_dir, onerror=_rmtree_onerror)
- shutil.rmtree(remote_repo_dir, onerror=_rmtree_onerror)
- # END cleanup
- # END bare repo creator
- remote_repo_creator.__name__ = func.__name__
- return remote_repo_creator
- # END remote repo creator
- # END argument parsser
-
- return argument_passer
-
-
+ """
+ Same as with_rw_repo, but also provides a writable remote repository from which the
+ rw_repo has been forked as well as a handle for a git-daemon that may be started to
+ run the remote_repo.
+ The remote repository was cloned as bare repository from the rorepo, wheras
+ the rw repo has a working tree and was cloned from the remote repository.
+
+ remote_repo has two remotes: origin and daemon_origin. One uses a local url,
+ the other uses a server url. The daemon setup must be done on system level
+ and should be an inetd service that serves tempdir.gettempdir() and all
+ directories in it.
+
+ The following scetch demonstrates this::
+ rorepo ---<bare clone>---> rw_remote_repo ---<clone>---> rw_repo
+
+ The test case needs to support the following signature::
+ def case(self, rw_repo, rw_remote_repo)
+
+ This setup allows you to test push and pull scenarios and hooks nicely.
+
+ See working dir info in with_rw_repo
+ """
+ assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout"
+ def argument_passer(func):
+ def remote_repo_creator(self):
+ remote_repo_dir = tempfile.mktemp("remote_repo")
+ repo_dir = tempfile.mktemp("remote_clone_non_bare_repo")
+
+ rw_remote_repo = self.rorepo.clone(remote_repo_dir, shared=True, bare=True)
+ rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True) # recursive alternates info ?
+ rw_repo.head.commit = working_tree_ref
+ rw_repo.head.reference.checkout()
+
+ # prepare for git-daemon
+ rw_remote_repo.daemon_export = True
+
+ # this thing is just annoying !
+ crw = rw_remote_repo.config_writer()
+ section = "daemon"
+ try:
+ crw.add_section(section)
+ except Exception:
+ pass
+ crw.set(section, "receivepack", True)
+ # release lock
+ del(crw)
+
+ # initialize the remote - first do it as local remote and pull, then
+ # we change the url to point to the daemon. The daemon should be started
+ # by the user, not by us
+ d_remote = Remote.create(rw_repo, "daemon_origin", remote_repo_dir)
+ d_remote.fetch()
+ remote_repo_url = "git://localhost%s" % remote_repo_dir
+ d_remote.config_writer.set('url', remote_repo_url)
+
+ # try to list remotes to diagnoes whether the server is up
+ try:
+ rw_repo.git.ls_remote(d_remote)
+ except GitCommandError,e:
+ print str(e)
+ if os.name == 'nt':
+ raise AssertionError('git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"'%tempfile.gettempdir())
+ else:
+ raise AssertionError('Please start a git-daemon to run this test, execute: git-daemon "%s"'%tempfile.gettempdir())
+
+ # adjust working dir
+ prev_cwd = os.getcwd()
+ os.chdir(rw_repo.working_dir)
+ try:
+ return func(self, rw_repo, rw_remote_repo)
+ finally:
+ os.chdir(prev_cwd)
+ rw_repo.git.clear_cache()
+ rw_remote_repo.git.clear_cache()
+ shutil.rmtree(repo_dir, onerror=_rmtree_onerror)
+ shutil.rmtree(remote_repo_dir, onerror=_rmtree_onerror)
+ # END cleanup
+ # END bare repo creator
+ remote_repo_creator.__name__ = func.__name__
+ return remote_repo_creator
+ # END remote repo creator
+ # END argument parsser
+
+ return argument_passer
+
+
class TestBase(TestCase):
- """
- Base Class providing default functionality to all tests such as:
-
- - Utility functions provided by the TestCase base of the unittest method such as::
- self.fail("todo")
- self.failUnlessRaises(...)
-
- - Class level repository which is considered read-only as it is shared among
- all test cases in your type.
- Access it using::
- self.rorepo # 'ro' stands for read-only
-
- The rorepo is in fact your current project's git repo. If you refer to specific
- shas for your objects, be sure you choose some that are part of the immutable portion
- of the project history ( to assure tests don't fail for others ).
- """
-
- @classmethod
- def setUpAll(cls):
- """
- Dynamically add a read-only repository to our actual type. This way
- each test type has its own repository
- """
- cls.rorepo = Repo(GIT_REPO)
-
- def _make_file(self, rela_path, data, repo=None):
- """
- Create a file at the given path relative to our repository, filled
- with the given data. Returns absolute path to created file.
- """
- repo = repo or self.rorepo
- abs_path = os.path.join(repo.working_tree_dir, rela_path)
- fp = open(abs_path, "w")
- fp.write(data)
- fp.close()
- return abs_path
+ """
+ Base Class providing default functionality to all tests such as:
+
+ - Utility functions provided by the TestCase base of the unittest method such as::
+ self.fail("todo")
+ self.failUnlessRaises(...)
+
+ - Class level repository which is considered read-only as it is shared among
+ all test cases in your type.
+ Access it using::
+ self.rorepo # 'ro' stands for read-only
+
+ The rorepo is in fact your current project's git repo. If you refer to specific
+ shas for your objects, be sure you choose some that are part of the immutable portion
+ of the project history ( to assure tests don't fail for others ).
+ """
+
+ @classmethod
+ def setUpAll(cls):
+ """
+ Dynamically add a read-only repository to our actual type. This way
+ each test type has its own repository
+ """
+ cls.rorepo = Repo(GIT_REPO)
+
+ def _make_file(self, rela_path, data, repo=None):
+ """
+ Create a file at the given path relative to our repository, filled
+ with the given data. Returns absolute path to created file.
+ """
+ repo = repo or self.rorepo
+ abs_path = os.path.join(repo.working_tree_dir, rela_path)
+ fp = open(abs_path, "w")
+ fp.write(data)
+ fp.close()
+ return abs_path