diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2009-11-04 19:53:42 +0100 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2009-11-04 19:53:42 +0100 |
commit | d884adc80c80300b4cc05321494713904ef1df2d (patch) | |
tree | 3878d5e0282531596d42505d8725482dde002c20 /test/git | |
parent | 05d2687afcc78cd192714ee3d71fdf36a37d110f (diff) | |
parent | ace1fed6321bb8dd6d38b2f58d7cf815fa16db7a (diff) | |
download | gitpython-d884adc80c80300b4cc05321494713904ef1df2d.tar.gz |
Merge branch 'improvements'
Diffstat (limited to 'test/git')
-rw-r--r-- | test/git/test_actor.py | 8 | ||||
-rw-r--r-- | test/git/test_base.py | 76 | ||||
-rw-r--r-- | test/git/test_blob.py | 12 | ||||
-rw-r--r-- | test/git/test_commit.py | 196 | ||||
-rw-r--r-- | test/git/test_config.py | 87 | ||||
-rw-r--r-- | test/git/test_diff.py | 68 | ||||
-rw-r--r-- | test/git/test_git.py | 8 | ||||
-rw-r--r-- | test/git/test_head.py | 26 | ||||
-rw-r--r-- | test/git/test_index.py | 345 | ||||
-rw-r--r-- | test/git/test_performance.py | 6 | ||||
-rw-r--r-- | test/git/test_refs.py | 239 | ||||
-rw-r--r-- | test/git/test_remote.py | 423 | ||||
-rw-r--r-- | test/git/test_repo.py | 230 | ||||
-rw-r--r-- | test/git/test_stats.py | 6 | ||||
-rw-r--r-- | test/git/test_tag.py | 34 | ||||
-rw-r--r-- | test/git/test_tree.py | 23 | ||||
-rw-r--r-- | test/git/test_utils.py | 87 |
17 files changed, 1491 insertions, 383 deletions
diff --git a/test/git/test_actor.py b/test/git/test_actor.py index b7c2af7c..2941468d 100644 --- a/test/git/test_actor.py +++ b/test/git/test_actor.py @@ -13,6 +13,14 @@ class TestActor(object): a = Actor._from_string("Michael Trier <mtrier@example.com>") assert_equal("Michael Trier", a.name) assert_equal("mtrier@example.com", a.email) + + # base type capabilities + assert a == a + assert not ( a != a ) + m = set() + m.add(a) + m.add(a) + assert len(m) == 1 def test_from_string_should_handle_just_name(self): a = Actor._from_string("Michael Trier") diff --git a/test/git/test_base.py b/test/git/test_base.py index 04222e2e..497f90fb 100644 --- a/test/git/test_base.py +++ b/test/git/test_base.py @@ -4,23 +4,23 @@ # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php -from test.testlib import * -from git import * import git.objects.base as base import git.refs as refs +import os + +from test.testlib import * +from git import * from itertools import chain from git.objects.utils import get_object_type_by_name +import tempfile -class TestBase(object): +class TestBase(TestBase): type_tuples = ( ("blob", "8741fc1d09d61f02ffd8cded15ff603eff1ec070"), ("tree", "3a6a5e3eeed3723c09f1ef0399f81ed6b8d82e79"), ("commit", "4251bd59fb8e11e40c40548cba38180a9536118c"), ("tag", "e56a60e8e9cd333cfba0140a77cd12b0d9398f10") ) - def setup(self): - self.repo = Repo(GIT_REPO) - def test_base_object(self): # test interface of base object classes types = (Blob, Tree, Commit, TagObject) @@ -30,15 +30,15 @@ class TestBase(object): num_objs = 0 num_index_objs = 0 for obj_type, (typename, hexsha) in zip(types, self.type_tuples): - item = obj_type(self.repo,hexsha) + item = obj_type(self.rorepo,hexsha) num_objs += 1 - assert item.id == hexsha + assert item.sha == hexsha assert item.type == typename assert item.size assert item.data assert item == item assert not item != item - assert str(item) == item.id + assert str(item) == item.sha assert repr(item) s.add(item) @@ -48,6 +48,17 @@ class TestBase(object): assert not item.path.startswith("/") # must be relative assert isinstance(item.mode, int) # END index object check + + # read from stream + data_stream = item.data_stream + data = data_stream.read() + assert data + + tmpfile = os.tmpfile() + assert item == item.stream_data(tmpfile) + tmpfile.seek(0) + assert tmpfile.read() == data + # END stream to file directly # END for each object type to create # each has a unique sha @@ -55,34 +66,6 @@ class TestBase(object): assert len(s|s) == num_objs assert num_index_objs == 2 - - def test_tags(self): - # tag refs can point to tag objects or to commits - s = set() - ref_count = 0 - for ref in chain(self.repo.tags, self.repo.heads): - ref_count += 1 - assert isinstance(ref, refs.Reference) - assert str(ref) == ref.name - assert repr(ref) - assert ref == ref - assert not ref != ref - s.add(ref) - # END for each ref - assert len(s) == ref_count - assert len(s|s) == ref_count - - def test_heads(self): - # see how it dynmically updates its object - for head in self.repo.heads: - head.name - head.path - prev_object = head.object - cur_object = head.object - assert prev_object == cur_object # represent the same git object - assert prev_object is not cur_object # but are different instances - # END for each head - def test_get_object_type_by_name(self): for tname in base.Object.TYPES: assert base.Object in get_object_type_by_name(tname).mro() @@ -90,3 +73,22 @@ class TestBase(object): assert_raises( ValueError, get_object_type_by_name, "doesntexist" ) + def test_object_resolution(self): + # objects must be resolved to shas so they compare equal + assert self.rorepo.head.reference.object == self.rorepo.active_branch.object + + @with_bare_rw_repo + def test_with_bare_rw_repo(self, bare_rw_repo): + assert bare_rw_repo.config_reader("repository").getboolean("core", "bare") + assert os.path.isfile(os.path.join(bare_rw_repo.path,'HEAD')) + + @with_rw_repo('0.1.6') + def test_with_rw_repo(self, rw_repo): + assert not rw_repo.config_reader("repository").getboolean("core", "bare") + assert os.path.isdir(os.path.join(rw_repo.git.git_dir,'lib')) + + @with_rw_and_rw_remote_repo('0.1.6') + def test_with_rw_remote_and_rw_repo(self, rw_repo, rw_remote_repo): + assert not rw_repo.config_reader("repository").getboolean("core", "bare") + assert rw_remote_repo.config_reader("repository").getboolean("core", "bare") + assert os.path.isdir(os.path.join(rw_repo.git.git_dir,'lib')) diff --git a/test/git/test_blob.py b/test/git/test_blob.py index e151b3c8..464cfd6b 100644 --- a/test/git/test_blob.py +++ b/test/git/test_blob.py @@ -7,26 +7,24 @@ from test.testlib import * from git import * -class TestBlob(object): - def setup(self): - self.repo = Repo(GIT_REPO) +class TestBlob(TestBase): def test_should_cache_data(self): bid = 'a802c139d4767c89dcad79d836d05f7004d39aac' - blob = Blob(self.repo, bid) + blob = Blob(self.rorepo, bid) blob.data assert blob.data blob.size blob.size def test_mime_type_should_return_mime_type_for_known_types(self): - blob = Blob(self.repo, **{'id': 'abc', 'path': 'foo.png'}) + blob = Blob(self.rorepo, **{'sha': 'abc', 'path': 'foo.png'}) assert_equal("image/png", blob.mime_type) def test_mime_type_should_return_text_plain_for_unknown_types(self): - blob = Blob(self.repo, **{'id': 'abc','path': 'something'}) + blob = Blob(self.rorepo, **{'sha': 'abc','path': 'something'}) assert_equal("text/plain", blob.mime_type) def test_should_return_appropriate_representation(self): - blob = Blob(self.repo, **{'id': 'abc'}) + blob = Blob(self.rorepo, **{'sha': 'abc'}) assert_equal('<git.Blob "abc">', repr(blob)) diff --git a/test/git/test_commit.py b/test/git/test_commit.py index 4e698ed0..be6d1a28 100644 --- a/test/git/test_commit.py +++ b/test/git/test_commit.py @@ -7,171 +7,22 @@ from test.testlib import * from git import * -class TestCommit(object): - def setup(self): - self.repo = Repo(GIT_REPO) +class TestCommit(TestBase): def test_bake(self): - commit = Commit(self.repo, **{'id': '2454ae89983a4496a445ce347d7a41c0bb0ea7ae'}) + commit = Commit(self.rorepo, **{'sha': '2454ae89983a4496a445ce347d7a41c0bb0ea7ae'}) commit.author # bake assert_equal("Sebastian Thiel", commit.author.name) assert_equal("byronimo@gmail.com", commit.author.email) + assert commit.author == commit.committer + assert isinstance(commit.authored_date, int) and isinstance(commit.committed_date, int) + assert commit.message == "Added missing information to docstrings of commit and stats module" - @patch_object(Git, '_call_process') - def test_diff(self, git): - git.return_value = fixture('diff_p') - - diffs = Commit.diff(self.repo, 'master') - - assert_equal(15, len(diffs)) - - diff = diffs[0] - assert_equal('.gitignore', diff.a_blob.path) - assert_equal('.gitignore', diff.b_blob.path) - assert_equal('4ebc8aea50e0a67e000ba29a30809d0a7b9b2666', diff.a_blob.id) - assert_equal('2dd02534615434d88c51307beb0f0092f21fd103', diff.b_blob.id) - - assert_mode_644(diff.b_blob.mode) - - assert_equal(False, diff.new_file) - assert_equal(False, diff.deleted_file) - assert_equal("--- a/.gitignore\n+++ b/.gitignore\n@@ -1 +1,2 @@\n coverage\n+pkg", diff.diff) - - diff = diffs[5] - assert_equal('lib/grit/actor.rb', diff.b_blob.path) - assert_equal(None, diff.a_blob) - assert_equal('f733bce6b57c0e5e353206e692b0e3105c2527f4', diff.b_blob.id) - assert_equal( None, diff.a_mode ) - assert_equal(True, diff.new_file) - - assert_true(git.called) - assert_equal(git.call_args, (('diff', '-M', 'master'), {'full_index': True})) - - @patch_object(Git, '_call_process') - def test_diff_with_rename(self, git): - git.return_value = fixture('diff_rename') - - diffs = Commit.diff(self.repo, 'rename') - - assert_equal(1, len(diffs)) - - diff = diffs[0] - assert_true(diff.renamed) - assert_equal(diff.rename_from, 'AUTHORS') - assert_equal(diff.rename_to, 'CONTRIBUTORS') - - assert_true(git.called) - assert_equal(git.call_args, (('diff', '-M', 'rename'), {'full_index': True})) - - @patch_object(Git, '_call_process') - def test_diff_with_two_commits(self, git): - git.return_value = fixture('diff_2') - - diffs = Commit.diff(self.repo, '59ddc32', '13d27d5') - - assert_equal(3, len(diffs)) - - assert_true(git.called) - assert_equal(git.call_args, (('diff', '-M', '59ddc32', '13d27d5'), {'full_index': True})) - - @patch_object(Git, '_call_process') - def test_diff_with_files(self, git): - git.return_value = fixture('diff_f') - - diffs = Commit.diff(self.repo, '59ddc32', ['lib']) - - assert_equal(1, len(diffs)) - assert_equal('lib/grit/diff.rb', diffs[0].a_blob.path) - - assert_true(git.called) - assert_equal(git.call_args, (('diff', '-M', '59ddc32', '--', 'lib'), {'full_index': True})) - - @patch_object(Git, '_call_process') - def test_diff_with_two_commits_and_files(self, git): - git.return_value = fixture('diff_2f') - - diffs = Commit.diff(self.repo, '59ddc32', '13d27d5', ['lib']) - - assert_equal(1, len(diffs)) - assert_equal('lib/grit/commit.rb', diffs[0].a_blob.path) - - assert_true(git.called) - assert_equal(git.call_args, (('diff', '-M', '59ddc32', '13d27d5', '--', 'lib'), {'full_index': True})) - - @patch_object(Git, '_call_process') - def test_diffs(self, git): - git.return_value = fixture('diff_p') - - commit = Commit(self.repo, id='91169e1f5fa4de2eaea3f176461f5dc784796769', parents=['038af8c329ef7c1bae4568b98bd5c58510465493']) - diffs = commit.diffs - - assert_equal(15, len(diffs)) - - diff = diffs[0] - assert_equal('.gitignore', diff.a_blob.path) - assert_equal('.gitignore', diff.b_blob.path) - assert_equal('4ebc8aea50e0a67e000ba29a30809d0a7b9b2666', diff.a_blob.id) - assert_equal('2dd02534615434d88c51307beb0f0092f21fd103', diff.b_blob.id) - assert_mode_644(diff.b_blob.mode) - assert_equal(False, diff.new_file) - assert_equal(False, diff.deleted_file) - assert_equal("--- a/.gitignore\n+++ b/.gitignore\n@@ -1 +1,2 @@\n coverage\n+pkg", diff.diff) - - diff = diffs[5] - assert_equal('lib/grit/actor.rb', diff.b_blob.path) - assert_equal(None, diff.a_blob) - assert_equal('f733bce6b57c0e5e353206e692b0e3105c2527f4', diff.b_blob.id) - assert_equal(True, diff.new_file) - - assert_true(git.called) - assert_equal(git.call_args, (('diff', '-M', - '038af8c329ef7c1bae4568b98bd5c58510465493', - '91169e1f5fa4de2eaea3f176461f5dc784796769', - ), {'full_index': True})) - - def test_diffs_on_initial_import(self): - commit = Commit(self.repo, '33ebe7acec14b25c5f84f35a664803fcab2f7781') - - for diff in commit.diffs: - assert isinstance(diff, Diff) - assert isinstance(diff.a_blob, Blob) or isinstance(diff.b_blob, Blob) - - if diff.a_mode is not None: - assert isinstance(diff.a_mode, int) - if diff.b_mode is not None: - isinstance(diff.b_mode, int) - - assert diff.diff is not None # can be empty - - if diff.renamed: - assert diff.rename_from and diff.rename_to and diff.rename_from != diff.rename_to - if diff.a_blob is None: - assert diff.new_file and isinstance(diff.new_file, bool) - if diff.b_blob is None: - assert diff.deleted_file and isinstance(diff.deleted_file, bool) - # END for each diff in initial import commit - - def test_diffs_on_initial_import_without_parents(self): - commit = Commit(self.repo, id='33ebe7acec14b25c5f84f35a664803fcab2f7781') - diffs = commit.diffs - assert diffs - - def test_diffs_with_mode_only_change(self): - commit = Commit(self.repo, id='ccde80b7a3037a004a7807a6b79916ce2a1e9729') - diffs = commit.diffs - - # in case of mode-only changes, there is no blob - assert_equal(1, len(diffs)) - assert_equal(None, diffs[0].a_blob) - assert_equal(None, diffs[0].b_blob) - assert_mode_644(diffs[0].a_mode) - assert_mode_755(diffs[0].b_mode) - def test_stats(self): - commit = Commit(self.repo, id='33ebe7acec14b25c5f84f35a664803fcab2f7781') + commit = Commit(self.rorepo, '33ebe7acec14b25c5f84f35a664803fcab2f7781') stats = commit.stats def check_entries(d): @@ -189,6 +40,14 @@ class TestCommit(object): check_entries(d) # END for each stated file + # assure data is parsed properly + michael = Actor._from_string("Michael Trier <mtrier@gmail.com>") + assert commit.author == michael + assert commit.committer == michael + assert commit.authored_date == 1210193388 + assert commit.committed_date == 1210193388 + assert commit.message == "initial project" + @patch_object(Git, '_call_process') def test_rev_list_bisect_all(self, git): """ @@ -198,13 +57,13 @@ class TestCommit(object): git.return_value = fixture('rev_list_bisect_all') - revs = self.repo.git.rev_list('HEAD', + revs = self.rorepo.git.rev_list('HEAD', pretty='raw', first_parent=True, bisect_all=True) assert_true(git.called) - commits = Commit._iter_from_process_or_stream(self.repo, ListProcessAdapter(revs)) + commits = Commit._iter_from_process_or_stream(self.rorepo, ListProcessAdapter(revs), True) expected_ids = ( 'cf37099ea8d1d8c7fbf9b6d12d7ec0249d3acb8b', '33ebe7acec14b25c5f84f35a664803fcab2f7781', @@ -213,29 +72,32 @@ class TestCommit(object): 'c231551328faa864848bde6ff8127f59c9566e90', ) for sha1, commit in zip(expected_ids, commits): - assert_equal(sha1, commit.id) + assert_equal(sha1, commit.sha) def test_count(self): - assert Commit.count( self.repo, '0.1.5' ) == 141 + assert self.rorepo.tag('refs/tags/0.1.5').commit.count( ) == 141 + + def test_list(self): + assert isinstance(Commit.list_items(self.rorepo, '0.1.5', max_count=5)['5117c9c8a4d3af19a9958677e45cda9269de1541'], Commit) def test_str(self): - commit = Commit(self.repo, id='abc') + commit = Commit(self.rorepo, 'abc') assert_equal ("abc", str(commit)) def test_repr(self): - commit = Commit(self.repo, id='abc') + commit = Commit(self.rorepo, 'abc') assert_equal('<git.Commit "abc">', repr(commit)) def test_equality(self): - commit1 = Commit(self.repo, id='abc') - commit2 = Commit(self.repo, id='abc') - commit3 = Commit(self.repo, id='zyx') + commit1 = Commit(self.rorepo, 'abc') + commit2 = Commit(self.rorepo, 'abc') + commit3 = Commit(self.rorepo, 'zyx') assert_equal(commit1, commit2) assert_not_equal(commit2, commit3) def test_iter_parents(self): # should return all but ourselves, even if skip is defined - c = self.repo.commit('0.1.5') + c = self.rorepo.commit('0.1.5') for skip in (0, 1): piter = c.iter_parents(skip=skip) first_parent = piter.next() @@ -243,3 +105,7 @@ class TestCommit(object): assert first_parent == c.parents[0] # END for each + def test_base(self): + name_rev = self.rorepo.head.commit.name_rev + assert isinstance(name_rev, basestring) + diff --git a/test/git/test_config.py b/test/git/test_config.py new file mode 100644 index 00000000..c2909b8f --- /dev/null +++ b/test/git/test_config.py @@ -0,0 +1,87 @@ +# test_config.py +# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors +# +# This module is part of GitPython and is released under +# the BSD License: http://www.opensource.org/licenses/bsd-license.php + +from test.testlib import * +from git import * +import StringIO +from copy import copy + +class TestBase(TestCase): + + def _to_memcache(self, file_path): + fp = open(file_path, "r") + sio = StringIO.StringIO() + sio.write(fp.read()) + sio.seek(0) + sio.name = file_path + return sio + + def _parsers_equal_or_raise(self, lhs, rhs): + pass + + def test_read_write(self): + # writer must create the exact same file as the one read before + for filename in ("git_config", "git_config_global"): + file_obj = self._to_memcache(fixture_path(filename)) + file_obj_orig = copy(file_obj) + w_config = GitConfigParser(file_obj, read_only = False) + w_config.read() # enforce reading + assert w_config._sections + w_config.write() # enforce writing + assert file_obj.getvalue() == file_obj_orig.getvalue() + + # creating an additional config writer must fail due to exclusive access + self.failUnlessRaises(IOError, GitConfigParser, file_obj, read_only = False) + + # should still have a lock and be able to make changes + assert w_config._has_lock() + + # changes should be written right away + sname = "my_section" + oname = "mykey" + val = "myvalue" + w_config.add_section(sname) + assert w_config.has_section(sname) + w_config.set(sname, oname, val) + assert w_config.has_option(sname,oname) + assert w_config.get(sname, oname) == val + + file_obj.seek(0) + r_config = GitConfigParser(file_obj, read_only=True) + assert r_config.has_section(sname) + assert r_config.has_option(sname, oname) + assert r_config.get(sname, oname) == val + + # END for each filename + + def test_base(self): + path_repo = fixture_path("git_config") + path_global = fixture_path("git_config_global") + r_config = GitConfigParser([path_repo, path_global], read_only=True) + assert r_config.read_only + num_sections = 0 + num_options = 0 + + # test reader methods + assert r_config._is_initialized == False + for section in r_config.sections(): + num_sections += 1 + for option in r_config.options(section): + num_options += 1 + val = r_config.get(section, option) + assert val + assert "\n" not in option + assert "\n" not in val + + # writing must fail + self.failUnlessRaises(IOError, r_config.set, section, option, None) + self.failUnlessRaises(IOError, r_config.remove_option, section, option ) + # END for each option + self.failUnlessRaises(IOError, r_config.remove_section, section) + # END for each section + assert num_sections and num_options + assert r_config._is_initialized == True + diff --git a/test/git/test_diff.py b/test/git/test_diff.py index b2339455..9335aced 100644 --- a/test/git/test_diff.py +++ b/test/git/test_diff.py @@ -7,19 +7,17 @@ from test.testlib import * from git import * -class TestDiff(object): - def setup(self): - self.repo = Repo(GIT_REPO) - +class TestDiff(TestBase): + def test_list_from_string_new_mode(self): - output = fixture('diff_new_mode') - diffs = Diff._list_from_string(self.repo, output) + output = ListProcessAdapter(fixture('diff_new_mode')) + diffs = Diff._index_from_patch_format(self.rorepo, output.stdout) assert_equal(1, len(diffs)) assert_equal(10, len(diffs[0].diff.splitlines())) def test_diff_with_rename(self): - output = fixture('diff_rename') - diffs = Diff._list_from_string(self.repo, output) + output = ListProcessAdapter(fixture('diff_rename')) + diffs = Diff._index_from_patch_format(self.rorepo, output.stdout) assert_equal(1, len(diffs)) @@ -28,3 +26,57 @@ class TestDiff(object): assert_equal(diff.rename_from, 'AUTHORS') assert_equal(diff.rename_to, 'CONTRIBUTORS') + def test_diff_patch_format(self): + # test all of the 'old' format diffs for completness - it should at least + # be able to deal with it + fixtures = ("diff_2", "diff_2f", "diff_f", "diff_i", "diff_mode_only", + "diff_new_mode", "diff_numstat", "diff_p", "diff_rename", + "diff_tree_numstat_root" ) + + for fixture_name in fixtures: + diff_proc = ListProcessAdapter(fixture(fixture_name)) + diffs = Diff._index_from_patch_format(self.rorepo, diff_proc.stdout) + # END for each fixture + + def test_diff_interface(self): + # test a few variations of the main diff routine + assertion_map = dict() + for i, commit in enumerate(self.rorepo.iter_commits('0.1.6', max_count=2)): + diff_item = commit + if i%2 == 0: + diff_item = commit.tree + # END use tree every second item + + for other in (None, commit.Index, commit.parents[0]): + for paths in (None, "CHANGES", ("CHANGES", "lib")): + for create_patch in range(2): + diff_index = diff_item.diff(other, paths, create_patch) + assert isinstance(diff_index, DiffIndex) + + if diff_index: + for ct in DiffIndex.change_type: + key = 'ct_%s'%ct + assertion_map.setdefault(key, 0) + assertion_map[key] = assertion_map[key]+len(list(diff_index.iter_change_type(ct))) + # END for each changetype + + # check entries + diff_set = set() + diff_set.add(diff_index[0]) + diff_set.add(diff_index[0]) + assert len(diff_set) == 1 + assert diff_index[0] == diff_index[0] + assert not (diff_index[0] != diff_index[0]) + # END diff index checking + # END for each patch option + # END for each path option + # END for each other side + # END for each commit + + # assert we could always find at least one instance of the members we + # can iterate in the diff index - if not this indicates its not working correctly + # or our test does not span the whole range of possibilities + for key,value in assertion_map.items(): + assert value, "Did not find diff for %s" % key + # END for each iteration type + diff --git a/test/git/test_git.py b/test/git/test_git.py index 1f44aebc..c4a39e85 100644 --- a/test/git/test_git.py +++ b/test/git/test_git.py @@ -8,9 +8,11 @@ import os, sys from test.testlib import * from git import Git, GitCommandError -class TestGit(object): - def setup(self): - self.git = Git(GIT_REPO) +class TestGit(TestCase): + + @classmethod + def setUpAll(cls): + cls.git = Git(GIT_REPO) @patch_object(Git, 'execute') def test_call_process_calls_execute(self, git): diff --git a/test/git/test_head.py b/test/git/test_head.py deleted file mode 100644 index b8380838..00000000 --- a/test/git/test_head.py +++ /dev/null @@ -1,26 +0,0 @@ -# test_head.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -from test.testlib import * -from git import * - -class TestHead(object): - def setup(self): - self.repo = Repo(GIT_REPO) - - def test_base(self): - for head in self.repo.heads: - assert head.name - assert "refs/heads" in head.path - # END for each head - - @patch_object(Git, '_call_process') - def test_ref_with_path_component(self, git): - git.return_value = fixture('for_each_ref_with_path_component') - head = self.repo.heads[0] - - assert_equal('refactoring/feature1', head.name) - assert_true(git.called) diff --git a/test/git/test_index.py b/test/git/test_index.py new file mode 100644 index 00000000..e9541232 --- /dev/null +++ b/test/git/test_index.py @@ -0,0 +1,345 @@ +# test_index.py +# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors +# +# This module is part of GitPython and is released under +# the BSD License: http://www.opensource.org/licenses/bsd-license.php + +from test.testlib import * +from git import * +import inspect +import os +import sys +import tempfile +import glob +from stat import * + +class TestTree(TestBase): + + def test_index_file_base(self): + # read from file + index = IndexFile(self.rorepo, fixture_path("index")) + assert index.entries + assert index.version > 0 + + # test entry + last_val = None + entry = index.entries.itervalues().next() + for attr in ("path","ctime","mtime","dev","inode","mode","uid", + "gid","size","sha","stage"): + val = getattr(entry, attr) + # END for each method + + # test update + entries = index.entries + assert isinstance(index.update(), IndexFile) + assert entries is not index.entries + + # test stage + index_merge = IndexFile(self.rorepo, fixture_path("index_merge")) + assert len(index_merge.entries) == 106 + assert len(list(e for e in index_merge.entries.itervalues() if e.stage != 0 )) + + # write the data - it must match the original + tmpfile = tempfile.mktemp() + index_merge.write(tmpfile) + fp = open(tmpfile, 'r') + assert fp.read() == fixture("index_merge") + fp.close() + os.remove(tmpfile) + + def _cmp_tree_index(self, tree, index): + # fail unless both objects contain the same paths and blobs + if isinstance(tree, str): + tree = self.rorepo.commit(tree).tree + + num_blobs = 0 + for blob in tree.traverse(predicate = lambda e: e.type == "blob"): + assert (blob.path,0) in index.entries + num_blobs += 1 + # END for each blob in tree + assert num_blobs == len(index.entries) + + def test_index_file_from_tree(self): + common_ancestor_sha = "5117c9c8a4d3af19a9958677e45cda9269de1541" + cur_sha = "4b43ca7ff72d5f535134241e7c797ddc9c7a3573" + other_sha = "39f85c4358b7346fee22169da9cad93901ea9eb9" + + # simple index from tree + base_index = IndexFile.from_tree(self.rorepo, common_ancestor_sha) + assert base_index.entries + self._cmp_tree_index(common_ancestor_sha, base_index) + + # merge two trees - its like a fast-forward + two_way_index = IndexFile.from_tree(self.rorepo, common_ancestor_sha, cur_sha) + assert two_way_index.entries + self._cmp_tree_index(cur_sha, two_way_index) + + # merge three trees - here we have a merge conflict + three_way_index = IndexFile.from_tree(self.rorepo, common_ancestor_sha, cur_sha, other_sha) + assert len(list(e for e in three_way_index.entries.values() if e.stage != 0)) + + + # ITERATE BLOBS + merge_required = lambda t: t[0] != 0 + merge_blobs = list(three_way_index.iter_blobs(merge_required)) + assert merge_blobs + assert merge_blobs[0][0] in (1,2,3) + assert isinstance(merge_blobs[0][1], Blob) + + + # writing a tree should fail with an unmerged index + self.failUnlessRaises(GitCommandError, three_way_index.write_tree) + + # removed unmerged entries + unmerged_blob_map = three_way_index.unmerged_blobs() + assert unmerged_blob_map + + # pick the first blob at the first stage we find and use it as resolved version + three_way_index.resolve_blobs( l[0][1] for l in unmerged_blob_map.itervalues() ) + tree = three_way_index.write_tree() + assert isinstance(tree, Tree) + num_blobs = 0 + for blob in tree.traverse(predicate=lambda item: item.type == "blob"): + assert (blob.path,0) in three_way_index.entries + num_blobs += 1 + # END for each blob + assert num_blobs == len(three_way_index.entries) + + @with_rw_repo('0.1.6') + def test_index_file_diffing(self, rw_repo): + # default Index instance points to our index + index = IndexFile(rw_repo) + assert index.path is not None + assert len(index.entries) + + # write the file back + index.write() + + # could sha it, or check stats + + # test diff + # resetting the head will leave the index in a different state, and the + # diff will yield a few changes + cur_head_commit = rw_repo.head.reference.commit + ref = rw_repo.head.reset('HEAD~6', index=True, working_tree=False) + + # diff against same index is 0 + diff = index.diff() + assert len(diff) == 0 + + # against HEAD as string, must be the same as it matches index + diff = index.diff('HEAD') + assert len(diff) == 0 + + # against previous head, there must be a difference + diff = index.diff(cur_head_commit) + assert len(diff) + + # we reverse the result + adiff = index.diff(str(cur_head_commit), R=True) + odiff = index.diff(cur_head_commit, R=False) # now its not reversed anymore + assert adiff != odiff + assert odiff == diff # both unreversed diffs against HEAD + + # against working copy - its still at cur_commit + wdiff = index.diff(None) + assert wdiff != adiff + assert wdiff != odiff + + # against something unusual + self.failUnlessRaises(ValueError, index.diff, int) + + # adjust the index to match an old revision + cur_branch = rw_repo.active_branch + cur_commit = cur_branch.commit + rev_head_parent = 'HEAD~1' + assert index.reset(rev_head_parent) is index + + assert cur_branch == rw_repo.active_branch + assert cur_commit == rw_repo.head.commit + + # there must be differences towards the working tree which is in the 'future' + assert index.diff(None) + + # reset the working copy as well to current head,to pull 'back' as well + new_data = "will be reverted" + file_path = os.path.join(rw_repo.git.git_dir, "CHANGES") + fp = open(file_path, "w") + fp.write(new_data) + fp.close() + index.reset(rev_head_parent, working_tree=True) + assert not index.diff(None) + assert cur_branch == rw_repo.active_branch + assert cur_commit == rw_repo.head.commit + fp = open(file_path) + try: + assert fp.read() != new_data + finally: + fp.close() + + # test full checkout + test_file = os.path.join(rw_repo.git.git_dir, "CHANGES") + os.remove(test_file) + index.checkout(None, force=True) + assert os.path.isfile(test_file) + + os.remove(test_file) + index.checkout(None, force=False) + assert os.path.isfile(test_file) + + # individual file + os.remove(test_file) + index.checkout(test_file) + assert os.path.exists(test_file) + + + + # currently it ignore non-existing paths + index.checkout(paths=["doesnt/exist"]) + + + def _count_existing(self, repo, files): + """ + Returns count of files that actually exist in the repository directory. + """ + existing = 0 + basedir = repo.git.git_dir + for f in files: + existing += os.path.isfile(os.path.join(basedir, f)) + # END for each deleted file + return existing + # END num existing helper + + @with_rw_repo('0.1.6') + def test_index_mutation(self, rw_repo): + index = rw_repo.index + num_entries = len(index.entries) + cur_head = rw_repo.head + + # remove all of the files, provide a wild mix of paths, BaseIndexEntries, + # IndexEntries + def mixed_iterator(): + count = 0 + for entry in index.entries.itervalues(): + type_id = count % 4 + if type_id == 0: # path + yield entry.path + elif type_id == 1: # blob + yield Blob(rw_repo, entry.sha, entry.mode, entry.path) + elif type_id == 2: # BaseIndexEntry + yield BaseIndexEntry(entry[:4]) + elif type_id == 3: # IndexEntry + yield entry + else: + raise AssertionError("Invalid Type") + count += 1 + # END for each entry + # END mixed iterator + deleted_files = index.remove(mixed_iterator(), working_tree=False) + assert deleted_files + assert self._count_existing(rw_repo, deleted_files) == len(deleted_files) + assert len(index.entries) == 0 + + # reset the index to undo our changes + index.reset() + assert len(index.entries) == num_entries + + # remove with working copy + deleted_files = index.remove(mixed_iterator(), working_tree=True) + assert deleted_files + assert self._count_existing(rw_repo, deleted_files) == 0 + + # reset everything + index.reset(working_tree=True) + assert self._count_existing(rw_repo, deleted_files) == len(deleted_files) + + # invalid type + self.failUnlessRaises(TypeError, index.remove, [1]) + + # absolute path + deleted_files = index.remove([os.path.join(rw_repo.git.git_dir,"lib")], r=True) + assert len(deleted_files) > 1 + self.failUnlessRaises(ValueError, index.remove, ["/doesnt/exists"]) + + # TEST COMMITTING + # commit changed index + cur_commit = cur_head.commit + commit_message = "commit default head" + + new_commit = index.commit(commit_message, head=False) + assert new_commit.message == commit_message + assert new_commit.parents[0] == cur_commit + assert len(new_commit.parents) == 1 + assert cur_head.commit == cur_commit + + # same index, no parents + commit_message = "index without parents" + commit_no_parents = index.commit(commit_message, parent_commits=list(), head=True) + assert commit_no_parents.message == commit_message + assert len(commit_no_parents.parents) == 0 + assert cur_head.commit == commit_no_parents + + # same index, multiple parents + commit_message = "Index with multiple parents\n commit with another line" + commit_multi_parent = index.commit(commit_message,parent_commits=(commit_no_parents, new_commit)) + assert commit_multi_parent.message == commit_message + assert len(commit_multi_parent.parents) == 2 + assert commit_multi_parent.parents[0] == commit_no_parents + assert commit_multi_parent.parents[1] == new_commit + assert cur_head.commit == commit_multi_parent + + # re-add all files in lib + # get the lib folder back on disk, but get an index without it + index.reset(new_commit.parents[0], working_tree=True).reset(new_commit, working_tree=False) + lib_file_path = "lib/git/__init__.py" + assert (lib_file_path, 0) not in index.entries + assert os.path.isfile(os.path.join(rw_repo.git.git_dir, lib_file_path)) + + # directory + entries = index.add(['lib']) + assert len(entries)>1 + + # glob + entries = index.reset(new_commit).add(['lib/*.py']) + assert len(entries) == 14 + + # missing path + self.failUnlessRaises(GitCommandError, index.reset(new_commit).add, ['doesnt/exist/must/raise']) + + # blob from older revision overrides current index revision + old_blob = new_commit.parents[0].tree.blobs[0] + entries = index.reset(new_commit).add([old_blob]) + assert index.entries[(old_blob.path,0)].sha == old_blob.sha and len(entries) == 1 + + # mode 0 not allowed + null_sha = "0"*40 + self.failUnlessRaises(ValueError, index.reset(new_commit).add, [BaseIndexEntry((0, null_sha,0,"doesntmatter"))]) + + # add new file + new_file_relapath = "my_new_file" + new_file_path = self._make_file(new_file_relapath, "hello world", rw_repo) + entries = index.reset(new_commit).add([BaseIndexEntry((010644, null_sha, 0, new_file_relapath))]) + assert len(entries) == 1 and entries[0].sha != null_sha + + # add symlink + if sys.platform != "win32": + link_file = os.path.join(rw_repo.git.git_dir, "my_real_symlink") + os.symlink("/etc/that", link_file) + entries = index.reset(new_commit).add([link_file]) + assert len(entries) == 1 and S_ISLNK(entries[0].mode) + print "%o" % entries[0].mode + # END real symlink test + + # add fake symlink and assure it checks-our as symlink + fake_symlink_relapath = "my_fake_symlink" + fake_symlink_path = self._make_file(fake_symlink_relapath, "/etc/that", rw_repo) + fake_entry = BaseIndexEntry((0120000, null_sha, 0, fake_symlink_relapath)) + entries = index.reset(new_commit).add([fake_entry]) + assert len(entries) == 1 and S_ISLNK(entries[0].mode) + + # checkout the fakelink, should be a link then + assert not S_ISLNK(os.stat(fake_symlink_path)[ST_MODE]) + os.remove(fake_symlink_path) + index.checkout(fake_symlink_path) + assert S_ISLNK(os.lstat(fake_symlink_path)[ST_MODE]) + diff --git a/test/git/test_performance.py b/test/git/test_performance.py index 77567515..83d4a91e 100644 --- a/test/git/test_performance.py +++ b/test/git/test_performance.py @@ -8,9 +8,7 @@ from test.testlib import * from git import * from time import time -class TestPerformance(object): - def setup(self): - self.repo = Repo(GIT_REPO) +class TestPerformance(TestBase): def test_iteration(self): num_objs = 0 @@ -21,7 +19,7 @@ class TestPerformance(object): # return quite a lot of commits, we just take one and hence abort the operation st = time() - for c in self.repo.iter_commits('0.1.6'): + for c in self.rorepo.iter_commits('0.1.6'): num_commits += 1 c.author c.authored_date diff --git a/test/git/test_refs.py b/test/git/test_refs.py new file mode 100644 index 00000000..32a6de1c --- /dev/null +++ b/test/git/test_refs.py @@ -0,0 +1,239 @@ +# test_refs.py +# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors +# +# This module is part of GitPython and is released under +# the BSD License: http://www.opensource.org/licenses/bsd-license.php + +from mock import * +from test.testlib import * +from git import * +import git.refs as refs +from git.objects.tag import TagObject +from itertools import chain + +class TestRefs(TestBase): + + def test_tag_base(self): + tag_object_refs = list() + for tag in self.rorepo.tags: + assert "refs/tags" in tag.path + assert tag.name + assert isinstance( tag.commit, Commit ) + if tag.tag is not None: + tag_object_refs.append( tag ) + tagobj = tag.tag + assert isinstance( tagobj, TagObject ) + assert tagobj.tag == tag.name + assert isinstance( tagobj.tagger, Actor ) + assert isinstance( tagobj.tagged_date, int ) + assert tagobj.message + # END if we have a tag object + # END for tag in repo-tags + assert tag_object_refs + assert isinstance(self.rorepo.tags['0.1.5'], TagReference) + + def test_tags(self): + # tag refs can point to tag objects or to commits + s = set() + ref_count = 0 + for ref in chain(self.rorepo.tags, self.rorepo.heads): + ref_count += 1 + assert isinstance(ref, refs.Reference) + assert str(ref) == ref.name + assert repr(ref) + assert ref == ref + assert not ref != ref + s.add(ref) + # END for each ref + assert len(s) == ref_count + assert len(s|s) == ref_count + + def test_heads(self): + for head in self.rorepo.heads: + assert head.name + assert head.path + assert "refs/heads" in head.path + prev_object = head.object + cur_object = head.object + assert prev_object == cur_object # represent the same git object + assert prev_object is not cur_object # but are different instances + # END for each head + + def test_refs(self): + types_found = set() + for ref in self.rorepo.refs: + types_found.add(type(ref)) + assert len(types_found) == 3 + + @with_rw_repo('0.1.6') + def test_head_reset(self, rw_repo): + cur_head = rw_repo.head + new_head_commit = cur_head.ref.commit.parents[0] + cur_head.reset(new_head_commit, index=True) # index only + assert cur_head.reference.commit == new_head_commit + + self.failUnlessRaises(ValueError, cur_head.reset, new_head_commit, index=False, working_tree=True) + new_head_commit = new_head_commit.parents[0] + cur_head.reset(new_head_commit, index=True, working_tree=True) # index + wt + assert cur_head.reference.commit == new_head_commit + + # paths + cur_head.reset(new_head_commit, paths = "lib") + + + # now that we have a write write repo, change the HEAD reference - its + # like git-reset --soft + heads = rw_repo.heads + assert heads + for head in heads: + cur_head.reference = head + assert cur_head.reference == head + assert cur_head.commit == head.commit + assert not cur_head.is_detached + # END for each head + + # detach + cur_head.reference = heads[0].commit + assert cur_head.commit == heads[0].commit + assert cur_head.is_detached + self.failUnlessRaises(TypeError, getattr, cur_head, "reference") + + some_tag = rw_repo.tags[0] + cur_head.reference = some_tag + assert cur_head.is_detached + assert cur_head.commit == some_tag.commit + + # type check + self.failUnlessRaises(ValueError, setattr, cur_head, "reference", "that") + + # head handling + commit = 'HEAD' + prev_head_commit = cur_head.commit + for count, new_name in enumerate(("my_new_head", "feature/feature1")): + actual_commit = commit+"^"*count + new_head = Head.create(rw_repo, new_name, actual_commit) + assert cur_head.commit == prev_head_commit + assert isinstance(new_head, Head) + # already exists + self.failUnlessRaises(GitCommandError, Head.create, rw_repo, new_name) + + # force it + new_head = Head.create(rw_repo, new_name, actual_commit, force=True) + old_path = new_head.path + old_name = new_head.name + + assert new_head.rename("hello").name == "hello" + assert new_head.rename("hello/world").name == "hello/world" + assert new_head.rename(old_name).name == old_name and new_head.path == old_path + + # rename with force + tmp_head = Head.create(rw_repo, "tmphead") + self.failUnlessRaises(GitCommandError, tmp_head.rename, new_head) + tmp_head.rename(new_head, force=True) + assert tmp_head == new_head and tmp_head.object == new_head.object + + Head.delete(rw_repo, tmp_head) + heads = rw_repo.heads + assert tmp_head not in heads and new_head not in heads + # force on deletion testing would be missing here, code looks okay though ;) + # END for each new head name + self.failUnlessRaises(TypeError, RemoteReference.create, rw_repo, "some_name") + + # tag ref + tag_name = "1.0.2" + light_tag = TagReference.create(rw_repo, tag_name) + self.failUnlessRaises(GitCommandError, TagReference.create, rw_repo, tag_name) + light_tag = TagReference.create(rw_repo, tag_name, "HEAD~1", force = True) + assert isinstance(light_tag, TagReference) + assert light_tag.name == tag_name + assert light_tag.commit == cur_head.commit.parents[0] + assert light_tag.tag is None + + # tag with tag object + other_tag_name = "releases/1.0.2RC" + msg = "my mighty tag\nsecond line" + obj_tag = TagReference.create(rw_repo, other_tag_name, message=msg) + assert isinstance(obj_tag, TagReference) + assert obj_tag.name == other_tag_name + assert obj_tag.commit == cur_head.commit + assert obj_tag.tag is not None + + TagReference.delete(rw_repo, light_tag, obj_tag) + tags = rw_repo.tags + assert light_tag not in tags and obj_tag not in tags + + # remote deletion + remote_refs_so_far = 0 + remotes = rw_repo.remotes + assert remotes + for remote in remotes: + refs = remote.refs + RemoteReference.delete(rw_repo, *refs) + remote_refs_so_far += len(refs) + # END for each ref to delete + assert remote_refs_so_far + + for remote in remotes: + # remotes without references throw + self.failUnlessRaises(AssertionError, getattr, remote, 'refs') + # END for each remote + + # change where the active head points to + if cur_head.is_detached: + cur_head.reference = rw_repo.heads[0] + + head = cur_head.reference + old_commit = head.commit + head.commit = old_commit.parents[0] + assert head.commit == old_commit.parents[0] + assert head.commit == cur_head.commit + head.commit = old_commit + + # setting a non-commit as commit fails, but succeeds as object + head_tree = head.commit.tree + self.failUnlessRaises(GitCommandError, setattr, head, 'commit', head_tree) + assert head.commit == old_commit # and the ref did not change + self.failUnlessRaises(GitCommandError, setattr, head, 'object', head_tree) + + # set the commit directly using the head. This would never detach the head + assert not cur_head.is_detached + head.object = old_commit + cur_head.reference = head.commit + assert cur_head.is_detached + parent_commit = head.commit.parents[0] + assert cur_head.is_detached + cur_head.commit = parent_commit + assert cur_head.is_detached and cur_head.commit == parent_commit + + cur_head.reference = head + assert not cur_head.is_detached + cur_head.commit = parent_commit + assert not cur_head.is_detached + assert head.commit == parent_commit + + # test checkout + active_branch = rw_repo.active_branch + for head in rw_repo.heads: + checked_out_head = head.checkout() + assert checked_out_head == head + # END for each head to checkout + + # checkout with branch creation + new_head = active_branch.checkout(b="new_head") + assert active_branch != rw_repo.active_branch + assert new_head == rw_repo.active_branch + + # checkout with force has we have a change + # clear file + open(new_head.commit.tree.blobs[-1].abspath,'w').close() + assert len(new_head.commit.diff(None)) == 1 + + # create a new branch that is likely to touch the file we changed + far_away_head = rw_repo.create_head("far_head",'HEAD~100') + self.failUnlessRaises(GitCommandError, far_away_head.checkout) + assert active_branch == active_branch.checkout(force=True) + + # test ref listing - assure we have packed refs + rw_repo.git.pack_refs(all=True) + assert rw_repo.heads + assert rw_repo.tags diff --git a/test/git/test_remote.py b/test/git/test_remote.py new file mode 100644 index 00000000..f049f6f7 --- /dev/null +++ b/test/git/test_remote.py @@ -0,0 +1,423 @@ +# test_remote.py +# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors +# +# This module is part of GitPython and is released under +# the BSD License: http://www.opensource.org/licenses/bsd-license.php + +from test.testlib import * +from git import * +import tempfile +import shutil +import os +import random + +# assure we have repeatable results +random.seed(0) + +class TestPushProgress(PushProgress): + __slots__ = ( "_seen_lines", "_stages_per_op" ) + def __init__(self): + super(TestPushProgress, self).__init__() + self._seen_lines = list() + self._stages_per_op = dict() + + def _parse_progress_line(self, line): + # we may remove the line later if it is dropped + # Keep it for debugging + self._seen_lines.append(line) + super(TestPushProgress, self)._parse_progress_line(line) + assert len(line) > 1, "line %r too short" % line + + def line_dropped(self, line): + try: + self._seen_lines.remove(line) + except ValueError: + pass + + def update(self, op_code, cur_count, max_count=None, message=''): + # check each stage only comes once + op_id = op_code & self.OP_MASK + assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING) + + self._stages_per_op.setdefault(op_id, 0) + self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK) + + if op_code & (self.WRITING|self.END) == (self.WRITING|self.END): + assert message + # END check we get message + + def make_assertion(self): + if not self._seen_lines: + return + + # sometimes objects are not compressed which is okay + assert len(self._seen_ops) in (2,3) + assert self._stages_per_op + + # must have seen all stages + for op, stages in self._stages_per_op.items(): + assert stages & self.STAGE_MASK == self.STAGE_MASK + # END for each op/stage + +class TestRemote(TestBase): + + def _print_fetchhead(self, repo): + fp = open(os.path.join(repo.path, "FETCH_HEAD")) + fp.close() + + + def _test_fetch_result(self, results, remote): + # self._print_fetchhead(remote.repo) + assert len(results) > 0 and isinstance(results[0], FetchInfo) + for info in results: + if isinstance(info.ref, Reference): + assert info.flags != 0 + # END reference type flags handling + assert isinstance(info.ref, (SymbolicReference, Reference)) + if info.flags & info.FORCED_UPDATE: + assert isinstance(info.commit_before_forced_update, Commit) + else: + assert info.commit_before_forced_update is None + # END forced update checking + # END for each info + + def _test_push_result(self, results, remote): + assert len(results) > 0 and isinstance(results[0], PushInfo) + for info in results: + assert info.flags + if info.old_commit is not None: + assert isinstance(info.old_commit, Commit) + if info.flags & info.ERROR: + has_one = False + for bitflag in (info.REJECTED, info.REMOTE_REJECTED, info.REMOTE_FAILURE): + has_one |= bool(info.flags & bitflag) + # END for each bitflag + assert has_one + else: + # there must be a remote commit + if info.flags & info.DELETED == 0: + assert isinstance(info.local_ref, Reference) + else: + assert info.local_ref is None + assert type(info.remote_ref) in (TagReference, RemoteReference) + # END error checking + # END for each info + + + def _test_fetch_info(self, repo): + self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '') + self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '') + + def _commit_random_file(self, repo): + #Create a file with a random name and random data and commit it to repo. + # Return the commited absolute file path + index = repo.index + new_file = self._make_file(os.path.basename(tempfile.mktemp()),str(random.random()), repo) + index.add([new_file]) + index.commit("Committing %s" % new_file) + return new_file + + def _test_fetch(self,remote, rw_repo, remote_repo): + # specialized fetch testing to de-clutter the main test + self._test_fetch_info(rw_repo) + + def fetch_and_test(remote, **kwargs): + res = remote.fetch(**kwargs) + self._test_fetch_result(res, remote) + return res + # END fetch and check + + def get_info(res, remote, name): + return res["%s/%s"%(remote,name)] + + # put remote head to master as it is garantueed to exist + remote_repo.head.reference = remote_repo.heads.master + + res = fetch_and_test(remote) + # all uptodate + for info in res: + assert info.flags & info.HEAD_UPTODATE + + # rewind remote head to trigger rejection + # index must be false as remote is a bare repo + rhead = remote_repo.head + remote_commit = rhead.commit + rhead.reset("HEAD~2", index=False) + res = fetch_and_test(remote) + mkey = "%s/%s"%(remote,'master') + master_info = res[mkey] + assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None + + # normal fast forward - set head back to previous one + rhead.commit = remote_commit + res = fetch_and_test(remote) + assert res[mkey].flags & FetchInfo.FAST_FORWARD + + # new remote branch + new_remote_branch = Head.create(remote_repo, "new_branch") + res = fetch_and_test(remote) + new_branch_info = get_info(res, remote, new_remote_branch) + assert new_branch_info.flags & FetchInfo.NEW_HEAD + + # remote branch rename ( causes creation of a new one locally ) + new_remote_branch.rename("other_branch_name") + res = fetch_and_test(remote) + other_branch_info = get_info(res, remote, new_remote_branch) + assert other_branch_info.ref.commit == new_branch_info.ref.commit + + # remove new branch + Head.delete(new_remote_branch.repo, new_remote_branch) + res = fetch_and_test(remote) + # deleted remote will not be fetched + self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch) + + # prune stale tracking branches + stale_refs = remote.stale_refs + assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference) + RemoteReference.delete(rw_repo, *stale_refs) + + # test single branch fetch with refspec including target remote + res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote) + assert len(res) == 1 and get_info(res, remote, 'master') + + # ... with respec and no target + res = fetch_and_test(remote, refspec='master') + assert len(res) == 1 + + # add new tag reference + rtag = TagReference.create(remote_repo, "1.0-RV_hello.there") + res = fetch_and_test(remote, tags=True) + tinfo = res[str(rtag)] + assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit + assert tinfo.flags & tinfo.NEW_TAG + + # adjust tag commit + rtag.object = rhead.commit.parents[0].parents[0] + res = fetch_and_test(remote, tags=True) + tinfo = res[str(rtag)] + assert tinfo.commit == rtag.commit + assert tinfo.flags & tinfo.TAG_UPDATE + + # delete remote tag - local one will stay + TagReference.delete(remote_repo, rtag) + res = fetch_and_test(remote, tags=True) + self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag)) + + # provoke to receive actual objects to see what kind of output we have to + # expect. For that we need a remote transport protocol + # Create a new UN-shared repo and fetch into it after we pushed a change + # to the shared repo + other_repo_dir = tempfile.mktemp("other_repo") + # must clone with a local path for the repo implementation not to freak out + # as it wants local paths only ( which I can understand ) + other_repo = remote_repo.clone(other_repo_dir, shared=False) + remote_repo_url = "git://localhost%s"%remote_repo.path + + # put origin to git-url + other_origin = other_repo.remotes.origin + other_origin.config_writer.set("url", remote_repo_url) + # it automatically creates alternates as remote_repo is shared as well. + # It will use the transport though and ignore alternates when fetching + # assert not other_repo.alternates # this would fail + + # assure we are in the right state + rw_repo.head.reset(remote.refs.master, working_tree=True) + try: + self._commit_random_file(rw_repo) + remote.push(rw_repo.head.reference) + + # here I would expect to see remote-information about packing + # objects and so on. Unfortunately, this does not happen + # if we are redirecting the output - git explicitly checks for this + # and only provides progress information to ttys + res = fetch_and_test(other_origin) + finally: + shutil.rmtree(other_repo_dir) + # END test and cleanup + + def _test_push_and_pull(self,remote, rw_repo, remote_repo): + # push our changes + lhead = rw_repo.head + lindex = rw_repo.index + # assure we are on master and it is checked out where the remote is + lhead.reference = rw_repo.heads.master + lhead.reset(remote.refs.master, working_tree=True) + + # push without spec should fail ( without further configuration ) + # well, works nicely + # self.failUnlessRaises(GitCommandError, remote.push) + + # simple file push + self._commit_random_file(rw_repo) + progress = TestPushProgress() + res = remote.push(lhead.reference, progress) + assert isinstance(res, IterableList) + self._test_push_result(res, remote) + progress.make_assertion() + + # rejected - undo last commit + lhead.reset("HEAD~1") + res = remote.push(lhead.reference) + assert res[0].flags & PushInfo.ERROR + assert res[0].flags & PushInfo.REJECTED + self._test_push_result(res, remote) + + # force rejected pull + res = remote.push('+%s' % lhead.reference) + assert res[0].flags & PushInfo.ERROR == 0 + assert res[0].flags & PushInfo.FORCED_UPDATE + self._test_push_result(res, remote) + + # invalid refspec + res = remote.push("hellothere") + assert len(res) == 0 + + # push new tags + progress = TestPushProgress() + to_be_updated = "my_tag.1.0RV" + new_tag = TagReference.create(rw_repo, to_be_updated) + other_tag = TagReference.create(rw_repo, "my_obj_tag.2.1aRV", message="my message") + res = remote.push(progress=progress, tags=True) + assert res[-1].flags & PushInfo.NEW_TAG + progress.make_assertion() + self._test_push_result(res, remote) + + # update push new tags + # Rejection is default + new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True) + res = remote.push(tags=True) + self._test_push_result(res, remote) + assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR + + # push force this tag + res = remote.push("+%s" % new_tag.path) + assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE + + # delete tag - have to do it using refspec + res = remote.push(":%s" % new_tag.path) + self._test_push_result(res, remote) + assert res[0].flags & PushInfo.DELETED + + # push new branch + new_head = Head.create(rw_repo, "my_new_branch") + progress = TestPushProgress() + res = remote.push(new_head, progress) + assert res[0].flags & PushInfo.NEW_HEAD + progress.make_assertion() + self._test_push_result(res, remote) + + # delete new branch on the remote end and locally + res = remote.push(":%s" % new_head.path) + self._test_push_result(res, remote) + Head.delete(rw_repo, new_head) + assert res[-1].flags & PushInfo.DELETED + + # --all + res = remote.push(all=True) + self._test_push_result(res, remote) + + # pull is essentially a fetch + merge, hence we just do a light + # test here, leave the reset to the actual merge testing + # fails as we did not specify a branch and there is no configuration for it + self.failUnlessRaises(GitCommandError, remote.pull) + remote.pull('master') + + # cleanup - delete created tags and branches as we are in an innerloop on + # the same repository + TagReference.delete(rw_repo, new_tag, other_tag) + remote.push(":%s" % other_tag.path) + + @with_rw_and_rw_remote_repo('0.1.6') + def test_base(self, rw_repo, remote_repo): + num_remotes = 0 + remote_set = set() + ran_fetch_test = False + + for remote in rw_repo.remotes: + num_remotes += 1 + assert remote == remote + assert str(remote) != repr(remote) + remote_set.add(remote) + remote_set.add(remote) # should already exist + + # REFS + refs = remote.refs + assert refs + for ref in refs: + assert ref.remote_name == remote.name + assert ref.remote_head + # END for each ref + + # OPTIONS + # cannot use 'fetch' key anymore as it is now a method + for opt in ("url", ): + val = getattr(remote, opt) + reader = remote.config_reader + assert reader.get(opt) == val + + # unable to write with a reader + self.failUnlessRaises(IOError, reader.set, opt, "test") + + # change value + writer = remote.config_writer + new_val = "myval" + writer.set(opt, new_val) + assert writer.get(opt) == new_val + writer.set(opt, val) + assert writer.get(opt) == val + del(writer) + assert getattr(remote, opt) == val + # END for each default option key + + # RENAME + other_name = "totally_other_name" + prev_name = remote.name + assert remote.rename(other_name) == remote + assert prev_name != remote.name + # multiple times + for time in range(2): + assert remote.rename(prev_name).name == prev_name + # END for each rename ( back to prev_name ) + + # PUSH/PULL TESTING + self._test_push_and_pull(remote, rw_repo, remote_repo) + + # FETCH TESTING + # Only for remotes - local cases are the same or less complicated + # as additional progress information will never be emitted + if remote.name == "daemon_origin": + self._test_fetch(remote, rw_repo, remote_repo) + ran_fetch_test = True + # END fetch test + + remote.update() + # END for each remote + + assert ran_fetch_test + assert num_remotes + assert num_remotes == len(remote_set) + + origin = rw_repo.remote('origin') + assert origin == rw_repo.remotes.origin + + @with_bare_rw_repo + def test_creation_and_removal(self, bare_rw_repo): + new_name = "test_new_one" + arg_list = (new_name, "git@server:hello.git") + remote = Remote.create(bare_rw_repo, *arg_list ) + assert remote.name == "test_new_one" + assert remote in bare_rw_repo.remotes + + # create same one again + self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list) + + Remote.remove(bare_rw_repo, new_name) + + for remote in bare_rw_repo.remotes: + if remote.name == new_name: + raise AssertionError("Remote removal failed") + # END if deleted remote matches existing remote's name + # END for each remote + + + diff --git a/test/git/test_repo.py b/test/git/test_repo.py index 96d08b91..0b196a1f 100644 --- a/test/git/test_repo.py +++ b/test/git/test_repo.py @@ -8,9 +8,7 @@ import os, sys from test.testlib import * from git import * -class TestRepo(object): - def setup(self): - self.repo = Repo(GIT_REPO) +class TestRepo(TestBase): @raises(InvalidGitRepositoryError) def test_new_should_raise_on_invalid_repo_location(self): @@ -25,29 +23,40 @@ class TestRepo(object): def test_description(self): txt = "Test repository" - self.repo.description = txt - assert_equal(self.repo.description, txt) + self.rorepo.description = txt + assert_equal(self.rorepo.description, txt) def test_heads_should_return_array_of_head_objects(self): - for head in self.repo.heads: + for head in self.rorepo.heads: assert_equal(Head, head.__class__) def test_heads_should_populate_head_data(self): - for head in self.repo.heads: + for head in self.rorepo.heads: assert head.name assert isinstance(head.commit,Commit) # END for each head + + assert isinstance(self.rorepo.heads.master, Head) + assert isinstance(self.rorepo.heads['master'], Head) + + def test_tree_from_revision(self): + tree = self.rorepo.tree('0.1.6') + assert tree.type == "tree" + assert self.rorepo.tree(tree) == tree + + # try from invalid revision that does not exist + self.failUnlessRaises(ValueError, self.rorepo.tree, 'hello world') @patch_object(Git, '_call_process') def test_commits(self, git): git.return_value = ListProcessAdapter(fixture('rev_list')) - commits = list( self.repo.iter_commits('master', max_count=10) ) - + commits = list( self.rorepo.iter_commits('master', max_count=10) ) + c = commits[0] - assert_equal('4c8124ffcf4039d292442eeccabdeca5af5c5017', c.id) - assert_equal(["634396b2f541a9f2d58b00be1a07f0c358b999b3"], [p.id for p in c.parents]) - assert_equal("672eca9b7f9e09c22dcb128c283e8c3c8d7697a4", c.tree.id) + assert_equal('4c8124ffcf4039d292442eeccabdeca5af5c5017', c.sha) + assert_equal(["634396b2f541a9f2d58b00be1a07f0c358b999b3"], [p.sha for p in c.parents]) + assert_equal("672eca9b7f9e09c22dcb128c283e8c3c8d7697a4", c.tree.sha) assert_equal("Tom Preston-Werner", c.author.name) assert_equal("tom@mojombo.com", c.author.email) assert_equal(1191999972, c.authored_date) @@ -60,11 +69,21 @@ class TestRepo(object): assert_equal(tuple(), c.parents) c = commits[2] - assert_equal(["6e64c55896aabb9a7d8e9f8f296f426d21a78c2c", "7f874954efb9ba35210445be456c74e037ba6af2"], map(lambda p: p.id, c.parents)) + assert_equal(["6e64c55896aabb9a7d8e9f8f296f426d21a78c2c", "7f874954efb9ba35210445be456c74e037ba6af2"], map(lambda p: p.sha, c.parents)) assert_equal("Merge branch 'site'", c.summary) assert_true(git.called) + def test_trees(self): + mc = 30 + num_trees = 0 + for tree in self.rorepo.iter_trees('0.1.5', max_count=mc): + num_trees += 1 + assert isinstance(tree, Tree) + # END for each tree + assert num_trees == mc + + @patch_object(Repo, '__init__') @patch_object(Git, '_call_process') def test_init(self, git, repo): @@ -76,6 +95,9 @@ class TestRepo(object): assert_true(git.called) assert_true(repo.called) + + def test_bare_property(self): + self.rorepo.bare @patch_object(Repo, '__init__') @patch_object(Git, '_call_process') @@ -95,7 +117,7 @@ class TestRepo(object): git.return_value = None repo.return_value = None - self.repo.clone("repos/foo/bar.git") + self.rorepo.clone("repos/foo/bar.git") assert_true(git.called) path = os.path.join(absolute_project_path(), '.git') @@ -108,7 +130,7 @@ class TestRepo(object): git.return_value = None repo.return_value = None - self.repo.clone("repos/foo/bar.git", **{'template': '/awesome'}) + self.rorepo.clone("repos/foo/bar.git", **{'template': '/awesome'}) assert_true(git.called) path = os.path.join(absolute_project_path(), '.git') @@ -116,93 +138,63 @@ class TestRepo(object): { 'template': '/awesome'})) assert_true(repo.called) - @patch_object(Git, '_call_process') - def test_diff(self, git): - self.repo.diff('master^', 'master') - - assert_true(git.called) - assert_equal(git.call_args, (('diff', 'master^', 'master', '--'), {})) - - self.repo.diff('master^', 'master', 'foo/bar') - - assert_true(git.called) - assert_equal(git.call_args, (('diff', 'master^', 'master', '--', 'foo/bar'), {})) - - self.repo.diff('master^', 'master', 'foo/bar', 'foo/baz') - - assert_true(git.called) - assert_equal(git.call_args, (('diff', 'master^', 'master', '--', 'foo/bar', 'foo/baz'), {})) - - @patch_object(Git, '_call_process') - def test_diff_with_parents(self, git): - git.return_value = fixture('diff_p') - - diffs = self.repo.commit_diff('master') - assert_equal(15, len(diffs)) - assert_true(git.called) - def test_archive(self): - args = ( tuple(), (self.repo.heads[-1],),(None,"hello") ) - for arg_list in args: - ftmp = os.tmpfile() - self.repo.archive(ftmp, *arg_list) - ftmp.seek(0,2) - assert ftmp.tell() - # END for each arg-list - - @patch('git.utils.touch') - def test_enable_daemon_serve(self, touch): - self.repo.daemon_serve = False - assert_false(self.repo.daemon_serve) - - def test_disable_daemon_serve(self): - self.repo.daemon_serve = True - assert_true(self.repo.daemon_serve) + def test_daemon_export(self): + orig_val = self.rorepo.daemon_export + self.rorepo.daemon_export = not orig_val + assert self.rorepo.daemon_export == ( not orig_val ) + self.rorepo.daemon_export = orig_val + assert self.rorepo.daemon_export == orig_val - @patch_object(os.path, 'exists') - def test_alternates_no_file(self, os): - os.return_value = False - assert_equal([], self.repo.alternates) - - assert_true(os.called) - - @patch_object(os, 'remove') - def test_alternates_setter_empty(self, os): - self.repo.alternates = [] - assert_true(os.called) + def test_alternates(self): + cur_alternates = self.rorepo.alternates + # empty alternates + self.rorepo.alternates = [] + assert self.rorepo.alternates == [] + alts = [ "other/location", "this/location" ] + self.rorepo.alternates = alts + assert alts == self.rorepo.alternates + self.rorepo.alternates = cur_alternates def test_repr(self): path = os.path.join(os.path.abspath(GIT_REPO), '.git') - assert_equal('<git.Repo "%s">' % path, repr(self.repo)) + assert_equal('<git.Repo "%s">' % path, repr(self.rorepo)) def test_is_dirty_with_bare_repository(self): - self.repo.bare = True - assert_false(self.repo.is_dirty) - - @patch_object(Git, '_call_process') - def test_is_dirty_with_clean_working_dir(self, git): - self.repo.bare = False - git.return_value = '' - assert_false(self.repo.is_dirty) - assert_equal(git.call_args, (('diff', 'HEAD', '--'), {})) - - @patch_object(Git, '_call_process') - def test_is_dirty_with_dirty_working_dir(self, git): - self.repo.bare = False - git.return_value = '''-aaa\n+bbb''' - assert_true(self.repo.is_dirty) - assert_equal(git.call_args, (('diff', 'HEAD', '--'), {})) - - @patch_object(Git, '_call_process') - def test_active_branch(self, git): - git.return_value = 'refs/heads/major-refactoring' - assert_equal(self.repo.active_branch.name, 'major-refactoring') - assert_equal(git.call_args, (('symbolic_ref', 'HEAD'), {})) + self.rorepo._bare = True + assert_false(self.rorepo.is_dirty()) + + def test_is_dirty(self): + self.rorepo._bare = False + for index in (0,1): + for working_tree in (0,1): + for untracked_files in (0,1): + assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False) + # END untracked files + # END working tree + # END index + self.rorepo._bare = True + assert self.rorepo.is_dirty() == False + + def test_head(self): + assert self.rorepo.head.reference.object == self.rorepo.active_branch.object + + def test_index(self): + index = self.rorepo.index + assert isinstance(index, IndexFile) + + def test_tag(self): + assert self.rorepo.tag('refs/tags/0.1.5').commit + + def test_archive(self): + tmpfile = os.tmpfile() + self.rorepo.archive(tmpfile, '0.1.5') + assert tmpfile.tell() @patch_object(Git, '_call_process') def test_should_display_blame_information(self, git): git.return_value = fixture('blame') - b = self.repo.blame( 'master', 'lib/git.py') + b = self.rorepo.blame( 'master', 'lib/git.py') assert_equal(13, len(b)) assert_equal( 2, len(b[0]) ) # assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b)) @@ -211,7 +203,7 @@ class TestRepo(object): assert_true(git.called) assert_equal(git.call_args, (('blame', 'master', '--', 'lib/git.py'), {'p': True})) - assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.id) + assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.sha) assert_equal('Tom Preston-Werner', c.author.name) assert_equal('tom@mojombo.com', c.author.email) assert_equal(1191997100, c.authored_date) @@ -225,3 +217,57 @@ class TestRepo(object): assert_true( tlist ) assert_true( isinstance( tlist[0], basestring ) ) assert_true( len( tlist ) < sum( len(t) for t in tlist ) ) # test for single-char bug + + def test_untracked_files(self): + base = self.rorepo.git.git_dir + files = (base+"/__test_myfile", base+"/__test_other_file") + num_recently_untracked = 0 + try: + for fpath in files: + fd = open(fpath,"wb") + fd.close() + # END for each filename + untracked_files = self.rorepo.untracked_files + num_recently_untracked = len(untracked_files) + + # assure we have all names - they are relative to the git-dir + num_test_untracked = 0 + for utfile in untracked_files: + num_test_untracked += os.path.join(base, utfile) in files + assert len(files) == num_test_untracked + finally: + for fpath in files: + if os.path.isfile(fpath): + os.remove(fpath) + # END handle files + + assert len(self.rorepo.untracked_files) == (num_recently_untracked - len(files)) + + def test_config_reader(self): + reader = self.rorepo.config_reader() # all config files + assert reader.read_only + reader = self.rorepo.config_reader("repository") # single config file + assert reader.read_only + + def test_config_writer(self): + for config_level in self.rorepo.config_level: + try: + writer = self.rorepo.config_writer(config_level) + assert not writer.read_only + except IOError: + # its okay not to get a writer for some configuration files if we + # have no permissions + pass + # END for each config level + + def test_creation_deletion(self): + # just a very quick test to assure it generally works. There are + # specialized cases in the test_refs module + head = self.rorepo.create_head("new_head", "HEAD~1") + self.rorepo.delete_head(head) + + tag = self.rorepo.create_tag("new_tag", "HEAD~2") + self.rorepo.delete_tag(tag) + + remote = self.rorepo.create_remote("new_remote", "git@server:repo.git") + self.rorepo.delete_remote(remote) diff --git a/test/git/test_stats.py b/test/git/test_stats.py index 706f29a4..7392a96e 100644 --- a/test/git/test_stats.py +++ b/test/git/test_stats.py @@ -7,13 +7,11 @@ from test.testlib import * from git import * -class TestStats(object): - def setup(self): - self.repo = Repo(GIT_REPO) +class TestStats(TestBase): def test__list_from_string(self): output = fixture('diff_numstat') - stats = Stats._list_from_string(self.repo, output) + stats = Stats._list_from_string(self.rorepo, output) assert_equal(2, stats.total['files']) assert_equal(52, stats.total['lines']) diff --git a/test/git/test_tag.py b/test/git/test_tag.py deleted file mode 100644 index 8f12bf11..00000000 --- a/test/git/test_tag.py +++ /dev/null @@ -1,34 +0,0 @@ -# test_tag.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php - -from mock import * -from test.testlib import * -from git import * -from git.objects.tag import TagObject - -class TestTag(object): - def setup(self): - self.repo = Repo(GIT_REPO) - - def test_tag_base(self): - tag_object_refs = list() - for tag in self.repo.tags: - assert "refs/tags" in tag.path - assert tag.name - assert isinstance( tag.commit, Commit ) - if tag.tag is not None: - tag_object_refs.append( tag ) - tagobj = tag.tag - assert isinstance( tagobj, TagObject ) - assert tagobj.tag == tag.name - assert isinstance( tagobj.tagger, Actor ) - assert isinstance( tagobj.tagged_date, int ) - assert tagobj.message - # END if we have a tag object - # END for tag in repo-tags - assert tag_object_refs - - diff --git a/test/git/test_tree.py b/test/git/test_tree.py index dafb6f3f..e0c1f134 100644 --- a/test/git/test_tree.py +++ b/test/git/test_tree.py @@ -4,6 +4,7 @@ # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php +import os from test.testlib import * from git import * @@ -15,7 +16,7 @@ class TestTree(TestCase): def test_traverse(self): - root = self.repo.tree() + root = self.repo.tree('0.1.6') num_recursive = 0 all_items = list() for obj in root.traverse(): @@ -35,10 +36,28 @@ class TestTree(TestCase): trees = list(root.traverse(predicate = trees_only)) assert len(trees) == len(list( i for i in root.traverse() if trees_only(i) )) + # test prune + lib_folder = lambda t: t.path == "lib" + pruned_trees = list(root.traverse(predicate = trees_only,prune = lib_folder)) + assert len(pruned_trees) < len(trees) + # trees and blobs assert len(set(trees)|set(root.trees)) == len(trees) assert len(set(b for b in root if isinstance(b, Blob)) | set(root.blobs)) == len( root.blobs ) + subitem = trees[0][0] + assert "/" in subitem.path + assert subitem.name == os.path.basename(subitem.path) + + # assure that at some point the traversed paths have a slash in them + found_slash = False + for item in root.traverse(): + assert os.path.isabs(item.abspath) + if '/' in item.path: + found_slash = True + break + # END for each item + assert found_slash def test_repr(self): - tree = Tree(self.repo, id='abc') + tree = Tree(self.repo, 'abc') assert_equal('<git.Tree "abc">', repr(tree)) diff --git a/test/git/test_utils.py b/test/git/test_utils.py index 2983a14a..029d2054 100644 --- a/test/git/test_utils.py +++ b/test/git/test_utils.py @@ -5,10 +5,15 @@ # the BSD License: http://www.opensource.org/licenses/bsd-license.php import os +import tempfile + from test.testlib import * +from git.utils import * from git import * +from git.cmd import dashify + -class TestUtils(object): +class TestUtils(TestCase): def setup(self): self.testdict = { "string": "42", @@ -19,3 +24,83 @@ class TestUtils(object): def test_it_should_dashify(self): assert_equal('this-is-my-argument', dashify('this_is_my_argument')) assert_equal('foo', dashify('foo')) + + + def test_lock_file(self): + my_file = tempfile.mktemp() + lock_file = LockFile(my_file) + assert not lock_file._has_lock() + # release lock we don't have - fine + lock_file._release_lock() + + # get lock + lock_file._obtain_lock_or_raise() + assert lock_file._has_lock() + + # concurrent access + other_lock_file = LockFile(my_file) + assert not other_lock_file._has_lock() + self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise) + + lock_file._release_lock() + assert not lock_file._has_lock() + + other_lock_file._obtain_lock_or_raise() + self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise) + + # auto-release on destruction + del(other_lock_file) + lock_file._obtain_lock_or_raise() + lock_file._release_lock() + + def _cmp_contents(self, file_path, data): + # raise if data from file at file_path + # does not match data string + fp = open(file_path, "r") + try: + assert fp.read() == data + finally: + fp.close() + + def test_safe_operation(self): + my_file = tempfile.mktemp() + orig_data = "hello" + new_data = "world" + my_file_fp = open(my_file, "w") + my_file_fp.write(orig_data) + my_file_fp.close() + + try: + cwrite = ConcurrentWriteOperation(my_file) + + # didn't start writing, doesnt matter + cwrite._end_writing(False) + cwrite._end_writing(True) + assert not cwrite._is_writing() + + # write data and fail + stream = cwrite._begin_writing() + assert cwrite._is_writing() + stream.write(new_data) + cwrite._end_writing(successful=False) + self._cmp_contents(my_file, orig_data) + assert not os.path.exists(stream.name) + + # write data - concurrently + ocwrite = ConcurrentWriteOperation(my_file) + stream = cwrite._begin_writing() + self.failUnlessRaises(IOError, ocwrite._begin_writing) + + stream.write("world") + cwrite._end_writing(successful=True) + self._cmp_contents(my_file, new_data) + assert not os.path.exists(stream.name) + + # could test automatic _end_writing on destruction + finally: + os.remove(my_file) + # END final cleanup + + + + |