Diffstat (limited to 'git/test/db')
-rw-r--r--  git/test/db/base.py               | 1192
-rw-r--r--  git/test/db/cmd/test_base.py      |  148
-rw-r--r--  git/test/db/dulwich/lib.py        |   12
-rw-r--r--  git/test/db/dulwich/test_base.py  |   32
-rw-r--r--  git/test/db/lib.py                |  456
-rw-r--r--  git/test/db/py/test_base.py       |   12
-rw-r--r--  git/test/db/py/test_git.py        |   74
-rw-r--r--  git/test/db/py/test_loose.py      |   54
-rw-r--r--  git/test/db/py/test_mem.py        |   42
-rw-r--r--  git/test/db/py/test_pack.py       |  122
-rw-r--r--  git/test/db/py/test_ref.py        |  102
-rw-r--r--  git/test/db/pygit2/lib.py         |   12
-rw-r--r--  git/test/db/pygit2/test_base.py   |   32
-rw-r--r--  git/test/db/test_base.py          |   20
14 files changed, 1155 insertions, 1155 deletions
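Note: the summary line (1155 insertions against 1155 deletions) and the mirrored hunks below indicate a pure reindentation: every removed line reappears unchanged except for its leading whitespace. One way to confirm that a pair of blobs differs only in whitespace is to diff them with whitespace ignored. The sketch below does this through GitPython's own command wrapper; the blob ids are taken from the index line of the base.py diff that follows, and it assumes it runs inside a clone of this repository.

# Sketch: confirm two blobs differ only in whitespace (assumes a clone
# of this repository; the ids come from the index line of base.py below).
from git import Repo

repo = Repo('.')
# GitPython translates keyword arguments into command-line flags, so this
# runs: git diff --ignore-all-space 0f5eebe4 202a4353
out = repo.git.diff('0f5eebe4', '202a4353', ignore_all_space=True)
assert not out, "blobs differ by more than whitespace"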
diff --git a/git/test/db/base.py b/git/test/db/base.py
index 0f5eebe4..202a4353 100644
--- a/git/test/db/base.py
+++ b/git/test/db/base.py
@@ -23,619 +23,619 @@ from git.db.compat import RepoCompatibilityInterface
class RepoGlobalsItemDeletorMetaCls(GlobalsItemDeletorMetaCls):
- ModuleToDelete = 'RepoBase'
-
+ ModuleToDelete = 'RepoBase'
+
class RepoBase(TestDBBase):
- """Basic test for everything a fully implemented repository should support"""
- __metaclass__ = RepoGlobalsItemDeletorMetaCls
-
- def test_new_should_raise_on_invalid_repo_location(self):
- self.failUnlessRaises(InvalidGitRepositoryError, self.RepoCls, tempfile.gettempdir())
+ """Basic test for everything a fully implemented repository should support"""
+ __metaclass__ = RepoGlobalsItemDeletorMetaCls
+
+ def test_new_should_raise_on_invalid_repo_location(self):
+ self.failUnlessRaises(InvalidGitRepositoryError, self.RepoCls, tempfile.gettempdir())
- def test_new_should_raise_on_non_existant_path(self):
- self.failUnlessRaises(NoSuchPathError, self.RepoCls, "repos/foobar")
+ def test_new_should_raise_on_non_existant_path(self):
+ self.failUnlessRaises(NoSuchPathError, self.RepoCls, "repos/foobar")
- def test_repo_creation_from_different_paths(self):
- r_from_gitdir = self.RepoCls(self.rorepo.git_dir)
- assert r_from_gitdir.git_dir == self.rorepo.git_dir
- assert r_from_gitdir.git_dir.endswith('.git')
- assert not self.rorepo.git.working_dir.endswith('.git')
- assert r_from_gitdir.git.working_dir == self.rorepo.git.working_dir
+ def test_repo_creation_from_different_paths(self):
+ r_from_gitdir = self.RepoCls(self.rorepo.git_dir)
+ assert r_from_gitdir.git_dir == self.rorepo.git_dir
+ assert r_from_gitdir.git_dir.endswith('.git')
+ assert not self.rorepo.git.working_dir.endswith('.git')
+ assert r_from_gitdir.git.working_dir == self.rorepo.git.working_dir
- def test_description(self):
- txt = "Test repository"
- self.rorepo.description = txt
- assert_equal(self.rorepo.description, txt)
+ def test_description(self):
+ txt = "Test repository"
+ self.rorepo.description = txt
+ assert_equal(self.rorepo.description, txt)
- def test_heads_should_return_array_of_head_objects(self):
- for head in self.rorepo.heads:
- assert_equal(Head, head.__class__)
+ def test_heads_should_return_array_of_head_objects(self):
+ for head in self.rorepo.heads:
+ assert_equal(Head, head.__class__)
- def test_heads_should_populate_head_data(self):
- for head in self.rorepo.heads:
- assert head.name
- assert isinstance(head.commit,Commit)
- # END for each head
-
- assert isinstance(self.rorepo.heads.master, Head)
- assert isinstance(self.rorepo.heads['master'], Head)
-
- def test_tree_from_revision(self):
- tree = self.rorepo.tree('0.1.6')
- assert len(tree.hexsha) == 40
- assert tree.type == "tree"
- assert self.rorepo.tree(tree) == tree
-
- # try from invalid revision that does not exist
- self.failUnlessRaises(BadObject, self.rorepo.tree, 'hello world')
-
- def test_commit_from_revision(self):
- commit = self.rorepo.commit('0.1.4')
- assert commit.type == 'commit'
- assert self.rorepo.commit(commit) == commit
+ def test_heads_should_populate_head_data(self):
+ for head in self.rorepo.heads:
+ assert head.name
+ assert isinstance(head.commit,Commit)
+ # END for each head
+
+ assert isinstance(self.rorepo.heads.master, Head)
+ assert isinstance(self.rorepo.heads['master'], Head)
+
+ def test_tree_from_revision(self):
+ tree = self.rorepo.tree('0.1.6')
+ assert len(tree.hexsha) == 40
+ assert tree.type == "tree"
+ assert self.rorepo.tree(tree) == tree
+
+ # try from invalid revision that does not exist
+ self.failUnlessRaises(BadObject, self.rorepo.tree, 'hello world')
+
+ def test_commit_from_revision(self):
+ commit = self.rorepo.commit('0.1.4')
+ assert commit.type == 'commit'
+ assert self.rorepo.commit(commit) == commit
- def test_commits(self):
- mc = 10
- commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc))
- assert len(commits) == mc
-
- c = commits[0]
- assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha)
- assert_equal(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents])
- assert_equal("ce41fc29549042f1aa09cc03174896cf23f112e3", c.tree.hexsha)
- assert_equal("Michael Trier", c.author.name)
- assert_equal("mtrier@gmail.com", c.author.email)
- assert_equal(1232829715, c.authored_date)
- assert_equal(5*3600, c.author_tz_offset)
- assert_equal("Michael Trier", c.committer.name)
- assert_equal("mtrier@gmail.com", c.committer.email)
- assert_equal(1232829715, c.committed_date)
- assert_equal(5*3600, c.committer_tz_offset)
- assert_equal("Bumped version 0.1.6\n", c.message)
+ def test_commits(self):
+ mc = 10
+ commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc))
+ assert len(commits) == mc
+
+ c = commits[0]
+ assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha)
+ assert_equal(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents])
+ assert_equal("ce41fc29549042f1aa09cc03174896cf23f112e3", c.tree.hexsha)
+ assert_equal("Michael Trier", c.author.name)
+ assert_equal("mtrier@gmail.com", c.author.email)
+ assert_equal(1232829715, c.authored_date)
+ assert_equal(5*3600, c.author_tz_offset)
+ assert_equal("Michael Trier", c.committer.name)
+ assert_equal("mtrier@gmail.com", c.committer.email)
+ assert_equal(1232829715, c.committed_date)
+ assert_equal(5*3600, c.committer_tz_offset)
+ assert_equal("Bumped version 0.1.6\n", c.message)
- c = commits[1]
- assert isinstance(c.parents, tuple)
+ c = commits[1]
+ assert isinstance(c.parents, tuple)
- def test_trees(self):
- mc = 30
- num_trees = 0
- for tree in self.rorepo.iter_trees('0.1.5', max_count=mc):
- num_trees += 1
- assert isinstance(tree, Tree)
- # END for each tree
- assert num_trees == mc
+ def test_trees(self):
+ mc = 30
+ num_trees = 0
+ for tree in self.rorepo.iter_trees('0.1.5', max_count=mc):
+ num_trees += 1
+ assert isinstance(tree, Tree)
+ # END for each tree
+ assert num_trees == mc
- def _assert_empty_repo(self, repo):
- # test all kinds of things with an empty, freshly initialized repo.
- # It should throw good errors
-
- # entries should be empty
- assert len(repo.index.entries) == 0
-
- # head is accessible
- assert repo.head
- assert repo.head.ref
- assert not repo.head.is_valid()
-
- # we can change the head to some other ref
- head_ref = Head.from_path(repo, Head.to_full_path('some_head'))
- assert not head_ref.is_valid()
- repo.head.ref = head_ref
-
- # is_dirty can handle all kwargs
- for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)):
- assert not repo.is_dirty(*args)
- # END for each arg
-
- # we can add a file to the index ( if we are not bare )
- if not repo.bare:
- pass
- # END test repos with working tree
-
+ def _assert_empty_repo(self, repo):
+ # test all kinds of things with an empty, freshly initialized repo.
+ # It should throw good errors
+
+ # entries should be empty
+ assert len(repo.index.entries) == 0
+
+ # head is accessible
+ assert repo.head
+ assert repo.head.ref
+ assert not repo.head.is_valid()
+
+ # we can change the head to some other ref
+ head_ref = Head.from_path(repo, Head.to_full_path('some_head'))
+ assert not head_ref.is_valid()
+ repo.head.ref = head_ref
+
+ # is_dirty can handle all kwargs
+ for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)):
+ assert not repo.is_dirty(*args)
+ # END for each arg
+
+ # we can add a file to the index ( if we are not bare )
+ if not repo.bare:
+ pass
+ # END test repos with working tree
+
- def test_init(self):
- prev_cwd = os.getcwd()
- os.chdir(tempfile.gettempdir())
- git_dir_rela = "repos/foo/bar.git"
- del_dir_abs = os.path.abspath("repos")
- git_dir_abs = os.path.abspath(git_dir_rela)
- try:
- # with specific path
- for path in (git_dir_rela, git_dir_abs):
- r = self.RepoCls.init(path=path, bare=True)
- assert isinstance(r, self.RepoCls)
- assert r.bare == True
- assert os.path.isdir(r.git_dir)
-
- self._assert_empty_repo(r)
-
- # test clone
- clone_path = path + "_clone"
- rc = r.clone(clone_path)
- self._assert_empty_repo(rc)
-
-
- try:
- shutil.rmtree(clone_path)
- except OSError:
- # when relative paths are used, the clone may actually be inside
- # of the parent directory
- pass
- # END exception handling
-
- # try again, this time with the absolute version
- rc = self.RepoCls.clone_from(r.git_dir, clone_path)
- self._assert_empty_repo(rc)
-
- shutil.rmtree(git_dir_abs)
- try:
- shutil.rmtree(clone_path)
- except OSError:
- # when relative paths are used, the clone may actually be inside
- # of the parent directory
- pass
- # END exception handling
-
- # END for each path
-
- os.makedirs(git_dir_rela)
- os.chdir(git_dir_rela)
- r = self.RepoCls.init(bare=False)
- assert r.bare == False
-
- self._assert_empty_repo(r)
- finally:
- try:
- shutil.rmtree(del_dir_abs)
- except OSError:
- pass
- os.chdir(prev_cwd)
- # END restore previous state
-
- def test_bare_property(self):
- if isinstance(self.rorepo, RepoCompatibilityInterface):
- self.rorepo.bare
- #END handle compatibility
- self.rorepo.is_bare
+ def test_init(self):
+ prev_cwd = os.getcwd()
+ os.chdir(tempfile.gettempdir())
+ git_dir_rela = "repos/foo/bar.git"
+ del_dir_abs = os.path.abspath("repos")
+ git_dir_abs = os.path.abspath(git_dir_rela)
+ try:
+ # with specific path
+ for path in (git_dir_rela, git_dir_abs):
+ r = self.RepoCls.init(path=path, bare=True)
+ assert isinstance(r, self.RepoCls)
+ assert r.bare == True
+ assert os.path.isdir(r.git_dir)
+
+ self._assert_empty_repo(r)
+
+ # test clone
+ clone_path = path + "_clone"
+ rc = r.clone(clone_path)
+ self._assert_empty_repo(rc)
+
+
+ try:
+ shutil.rmtree(clone_path)
+ except OSError:
+ # when relative paths are used, the clone may actually be inside
+ # of the parent directory
+ pass
+ # END exception handling
+
+ # try again, this time with the absolute version
+ rc = self.RepoCls.clone_from(r.git_dir, clone_path)
+ self._assert_empty_repo(rc)
+
+ shutil.rmtree(git_dir_abs)
+ try:
+ shutil.rmtree(clone_path)
+ except OSError:
+ # when relative paths are used, the clone may actually be inside
+ # of the parent directory
+ pass
+ # END exception handling
+
+ # END for each path
+
+ os.makedirs(git_dir_rela)
+ os.chdir(git_dir_rela)
+ r = self.RepoCls.init(bare=False)
+ assert r.bare == False
+
+ self._assert_empty_repo(r)
+ finally:
+ try:
+ shutil.rmtree(del_dir_abs)
+ except OSError:
+ pass
+ os.chdir(prev_cwd)
+ # END restore previous state
+
+ def test_bare_property(self):
+ if isinstance(self.rorepo, RepoCompatibilityInterface):
+ self.rorepo.bare
+ #END handle compatibility
+ self.rorepo.is_bare
- def test_daemon_export(self):
- orig_val = self.rorepo.daemon_export
- self.rorepo.daemon_export = not orig_val
- assert self.rorepo.daemon_export == ( not orig_val )
- self.rorepo.daemon_export = orig_val
- assert self.rorepo.daemon_export == orig_val
+ def test_daemon_export(self):
+ orig_val = self.rorepo.daemon_export
+ self.rorepo.daemon_export = not orig_val
+ assert self.rorepo.daemon_export == ( not orig_val )
+ self.rorepo.daemon_export = orig_val
+ assert self.rorepo.daemon_export == orig_val
- def test_alternates(self):
- cur_alternates = self.rorepo.alternates
- # empty alternates
- self.rorepo.alternates = []
- assert self.rorepo.alternates == []
- alts = [ "other/location", "this/location" ]
- self.rorepo.alternates = alts
- assert alts == self.rorepo.alternates
- self.rorepo.alternates = cur_alternates
+ def test_alternates(self):
+ cur_alternates = self.rorepo.alternates
+ # empty alternates
+ self.rorepo.alternates = []
+ assert self.rorepo.alternates == []
+ alts = [ "other/location", "this/location" ]
+ self.rorepo.alternates = alts
+ assert alts == self.rorepo.alternates
+ self.rorepo.alternates = cur_alternates
- def test_repr(self):
- assert_equal('<git.Repo "%s">' % rorepo_dir(), repr(self.rorepo))
+ def test_repr(self):
+ assert_equal('<git.Repo "%s">' % rorepo_dir(), repr(self.rorepo))
- def test_is_dirty_with_bare_repository(self):
- orig_value = self.rorepo._bare
- self.rorepo._bare = True
- assert_false(self.rorepo.is_dirty())
- self.rorepo._bare = orig_value
+ def test_is_dirty_with_bare_repository(self):
+ orig_value = self.rorepo._bare
+ self.rorepo._bare = True
+ assert_false(self.rorepo.is_dirty())
+ self.rorepo._bare = orig_value
- def test_is_dirty(self):
- self.rorepo._bare = False
- for index in (0,1):
- for working_tree in (0,1):
- for untracked_files in (0,1):
- assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False)
- # END untracked files
- # END working tree
- # END index
- orig_val = self.rorepo._bare
- self.rorepo._bare = True
- assert self.rorepo.is_dirty() == False
- self.rorepo._bare = orig_val
+ def test_is_dirty(self):
+ self.rorepo._bare = False
+ for index in (0,1):
+ for working_tree in (0,1):
+ for untracked_files in (0,1):
+ assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False)
+ # END untracked files
+ # END working tree
+ # END index
+ orig_val = self.rorepo._bare
+ self.rorepo._bare = True
+ assert self.rorepo.is_dirty() == False
+ self.rorepo._bare = orig_val
- def test_head(self):
- assert self.rorepo.head.reference.object == self.rorepo.active_branch.object
+ def test_head(self):
+ assert self.rorepo.head.reference.object == self.rorepo.active_branch.object
- def test_index(self):
- index = self.rorepo.index
- assert isinstance(index, IndexFile)
-
- def test_tag(self):
- assert self.rorepo.tag('0.1.5').commit
- assert self.rorepo.tag('refs/tags/0.1.5').commit
-
- def test_archive(self):
- tmpfile = os.tmpfile()
- self.rorepo.archive(tmpfile, '0.1.5')
- assert tmpfile.tell()
-
- @patch_object(Git, '_call_process')
- def test_should_display_blame_information(self, git):
- git.return_value = fixture('blame')
- b = self.rorepo.blame( 'master', 'lib/git.py')
- assert_equal(13, len(b))
- assert_equal( 2, len(b[0]) )
- # assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b))
- assert_equal(hash(b[0][0]), hash(b[9][0]))
- c = b[0][0]
- assert_true(git.called)
- assert_equal(git.call_args, (('blame', 'master', '--', 'lib/git.py'), {'p': True}))
-
- assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.hexsha)
- assert_equal('Tom Preston-Werner', c.author.name)
- assert_equal('tom@mojombo.com', c.author.email)
- assert_equal(1191997100, c.authored_date)
- assert_equal('Tom Preston-Werner', c.committer.name)
- assert_equal('tom@mojombo.com', c.committer.email)
- assert_equal(1191997100, c.committed_date)
- assert_equal('initial grit setup', c.message)
-
- # test the 'lines per commit' entries
- tlist = b[0][1]
- assert_true( tlist )
- assert_true( isinstance( tlist[0], basestring ) )
- assert_true( len( tlist ) < sum( len(t) for t in tlist ) ) # test for single-char bug
-
- def test_blame_real(self):
- c = 0
- for item in self.rorepo.head.commit.tree.traverse(
- predicate=lambda i, d: i.type == 'blob' and i.path.endswith('.py')):
- c += 1
- b = self.rorepo.blame(self.rorepo.head, item.path)
- #END for each item to traverse
- assert c
-
- def test_untracked_files(self):
- base = self.rorepo.working_tree_dir
- files = ( join_path_native(base, "__test_myfile"),
- join_path_native(base, "__test_other_file") )
- num_recently_untracked = 0
- try:
- for fpath in files:
- fd = open(fpath,"wb")
- fd.close()
- # END for each filename
- untracked_files = self.rorepo.untracked_files
- num_recently_untracked = len(untracked_files)
-
- # assure we have all names - they are relative to the git-dir
- num_test_untracked = 0
- for utfile in untracked_files:
- num_test_untracked += join_path_native(base, utfile) in files
- assert len(files) == num_test_untracked
- finally:
- for fpath in files:
- if os.path.isfile(fpath):
- os.remove(fpath)
- # END handle files
-
- assert len(self.rorepo.untracked_files) == (num_recently_untracked - len(files))
-
- def test_config_reader(self):
- reader = self.rorepo.config_reader() # all config files
- assert reader.read_only
- reader = self.rorepo.config_reader("repository") # single config file
- assert reader.read_only
-
- def test_config_writer(self):
- for config_level in self.rorepo.config_level:
- try:
- writer = self.rorepo.config_writer(config_level)
- assert not writer.read_only
- except IOError:
- # its okay not to get a writer for some configuration files if we
- # have no permissions
- pass
- # END for each config level
-
- def test_creation_deletion(self):
- # just a very quick test to assure it generally works. There are
- # specialized cases in the test_refs module
- head = self.rorepo.create_head("new_head", "HEAD~1")
- self.rorepo.delete_head(head)
-
- tag = self.rorepo.create_tag("new_tag", "HEAD~2")
- self.rorepo.delete_tag(tag)
- self.rorepo.config_writer()
- remote = self.rorepo.create_remote("new_remote", "git@server:repo.git")
- self.rorepo.delete_remote(remote)
-
- def test_comparison_and_hash(self):
- # this is only a preliminary test, more testing done in test_index
- assert self.rorepo == self.rorepo and not (self.rorepo != self.rorepo)
- assert len(set((self.rorepo, self.rorepo))) == 1
-
- def test_git_cmd(self):
- # test CatFileContentStream, just to be very sure we have no fencepost errors
- # last \n is the terminating newline that it expects
- l1 = "0123456789\n"
- l2 = "abcdefghijklmnopqrstxy\n"
- l3 = "z\n"
- d = "%s%s%s\n" % (l1, l2, l3)
-
- l1p = l1[:5]
-
- # full size
- # size is without terminating newline
- def mkfull():
- return Git.CatFileContentStream(len(d)-1, StringIO(d))
-
- ts = 5
- def mktiny():
- return Git.CatFileContentStream(ts, StringIO(d))
-
- # readlines no limit
- s = mkfull()
- lines = s.readlines()
- assert len(lines) == 3 and lines[-1].endswith('\n')
- assert s._stream.tell() == len(d) # must have scrubbed to the end
-
- # readlines line limit
- s = mkfull()
- lines = s.readlines(5)
- assert len(lines) == 1
-
- # readlines on tiny sections
- s = mktiny()
- lines = s.readlines()
- assert len(lines) == 1 and lines[0] == l1p
- assert s._stream.tell() == ts+1
-
- # readline no limit
- s = mkfull()
- assert s.readline() == l1
- assert s.readline() == l2
- assert s.readline() == l3
- assert s.readline() == ''
- assert s._stream.tell() == len(d)
-
- # readline limit
- s = mkfull()
- assert s.readline(5) == l1p
- assert s.readline() == l1[5:]
-
- # readline on tiny section
- s = mktiny()
- assert s.readline() == l1p
- assert s.readline() == ''
- assert s._stream.tell() == ts+1
-
- # read no limit
- s = mkfull()
- assert s.read() == d[:-1]
- assert s.read() == ''
- assert s._stream.tell() == len(d)
-
- # read limit
- s = mkfull()
- assert s.read(5) == l1p
- assert s.read(6) == l1[5:]
- assert s._stream.tell() == 5 + 6 # its not yet done
-
- # read tiny
- s = mktiny()
- assert s.read(2) == l1[:2]
- assert s._stream.tell() == 2
- assert s.read() == l1[2:ts]
- assert s._stream.tell() == ts+1
-
- def _assert_rev_parse_types(self, name, rev_obj):
- rev_parse = self.rorepo.rev_parse
-
- if rev_obj.type == 'tag':
- rev_obj = rev_obj.object
-
- # tree and blob type
- obj = rev_parse(name + '^{tree}')
- assert obj == rev_obj.tree
-
- obj = rev_parse(name + ':CHANGES')
- assert obj.type == 'blob' and obj.path == 'CHANGES'
- assert rev_obj.tree['CHANGES'] == obj
-
-
- def _assert_rev_parse(self, name):
- """tries multiple different rev-parse syntaxes with the given name
- :return: parsed object"""
- rev_parse = self.rorepo.rev_parse
- orig_obj = rev_parse(name)
- if orig_obj.type == 'tag':
- obj = orig_obj.object
- else:
- obj = orig_obj
- # END deref tags by default
-
- # try history
- rev = name + "~"
- obj2 = rev_parse(rev)
- assert obj2 == obj.parents[0]
- self._assert_rev_parse_types(rev, obj2)
-
- # history with number
- ni = 11
- history = [obj.parents[0]]
- for pn in range(ni):
- history.append(history[-1].parents[0])
- # END get given amount of commits
-
- for pn in range(11):
- rev = name + "~%i" % (pn+1)
- obj2 = rev_parse(rev)
- assert obj2 == history[pn]
- self._assert_rev_parse_types(rev, obj2)
- # END history check
-
- # parent ( default )
- rev = name + "^"
- obj2 = rev_parse(rev)
- assert obj2 == obj.parents[0]
- self._assert_rev_parse_types(rev, obj2)
-
- # parent with number
- for pn, parent in enumerate(obj.parents):
- rev = name + "^%i" % (pn+1)
- assert rev_parse(rev) == parent
- self._assert_rev_parse_types(rev, parent)
- # END for each parent
-
- return orig_obj
-
- @with_rw_repo('HEAD', bare=False)
- def test_rw_rev_parse(self, rwrepo):
- # verify it does not confuse branches with hexsha ids
- ahead = rwrepo.create_head('aaaaaaaa')
- assert(rwrepo.rev_parse(str(ahead)) == ahead.commit)
-
- def test_rev_parse(self):
- rev_parse = self.rorepo.rev_parse
-
- # try special case: This one failed at some point, make sure its fixed
- assert rev_parse("33ebe").hexsha == "33ebe7acec14b25c5f84f35a664803fcab2f7781"
-
- # start from reference
- num_resolved = 0
-
- for ref in Reference.iter_items(self.rorepo):
- path_tokens = ref.path.split("/")
- for pt in range(len(path_tokens)):
- path_section = '/'.join(path_tokens[-(pt+1):])
- try:
- obj = self._assert_rev_parse(path_section)
- assert obj.type == ref.object.type
- num_resolved += 1
- except BadObject:
- print "failed on %s" % path_section
- # this is fine, e.g. for something like 112, which belongs to remotes/rname/merge-requests/112
- pass
- # END exception handling
- # END for each token
- # END for each reference
- assert num_resolved
-
- # it works with tags !
- tag = self._assert_rev_parse('0.1.4')
- assert tag.type == 'tag'
-
- # try full sha directly ( including type conversion )
- assert tag.object == rev_parse(tag.object.hexsha)
- self._assert_rev_parse_types(tag.object.hexsha, tag.object)
-
-
- # multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES
- rev = '0.1.4^{tree}^{tree}'
- assert rev_parse(rev) == tag.object.tree
- assert rev_parse(rev+':CHANGES') == tag.object.tree['CHANGES']
-
-
- # try to get parents from first revision - it should fail as no such revision
- # exists
- first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781"
- commit = rev_parse(first_rev)
- assert len(commit.parents) == 0
- assert commit.hexsha == first_rev
- self.failUnlessRaises(BadObject, rev_parse, first_rev+"~")
- self.failUnlessRaises(BadObject, rev_parse, first_rev+"^")
-
- # short SHA1
- commit2 = rev_parse(first_rev[:20])
- assert commit2 == commit
- commit2 = rev_parse(first_rev[:5])
- assert commit2 == commit
-
-
- # todo: dereference tag into a blob 0.1.7^{blob} - quite a special one
- # needs a tag which points to a blob
-
-
- # ref^0 returns commit being pointed to, same with ref~0, and ^{}
- tag = rev_parse('0.1.4')
- for token in (('~0', '^0', '^{}')):
- assert tag.object == rev_parse('0.1.4%s' % token)
- # END handle multiple tokens
-
- # try partial parsing
- max_items = 40
- for i, binsha in enumerate(self.rorepo.odb.sha_iter()):
- assert rev_parse(bin_to_hex(binsha)[:8-(i%2)]).binsha == binsha
- if i > max_items:
- # this is rather slow currently, as rev_parse returns an object
- # which requires accessing packs, it has some additional overhead
- break
- # END for each binsha in repo
-
- # missing closing brace commit^{tree
- self.failUnlessRaises(ValueError, rev_parse, '0.1.4^{tree')
-
- # missing starting brace
- self.failUnlessRaises(ValueError, rev_parse, '0.1.4^tree}')
-
- # REVLOG
- #######
- head = self.rorepo.head
-
- # need to specify a ref when using the @ syntax
- self.failUnlessRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha)
-
- # uses HEAD.ref by default
- assert rev_parse('@{0}') == head.commit
- if not head.is_detached:
- refspec = '%s@{0}' % head.ref.name
- assert rev_parse(refspec) == head.ref.commit
- # all additional specs work as well
- assert rev_parse(refspec+"^{tree}") == head.commit.tree
- assert rev_parse(refspec+":CHANGES").type == 'blob'
- #END operate on non-detached head
-
- # the most recent previous position of the currently checked out branch
-
- try:
- assert rev_parse('@{1}') != head.commit
- except IndexError:
- # on new checkouts, there isn't even a single past branch position
- # in the log
- pass
- #END handle fresh checkouts
-
- # position doesn't exist
- self.failUnlessRaises(IndexError, rev_parse, '@{10000}')
-
- # currently, nothing more is supported
- self.failUnlessRaises(NotImplementedError, rev_parse, "@{1 week ago}")
-
- def test_submodules(self):
- assert len(self.rorepo.submodules) == 2 # non-recursive
- # in previous configurations, we had recursive repositories so this would compare to 2
- # now there is only one left, as gitdb was merged, but we have smmap instead
- assert len(list(self.rorepo.iter_submodules())) == 2
-
- assert isinstance(self.rorepo.submodule("async"), Submodule)
- self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist")
-
- @with_rw_repo('HEAD', bare=False)
- def test_submodule_update(self, rwrepo):
- # fails in bare mode
- rwrepo._bare = True
- # special handling: there are repo implementations which have a bare attribute. In that case, set it directly
- if not rwrepo.bare:
- rwrepo.bare = True
- self.failUnlessRaises(InvalidGitRepositoryError, rwrepo.submodule_update)
- rwrepo._bare = False
- if rwrepo.bare:
- rwrepo.bare = False
- #END special repo handling
-
- # test create submodule
- sm = rwrepo.submodules[0]
- sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path))
- assert isinstance(sm, Submodule)
-
- # note: the rest of this functionality is tested in test_submodule
-
-
+ def test_index(self):
+ index = self.rorepo.index
+ assert isinstance(index, IndexFile)
+
+ def test_tag(self):
+ assert self.rorepo.tag('0.1.5').commit
+ assert self.rorepo.tag('refs/tags/0.1.5').commit
+
+ def test_archive(self):
+ tmpfile = os.tmpfile()
+ self.rorepo.archive(tmpfile, '0.1.5')
+ assert tmpfile.tell()
+
+ @patch_object(Git, '_call_process')
+ def test_should_display_blame_information(self, git):
+ git.return_value = fixture('blame')
+ b = self.rorepo.blame( 'master', 'lib/git.py')
+ assert_equal(13, len(b))
+ assert_equal( 2, len(b[0]) )
+ # assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b))
+ assert_equal(hash(b[0][0]), hash(b[9][0]))
+ c = b[0][0]
+ assert_true(git.called)
+ assert_equal(git.call_args, (('blame', 'master', '--', 'lib/git.py'), {'p': True}))
+
+ assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.hexsha)
+ assert_equal('Tom Preston-Werner', c.author.name)
+ assert_equal('tom@mojombo.com', c.author.email)
+ assert_equal(1191997100, c.authored_date)
+ assert_equal('Tom Preston-Werner', c.committer.name)
+ assert_equal('tom@mojombo.com', c.committer.email)
+ assert_equal(1191997100, c.committed_date)
+ assert_equal('initial grit setup', c.message)
+
+ # test the 'lines per commit' entries
+ tlist = b[0][1]
+ assert_true( tlist )
+ assert_true( isinstance( tlist[0], basestring ) )
+ assert_true( len( tlist ) < sum( len(t) for t in tlist ) ) # test for single-char bug
+
+ def test_blame_real(self):
+ c = 0
+ for item in self.rorepo.head.commit.tree.traverse(
+ predicate=lambda i, d: i.type == 'blob' and i.path.endswith('.py')):
+ c += 1
+ b = self.rorepo.blame(self.rorepo.head, item.path)
+ #END for each item to traverse
+ assert c
+
+ def test_untracked_files(self):
+ base = self.rorepo.working_tree_dir
+ files = ( join_path_native(base, "__test_myfile"),
+ join_path_native(base, "__test_other_file") )
+ num_recently_untracked = 0
+ try:
+ for fpath in files:
+ fd = open(fpath,"wb")
+ fd.close()
+ # END for each filename
+ untracked_files = self.rorepo.untracked_files
+ num_recently_untracked = len(untracked_files)
+
+ # assure we have all names - they are relative to the git-dir
+ num_test_untracked = 0
+ for utfile in untracked_files:
+ num_test_untracked += join_path_native(base, utfile) in files
+ assert len(files) == num_test_untracked
+ finally:
+ for fpath in files:
+ if os.path.isfile(fpath):
+ os.remove(fpath)
+ # END handle files
+
+ assert len(self.rorepo.untracked_files) == (num_recently_untracked - len(files))
+
+ def test_config_reader(self):
+ reader = self.rorepo.config_reader() # all config files
+ assert reader.read_only
+ reader = self.rorepo.config_reader("repository") # single config file
+ assert reader.read_only
+
+ def test_config_writer(self):
+ for config_level in self.rorepo.config_level:
+ try:
+ writer = self.rorepo.config_writer(config_level)
+ assert not writer.read_only
+ except IOError:
+ # its okay not to get a writer for some configuration files if we
+ # have no permissions
+ pass
+ # END for each config level
+
+ def test_creation_deletion(self):
+ # just a very quick test to assure it generally works. There are
+ # specialized cases in the test_refs module
+ head = self.rorepo.create_head("new_head", "HEAD~1")
+ self.rorepo.delete_head(head)
+
+ tag = self.rorepo.create_tag("new_tag", "HEAD~2")
+ self.rorepo.delete_tag(tag)
+ self.rorepo.config_writer()
+ remote = self.rorepo.create_remote("new_remote", "git@server:repo.git")
+ self.rorepo.delete_remote(remote)
+
+ def test_comparison_and_hash(self):
+ # this is only a preliminary test, more testing done in test_index
+ assert self.rorepo == self.rorepo and not (self.rorepo != self.rorepo)
+ assert len(set((self.rorepo, self.rorepo))) == 1
+
+ def test_git_cmd(self):
+ # test CatFileContentStream, just to be very sure we have no fencepost errors
+ # last \n is the terminating newline that it expects
+ l1 = "0123456789\n"
+ l2 = "abcdefghijklmnopqrstxy\n"
+ l3 = "z\n"
+ d = "%s%s%s\n" % (l1, l2, l3)
+
+ l1p = l1[:5]
+
+ # full size
+ # size is without terminating newline
+ def mkfull():
+ return Git.CatFileContentStream(len(d)-1, StringIO(d))
+
+ ts = 5
+ def mktiny():
+ return Git.CatFileContentStream(ts, StringIO(d))
+
+ # readlines no limit
+ s = mkfull()
+ lines = s.readlines()
+ assert len(lines) == 3 and lines[-1].endswith('\n')
+ assert s._stream.tell() == len(d) # must have scrubbed to the end
+
+ # readlines line limit
+ s = mkfull()
+ lines = s.readlines(5)
+ assert len(lines) == 1
+
+ # readlines on tiny sections
+ s = mktiny()
+ lines = s.readlines()
+ assert len(lines) == 1 and lines[0] == l1p
+ assert s._stream.tell() == ts+1
+
+ # readline no limit
+ s = mkfull()
+ assert s.readline() == l1
+ assert s.readline() == l2
+ assert s.readline() == l3
+ assert s.readline() == ''
+ assert s._stream.tell() == len(d)
+
+ # readline limit
+ s = mkfull()
+ assert s.readline(5) == l1p
+ assert s.readline() == l1[5:]
+
+ # readline on tiny section
+ s = mktiny()
+ assert s.readline() == l1p
+ assert s.readline() == ''
+ assert s._stream.tell() == ts+1
+
+ # read no limit
+ s = mkfull()
+ assert s.read() == d[:-1]
+ assert s.read() == ''
+ assert s._stream.tell() == len(d)
+
+ # read limit
+ s = mkfull()
+ assert s.read(5) == l1p
+ assert s.read(6) == l1[5:]
+ assert s._stream.tell() == 5 + 6 # its not yet done
+
+ # read tiny
+ s = mktiny()
+ assert s.read(2) == l1[:2]
+ assert s._stream.tell() == 2
+ assert s.read() == l1[2:ts]
+ assert s._stream.tell() == ts+1
+
+ def _assert_rev_parse_types(self, name, rev_obj):
+ rev_parse = self.rorepo.rev_parse
+
+ if rev_obj.type == 'tag':
+ rev_obj = rev_obj.object
+
+ # tree and blob type
+ obj = rev_parse(name + '^{tree}')
+ assert obj == rev_obj.tree
+
+ obj = rev_parse(name + ':CHANGES')
+ assert obj.type == 'blob' and obj.path == 'CHANGES'
+ assert rev_obj.tree['CHANGES'] == obj
+
+
+ def _assert_rev_parse(self, name):
+ """tries multiple different rev-parse syntaxes with the given name
+ :return: parsed object"""
+ rev_parse = self.rorepo.rev_parse
+ orig_obj = rev_parse(name)
+ if orig_obj.type == 'tag':
+ obj = orig_obj.object
+ else:
+ obj = orig_obj
+ # END deref tags by default
+
+ # try history
+ rev = name + "~"
+ obj2 = rev_parse(rev)
+ assert obj2 == obj.parents[0]
+ self._assert_rev_parse_types(rev, obj2)
+
+ # history with number
+ ni = 11
+ history = [obj.parents[0]]
+ for pn in range(ni):
+ history.append(history[-1].parents[0])
+ # END get given amount of commits
+
+ for pn in range(11):
+ rev = name + "~%i" % (pn+1)
+ obj2 = rev_parse(rev)
+ assert obj2 == history[pn]
+ self._assert_rev_parse_types(rev, obj2)
+ # END history check
+
+ # parent ( default )
+ rev = name + "^"
+ obj2 = rev_parse(rev)
+ assert obj2 == obj.parents[0]
+ self._assert_rev_parse_types(rev, obj2)
+
+ # parent with number
+ for pn, parent in enumerate(obj.parents):
+ rev = name + "^%i" % (pn+1)
+ assert rev_parse(rev) == parent
+ self._assert_rev_parse_types(rev, parent)
+ # END for each parent
+
+ return orig_obj
+
+ @with_rw_repo('HEAD', bare=False)
+ def test_rw_rev_parse(self, rwrepo):
+ # verify it does not confuse branches with hexsha ids
+ ahead = rwrepo.create_head('aaaaaaaa')
+ assert(rwrepo.rev_parse(str(ahead)) == ahead.commit)
+
+ def test_rev_parse(self):
+ rev_parse = self.rorepo.rev_parse
+
+ # try special case: This one failed at some point, make sure its fixed
+ assert rev_parse("33ebe").hexsha == "33ebe7acec14b25c5f84f35a664803fcab2f7781"
+
+ # start from reference
+ num_resolved = 0
+
+ for ref in Reference.iter_items(self.rorepo):
+ path_tokens = ref.path.split("/")
+ for pt in range(len(path_tokens)):
+ path_section = '/'.join(path_tokens[-(pt+1):])
+ try:
+ obj = self._assert_rev_parse(path_section)
+ assert obj.type == ref.object.type
+ num_resolved += 1
+ except BadObject:
+ print "failed on %s" % path_section
+ # this is fine, e.g. for something like 112, which belongs to remotes/rname/merge-requests/112
+ pass
+ # END exception handling
+ # END for each token
+ # END for each reference
+ assert num_resolved
+
+ # it works with tags !
+ tag = self._assert_rev_parse('0.1.4')
+ assert tag.type == 'tag'
+
+ # try full sha directly ( including type conversion )
+ assert tag.object == rev_parse(tag.object.hexsha)
+ self._assert_rev_parse_types(tag.object.hexsha, tag.object)
+
+
+ # multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES
+ rev = '0.1.4^{tree}^{tree}'
+ assert rev_parse(rev) == tag.object.tree
+ assert rev_parse(rev+':CHANGES') == tag.object.tree['CHANGES']
+
+
+ # try to get parents from first revision - it should fail as no such revision
+ # exists
+ first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781"
+ commit = rev_parse(first_rev)
+ assert len(commit.parents) == 0
+ assert commit.hexsha == first_rev
+ self.failUnlessRaises(BadObject, rev_parse, first_rev+"~")
+ self.failUnlessRaises(BadObject, rev_parse, first_rev+"^")
+
+ # short SHA1
+ commit2 = rev_parse(first_rev[:20])
+ assert commit2 == commit
+ commit2 = rev_parse(first_rev[:5])
+ assert commit2 == commit
+
+
+ # todo: dereference tag into a blob 0.1.7^{blob} - quite a special one
+ # needs a tag which points to a blob
+
+
+ # ref^0 returns commit being pointed to, same with ref~0, and ^{}
+ tag = rev_parse('0.1.4')
+ for token in (('~0', '^0', '^{}')):
+ assert tag.object == rev_parse('0.1.4%s' % token)
+ # END handle multiple tokens
+
+ # try partial parsing
+ max_items = 40
+ for i, binsha in enumerate(self.rorepo.odb.sha_iter()):
+ assert rev_parse(bin_to_hex(binsha)[:8-(i%2)]).binsha == binsha
+ if i > max_items:
+ # this is rather slow currently, as rev_parse returns an object
+ # which requires accessing packs, it has some additional overhead
+ break
+ # END for each binsha in repo
+
+ # missing closing brace commit^{tree
+ self.failUnlessRaises(ValueError, rev_parse, '0.1.4^{tree')
+
+ # missing starting brace
+ self.failUnlessRaises(ValueError, rev_parse, '0.1.4^tree}')
+
+ # REVLOG
+ #######
+ head = self.rorepo.head
+
+ # need to specify a ref when using the @ syntax
+ self.failUnlessRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha)
+
+ # uses HEAD.ref by default
+ assert rev_parse('@{0}') == head.commit
+ if not head.is_detached:
+ refspec = '%s@{0}' % head.ref.name
+ assert rev_parse(refspec) == head.ref.commit
+ # all additional specs work as well
+ assert rev_parse(refspec+"^{tree}") == head.commit.tree
+ assert rev_parse(refspec+":CHANGES").type == 'blob'
+ #END operate on non-detached head
+
+ # the most recent previous position of the currently checked out branch
+
+ try:
+ assert rev_parse('@{1}') != head.commit
+ except IndexError:
+ # on new checkouts, there isn't even a single past branch position
+ # in the log
+ pass
+ #END handle fresh checkouts
+
+ # position doesn't exist
+ self.failUnlessRaises(IndexError, rev_parse, '@{10000}')
+
+ # currently, nothing more is supported
+ self.failUnlessRaises(NotImplementedError, rev_parse, "@{1 week ago}")
+
+ def test_submodules(self):
+ assert len(self.rorepo.submodules) == 2 # non-recursive
+ # in previous configurations, we had recursive repositories so this would compare to 2
+ # now there is only one left, as gitdb was merged, but we have smmap instead
+ assert len(list(self.rorepo.iter_submodules())) == 2
+
+ assert isinstance(self.rorepo.submodule("async"), Submodule)
+ self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist")
+
+ @with_rw_repo('HEAD', bare=False)
+ def test_submodule_update(self, rwrepo):
+ # fails in bare mode
+ rwrepo._bare = True
+ # special handling: there are repo implementations which have a bare attribute. In that case, set it directly
+ if not rwrepo.bare:
+ rwrepo.bare = True
+ self.failUnlessRaises(InvalidGitRepositoryError, rwrepo.submodule_update)
+ rwrepo._bare = False
+ if rwrepo.bare:
+ rwrepo.bare = False
+ #END special repo handling
+
+ # test create submodule
+ sm = rwrepo.submodules[0]
+ sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path))
+ assert isinstance(sm, Submodule)
+
+ # note: the rest of this functionality is tested in test_submodule
+
+
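The rev-parse tests above cover most of git's extended SHA-1 syntax. As a compact recap, this hedged sketch exercises the same forms through GitPython, assuming a clone of this repository where 0.1.4 is an annotated tag, as the tests require.

# Recap of the rev-parse forms exercised by test_rev_parse above.
# Assumes a clone of this repository with the annotated tag 0.1.4.
from git import Repo

rp = Repo('.').rev_parse

tag = rp('0.1.4')                             # the annotated tag object
commit = rp('0.1.4^{}')                       # peel the tag to its commit
assert rp('0.1.4~1') == commit.parents[0]     # one step back in history
assert rp('0.1.4^1') == commit.parents[0]     # first parent, equivalent here
assert rp('0.1.4^{tree}') == commit.tree      # the commit's tree
assert rp('0.1.4:CHANGES').path == 'CHANGES'  # blob looked up by path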
diff --git a/git/test/db/cmd/test_base.py b/git/test/db/cmd/test_base.py
index cbb4a339..890c0232 100644
--- a/git/test/db/cmd/test_base.py
+++ b/git/test/db/cmd/test_base.py
@@ -14,78 +14,78 @@ from git.db.cmd.base import *
from git.refs import TagReference, Reference, RemoteReference
class TestBase(RepoBase):
- RepoCls = CmdCompatibilityGitDB
+ RepoCls = CmdCompatibilityGitDB
- def test_basics(self):
- gdb = self.rorepo
-
- # partial to complete - works with everything
- hexsha = bin_to_hex(gdb.partial_to_complete_sha_hex("0.1.6"))
- assert len(hexsha) == 40
-
- assert bin_to_hex(gdb.partial_to_complete_sha_hex(hexsha[:20])) == hexsha
-
- # fails with BadObject
- for invalid_rev in ("0000", "bad/ref", "super bad"):
- self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, invalid_rev)
-
- def test_fetch_info_invalid_lines(self):
- self.failUnlessRaises(ValueError, CmdFetchInfo._from_line, self.rorepo, "nonsense", '')
- self.failUnlessRaises(ValueError, CmdFetchInfo._from_line, self.rorepo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '')
-
-
- def test_fetch_info(self):
- # assure we can handle remote-tracking branches
- fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of git://github.com/gitpython-developers/GitPython"
- remote_info_line_fmt = "* [new branch] nomatter -> %s"
- fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "local/master",
- fetch_info_line_fmt % 'remote-tracking branch')
-
- # we wouldn't be here if it hadn't worked
-
- # handles non-default refspecs: One can specify a different path in refs/remotes
- # or a special path just in refs/something for instance
-
- fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "subdir/tagname",
- fetch_info_line_fmt % 'tag')
-
- assert isinstance(fi.ref, TagReference)
- assert fi.ref.path.startswith('refs/tags')
-
- # it could be in a remote directory though
- fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "remotename/tags/tagname",
- fetch_info_line_fmt % 'tag')
-
- assert isinstance(fi.ref, TagReference)
- assert fi.ref.path.startswith('refs/remotes/')
-
- # it can also be anywhere !
- tag_path = "refs/something/remotename/tags/tagname"
- fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % tag_path,
- fetch_info_line_fmt % 'tag')
-
- assert isinstance(fi.ref, TagReference)
- assert fi.ref.path == tag_path
-
- # branches default to refs/remotes
- fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "remotename/branch",
- fetch_info_line_fmt % 'branch')
-
- assert isinstance(fi.ref, RemoteReference)
- assert fi.ref.remote_name == 'remotename'
-
- # but you can force it anywhere, in which case we only have a plain reference
- fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "refs/something/branch",
- fetch_info_line_fmt % 'branch')
-
- assert type(fi.ref) is Reference
- assert fi.ref.path == "refs/something/branch"
-
-
-
+ def test_basics(self):
+ gdb = self.rorepo
+
+ # partial to complete - works with everything
+ hexsha = bin_to_hex(gdb.partial_to_complete_sha_hex("0.1.6"))
+ assert len(hexsha) == 40
+
+ assert bin_to_hex(gdb.partial_to_complete_sha_hex(hexsha[:20])) == hexsha
+
+ # fails with BadObject
+ for invalid_rev in ("0000", "bad/ref", "super bad"):
+ self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, invalid_rev)
+
+ def test_fetch_info_invalid_lines(self):
+ self.failUnlessRaises(ValueError, CmdFetchInfo._from_line, self.rorepo, "nonsense", '')
+ self.failUnlessRaises(ValueError, CmdFetchInfo._from_line, self.rorepo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '')
+
+
+ def test_fetch_info(self):
+ # assure we can handle remote-tracking branches
+ fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of git://github.com/gitpython-developers/GitPython"
+ remote_info_line_fmt = "* [new branch] nomatter -> %s"
+ fi = CmdFetchInfo._from_line(self.rorepo,
+ remote_info_line_fmt % "local/master",
+ fetch_info_line_fmt % 'remote-tracking branch')
+
+ # we wouldn't be here if it hadn't worked
+
+ # handles non-default refspecs: One can specify a different path in refs/remotes
+ # or a special path just in refs/something for instance
+
+ fi = CmdFetchInfo._from_line(self.rorepo,
+ remote_info_line_fmt % "subdir/tagname",
+ fetch_info_line_fmt % 'tag')
+
+ assert isinstance(fi.ref, TagReference)
+ assert fi.ref.path.startswith('refs/tags')
+
+ # it could be in a remote directory though
+ fi = CmdFetchInfo._from_line(self.rorepo,
+ remote_info_line_fmt % "remotename/tags/tagname",
+ fetch_info_line_fmt % 'tag')
+
+ assert isinstance(fi.ref, TagReference)
+ assert fi.ref.path.startswith('refs/remotes/')
+
+ # it can also be anywhere !
+ tag_path = "refs/something/remotename/tags/tagname"
+ fi = CmdFetchInfo._from_line(self.rorepo,
+ remote_info_line_fmt % tag_path,
+ fetch_info_line_fmt % 'tag')
+
+ assert isinstance(fi.ref, TagReference)
+ assert fi.ref.path == tag_path
+
+ # branches default to refs/remotes
+ fi = CmdFetchInfo._from_line(self.rorepo,
+ remote_info_line_fmt % "remotename/branch",
+ fetch_info_line_fmt % 'branch')
+
+ assert isinstance(fi.ref, RemoteReference)
+ assert fi.ref.remote_name == 'remotename'
+
+ # but you can force it anywhere, in which case we only have a plain reference
+ fi = CmdFetchInfo._from_line(self.rorepo,
+ remote_info_line_fmt % "refs/something/branch",
+ fetch_info_line_fmt % 'branch')
+
+ assert type(fi.ref) is Reference
+ assert fi.ref.path == "refs/something/branch"
+
+
+
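The assertions above pin down how the right-hand side of a fetch summary line maps to a reference type and path. The following standalone sketch restates that mapping; it is an illustration of the expected behavior, not the CmdFetchInfo implementation.

# Standalone sketch of the mapping the assertions above describe: the
# ref kind reported by git plus the right-hand side of the '->' decide
# the reference type and its final path. Not GitPython's actual code.
def resolve_fetch_ref(kind, rhs):
    if rhs.startswith('refs/'):
        # explicit full paths are honored verbatim
        return ('TagReference' if kind == 'tag' else 'Reference'), rhs
    if kind == 'tag':
        if '/tags/' in rhs:
            # the remote side nests the tag under a remote name
            return 'TagReference', 'refs/remotes/' + rhs
        return 'TagReference', 'refs/tags/' + rhs
    # branches default to remote-tracking references
    return 'RemoteReference', 'refs/remotes/' + rhs

assert resolve_fetch_ref('tag', 'subdir/tagname')[1].startswith('refs/tags')
assert resolve_fetch_ref('tag', 'remotename/tags/tagname')[1].startswith('refs/remotes/')
assert resolve_fetch_ref('branch', 'refs/something/branch') == ('Reference', 'refs/something/branch')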
diff --git a/git/test/db/dulwich/lib.py b/git/test/db/dulwich/lib.py
index 56734064..a58469f1 100644
--- a/git/test/db/dulwich/lib.py
+++ b/git/test/db/dulwich/lib.py
@@ -1,23 +1,23 @@
"""dulwich specific utilities, as well as all the default ones"""
from git.test.lib import (
- InheritedTestMethodsOverrideWrapperMetaClsAutoMixin,
- needs_module_or_skip
- )
+ InheritedTestMethodsOverrideWrapperMetaClsAutoMixin,
+ needs_module_or_skip
+ )
__all__ = ['needs_dulwich_or_skip', 'DulwichRequiredMetaMixin']
#{ Decorators
def needs_dulwich_or_skip(func):
- """Skip this test if we have no dulwich - print warning"""
- return needs_module_or_skip('dulwich')(func)
+ """Skip this test if we have no dulwich - print warning"""
+ return needs_module_or_skip('dulwich')(func)
#}END decorators
#{ MetaClasses
class DulwichRequiredMetaMixin(InheritedTestMethodsOverrideWrapperMetaClsAutoMixin):
- decorator = [needs_dulwich_or_skip]
+ decorator = [needs_dulwich_or_skip]
#} END metaclasses
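needs_dulwich_or_skip merely binds needs_module_or_skip to a module name. For orientation, here is a minimal sketch of what such a factory typically looks like; the real helper lives in git.test.lib and may differ in detail.

# Minimal sketch of a needs_module_or_skip factory (Python 2, matching
# the codebase); the real helper lives in git.test.lib.
from functools import wraps
from unittest import SkipTest

def needs_module_or_skip(module_name):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                __import__(module_name)
            except ImportError:
                print "skipping %s: %s not installed" % (func.__name__, module_name)
                raise SkipTest(module_name)
            return func(*args, **kwargs)
        return wrapper
    return decorator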
diff --git a/git/test/db/dulwich/test_base.py b/git/test/db/dulwich/test_base.py
index 78416518..ed2f8975 100644
--- a/git/test/db/dulwich/test_base.py
+++ b/git/test/db/dulwich/test_base.py
@@ -9,24 +9,24 @@ from git.test.db.base import RepoBase
try:
- import dulwich
+ import dulwich
except ImportError:
- # in this case, all other dulwich tests will be skipped
- # Need to properly initialize the class though, otherwise it would fail
- from git.db.complex import PureCompatibilityGitDB as DulwichDB
+ # in this case, all other dulwich tests will be skipped
+ # Need to properly initialize the class though, otherwise it would fail
+ from git.db.complex import PureCompatibilityGitDB as DulwichDB
else:
- # now we know dulwich is available, so do further imports
- from git.db.dulwich.complex import DulwichCompatibilityGitDB as DulwichDB
-
+ # now we know dulwich is available, so do further imports
+ from git.db.dulwich.complex import DulwichCompatibilityGitDB as DulwichDB
+
#END handle imports
class TestDulwichDBBase(RepoBase):
- __metaclass__ = DulwichRequiredMetaMixin
- RepoCls = DulwichDB
-
- @needs_dulwich_or_skip
- @with_rw_repo('HEAD', bare=False)
- def test_basics(self, rw_repo):
- db = DulwichDB(rw_repo.working_tree_dir)
-
-
+ __metaclass__ = DulwichRequiredMetaMixin
+ RepoCls = DulwichDB
+
+ @needs_dulwich_or_skip
+ @with_rw_repo('HEAD', bare=False)
+ def test_basics(self, rw_repo):
+ db = DulwichDB(rw_repo.working_tree_dir)
+
+
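TestDulwichDBBase inherits every test from RepoBase, and the metaclass mixin re-wraps those inherited test_* methods with the skip decorator so the whole suite is skipped when dulwich is absent. A toy version of that idea follows; the real InheritedTestMethodsOverrideWrapperMetaClsAutoMixin in git.test.lib is more involved.

# Toy sketch (Python 2, like the code above) of the idea behind the
# mixin: re-bind every inherited test_* method on the subclass, wrapped
# with the class's configured decorators.
class DecorateInheritedTestsMeta(type):
    def __new__(mcs, name, bases, clsdict):
        cls = super(DecorateInheritedTestsMeta, mcs).__new__(mcs, name, bases, clsdict)
        seen = set()
        for base in bases:
            for attr in dir(base):
                # only inherited tests; locally defined ones stay untouched
                if not attr.startswith('test_') or attr in clsdict or attr in seen:
                    continue
                seen.add(attr)
                func = getattr(cls, attr).__func__  # unwrap the unbound method
                for deco in getattr(cls, 'decorator', []):
                    func = deco(func)
                setattr(cls, attr, func)
        return cls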
diff --git a/git/test/db/lib.py b/git/test/db/lib.py
index 2b3ddde5..df9fec76 100644
--- a/git/test/db/lib.py
+++ b/git/test/db/lib.py
@@ -4,21 +4,21 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Base classes for object db testing"""
from git.test.lib import (
- with_rw_directory,
- with_packs_rw,
- ZippedStoreShaWriter,
- fixture_path,
- TestBase,
- rorepo_dir,
- )
+ with_rw_directory,
+ with_packs_rw,
+ ZippedStoreShaWriter,
+ fixture_path,
+ TestBase,
+ rorepo_dir,
+ )
from git.stream import Sha1Writer
from git.base import (
- IStream,
- OStream,
- OInfo
- )
-
+ IStream,
+ OStream,
+ OInfo
+ )
+
from git.exc import BadObject
from git.typ import str_blob_type
@@ -28,220 +28,220 @@ from struct import pack
__all__ = ('TestDBBase', 'with_rw_directory', 'with_packs_rw', 'fixture_path')
-
+
class TestDBBase(TestBase):
- """Base Class providing default functionality to all tests such as:
-
- - Utility functions provided by the TestCase base of the unittest method such as::
- self.fail("todo")
- self.failUnlessRaises(...)
-
- - Class level repository which is considered read-only as it is shared among
- all test cases in your type.
- Access it using::
- self.rorepo # 'ro' stands for read-only
-
- The rorepo is in fact your current project's git repo. If you refer to specific
- shas for your objects, be sure you choose some that are part of the immutable portion
- of the project history ( to assure tests don't fail for others ).
-
- Derived types can override the default repository type to create a different
- read-only repo, allowing to test their specific type
- """
-
- # data
- two_lines = "1234\nhello world"
- all_data = (two_lines, )
-
- #{ Configuration
- # The repository type to instantiate. It takes at least a path to operate upon
- # during instantiation.
- RepoCls = None
-
- # if True, a read-only repo will be provided and RepoCls must be set.
- # Otherwise it may remain unset
- needs_ro_repo = True
- #} END configuration
-
- @classmethod
- def setUpAll(cls):
- """
- Dynamically add a read-only repository to our actual type. This way
- each test type has its own repository
- """
- if cls.needs_ro_repo:
- if cls is not TestDBBase:
- assert cls.RepoCls is not None, "RepoCls class member must be set in %s" % cls
- cls.rorepo = cls.RepoCls(rorepo_dir())
- #END handle rorepo
-
- def _assert_object_writing_simple(self, db):
- # write a bunch of objects and query their streams and info
- null_objs = db.size()
- ni = 250
- for i in xrange(ni):
- data = pack(">L", i)
- istream = IStream(str_blob_type, len(data), StringIO(data))
- new_istream = db.store(istream)
- assert new_istream is istream
- assert db.has_object(istream.binsha)
-
- info = db.info(istream.binsha)
- assert isinstance(info, OInfo)
- assert info.type == istream.type and info.size == istream.size
-
- stream = db.stream(istream.binsha)
- assert isinstance(stream, OStream)
- assert stream.binsha == info.binsha and stream.type == info.type
- assert stream.read() == data
- # END for each item
-
- assert db.size() == null_objs + ni
- shas = list(db.sha_iter())
- assert len(shas) == db.size()
- assert len(shas[0]) == 20
-
-
- def _assert_object_writing(self, db):
- """General tests to verify object writing, compatible to ObjectDBW
- :note: requires write access to the database"""
- # start in 'dry-run' mode, using a simple sha1 writer
- ostreams = (ZippedStoreShaWriter, None)
- for ostreamcls in ostreams:
- for data in self.all_data:
- dry_run = ostreamcls is not None
- ostream = None
- if ostreamcls is not None:
- ostream = ostreamcls()
- assert isinstance(ostream, Sha1Writer)
- # END create ostream
-
- prev_ostream = db.set_ostream(ostream)
- assert type(prev_ostream) in ostreams or prev_ostream in ostreams
-
- istream = IStream(str_blob_type, len(data), StringIO(data))
-
- # store returns same istream instance, with new sha set
- my_istream = db.store(istream)
- sha = istream.binsha
- assert my_istream is istream
- assert db.has_object(sha) != dry_run
- assert len(sha) == 20
-
- # verify data - the slow way, we want to run code
- if not dry_run:
- info = db.info(sha)
- assert str_blob_type == info.type
- assert info.size == len(data)
-
- ostream = db.stream(sha)
- assert ostream.read() == data
- assert ostream.type == str_blob_type
- assert ostream.size == len(data)
- else:
- self.failUnlessRaises(BadObject, db.info, sha)
- self.failUnlessRaises(BadObject, db.stream, sha)
-
- # DIRECT STREAM COPY
- # our data has been written in object format to the StringIO
- # we passed as output stream. No physical database representation
- # was created.
- # Test direct stream copy of object streams, the result must be
- # identical to what we fed in
- ostream.seek(0)
- istream.stream = ostream
- assert istream.binsha is not None
- prev_sha = istream.binsha
-
- db.set_ostream(ZippedStoreShaWriter())
- db.store(istream)
- assert istream.binsha == prev_sha
- new_ostream = db.ostream()
-
- # note: only works as long our store write uses the same compression
- # level, which is zip_best
- assert ostream.getvalue() == new_ostream.getvalue()
- # END for each data set
- # END for each dry_run mode
-
- def _assert_object_writing_async(self, db):
- """Test generic object writing using asynchronous access"""
- ni = 5000
- def istream_generator(offset=0, ni=ni):
- for data_src in xrange(ni):
- data = str(data_src + offset)
- yield IStream(str_blob_type, len(data), StringIO(data))
- # END for each item
- # END generator utility
-
- # for now, we are very trusting here as we expect it to work if it worked
- # in the single-stream case
-
- # write objects
- reader = IteratorReader(istream_generator())
- istream_reader = db.store_async(reader)
- istreams = istream_reader.read() # read all
- assert istream_reader.task().error() is None
- assert len(istreams) == ni
-
- for stream in istreams:
- assert stream.error is None
- assert len(stream.binsha) == 20
- assert isinstance(stream, IStream)
- # END assert each stream
-
- # test has-object-async - we must have all previously added ones
- reader = IteratorReader( istream.binsha for istream in istreams )
- hasobject_reader = db.has_object_async(reader)
- count = 0
- for sha, has_object in hasobject_reader:
- assert has_object
- count += 1
- # END for each sha
- assert count == ni
-
- # read the objects we have just written
- reader = IteratorReader( istream.binsha for istream in istreams )
- ostream_reader = db.stream_async(reader)
-
- # read items individually to prevent hitting possible sys-limits
- count = 0
- for ostream in ostream_reader:
- assert isinstance(ostream, OStream)
- count += 1
- # END for each ostream
- assert ostream_reader.task().error() is None
- assert count == ni
-
- # get info about our items
- reader = IteratorReader( istream.binsha for istream in istreams )
- info_reader = db.info_async(reader)
-
- count = 0
- for oinfo in info_reader:
- assert isinstance(oinfo, OInfo)
- count += 1
- # END for each oinfo instance
- assert count == ni
-
-
- # combined read-write using a converter
- # add 2500 items, and obtain their output streams
- nni = 2500
- reader = IteratorReader(istream_generator(offset=ni, ni=nni))
- istream_to_sha = lambda istreams: [ istream.binsha for istream in istreams ]
-
- istream_reader = db.store_async(reader)
- istream_reader.set_post_cb(istream_to_sha)
-
- ostream_reader = db.stream_async(istream_reader)
-
- count = 0
- # read it individually, otherwise we might run into the ulimit
- for ostream in ostream_reader:
- assert isinstance(ostream, OStream)
- count += 1
- # END for each ostream
- assert count == nni
-
-
+ """Base Class providing default functionality to all tests such as:
+
+ - Utility functions provided by the TestCase base of the unittest method such as::
+ self.fail("todo")
+ self.failUnlessRaises(...)
+
+ - Class level repository which is considered read-only as it is shared among
+ all test cases in your type.
+ Access it using::
+ self.rorepo # 'ro' stands for read-only
+
+ The rorepo is in fact your current project's git repo. If you refer to specific
+ shas for your objects, be sure you choose some that are part of the immutable portion
+ of the project history ( to assure tests don't fail for others ).
+
+ Derived types can override the default repository type to create a different
+ read-only repo, allowing to test their specific type
+ """
+
+ # data
+ two_lines = "1234\nhello world"
+ all_data = (two_lines, )
+
+ #{ Configuration
+ # The repository type to instantiate. It takes at least a path to operate upon
+ # during instantiation.
+ RepoCls = None
+
+ # if True, a read-only repo will be provided and RepoCls must be set.
+ # Otherwise it may remain unset
+ needs_ro_repo = True
+ #} END configuration
+
+ @classmethod
+ def setUpAll(cls):
+ """
+ Dynamically add a read-only repository to our actual type. This way
+ each test type has its own repository
+ """
+ if cls.needs_ro_repo:
+ if cls is not TestDBBase:
+ assert cls.RepoCls is not None, "RepoCls class member must be set in %s" % cls
+ cls.rorepo = cls.RepoCls(rorepo_dir())
+ #END handle rorepo
+
+ def _assert_object_writing_simple(self, db):
+ # write a bunch of objects and query their streams and info
+ null_objs = db.size()
+ ni = 250
+ for i in xrange(ni):
+ data = pack(">L", i)
+ istream = IStream(str_blob_type, len(data), StringIO(data))
+ new_istream = db.store(istream)
+ assert new_istream is istream
+ assert db.has_object(istream.binsha)
+
+ info = db.info(istream.binsha)
+ assert isinstance(info, OInfo)
+ assert info.type == istream.type and info.size == istream.size
+
+ stream = db.stream(istream.binsha)
+ assert isinstance(stream, OStream)
+ assert stream.binsha == info.binsha and stream.type == info.type
+ assert stream.read() == data
+ # END for each item
+
+ assert db.size() == null_objs + ni
+ shas = list(db.sha_iter())
+ assert len(shas) == db.size()
+ assert len(shas[0]) == 20
+
+
+ def _assert_object_writing(self, db):
+ """General tests to verify object writing, compatible to ObjectDBW
+ :note: requires write access to the database"""
+ # start in 'dry-run' mode, using a simple sha1 writer
+ ostreams = (ZippedStoreShaWriter, None)
+ for ostreamcls in ostreams:
+ for data in self.all_data:
+ dry_run = ostreamcls is not None
+ ostream = None
+ if ostreamcls is not None:
+ ostream = ostreamcls()
+ assert isinstance(ostream, Sha1Writer)
+ # END create ostream
+
+ prev_ostream = db.set_ostream(ostream)
+ assert type(prev_ostream) in ostreams or prev_ostream in ostreams
+
+ istream = IStream(str_blob_type, len(data), StringIO(data))
+
+ # store returns same istream instance, with new sha set
+ my_istream = db.store(istream)
+ sha = istream.binsha
+ assert my_istream is istream
+ assert db.has_object(sha) != dry_run
+ assert len(sha) == 20
+
+				# verify data - the slow way, since we want to exercise the code
+ if not dry_run:
+ info = db.info(sha)
+ assert str_blob_type == info.type
+ assert info.size == len(data)
+
+ ostream = db.stream(sha)
+ assert ostream.read() == data
+ assert ostream.type == str_blob_type
+ assert ostream.size == len(data)
+ else:
+ self.failUnlessRaises(BadObject, db.info, sha)
+ self.failUnlessRaises(BadObject, db.stream, sha)
+
+				# DIRECT STREAM COPY
+				# our data has been written in object format to the StringIO
+				# we passed as output stream. No physical database representation
+				# was created.
+				# Test direct stream copy of object streams - the result must be
+				# identical to what we fed in
+ ostream.seek(0)
+ istream.stream = ostream
+ assert istream.binsha is not None
+ prev_sha = istream.binsha
+
+ db.set_ostream(ZippedStoreShaWriter())
+ db.store(istream)
+ assert istream.binsha == prev_sha
+ new_ostream = db.ostream()
+
+ # note: only works as long our store write uses the same compression
+ # level, which is zip_best
+ assert ostream.getvalue() == new_ostream.getvalue()
+ # END for each data set
+ # END for each dry_run mode
+
+ def _assert_object_writing_async(self, db):
+ """Test generic object writing using asynchronous access"""
+ ni = 5000
+ def istream_generator(offset=0, ni=ni):
+ for data_src in xrange(ni):
+ data = str(data_src + offset)
+ yield IStream(str_blob_type, len(data), StringIO(data))
+ # END for each item
+ # END generator utility
+
+		# for now, we are rather trusting here, as we expect it to work if it worked
+		# in the single-stream case
+
+ # write objects
+ reader = IteratorReader(istream_generator())
+ istream_reader = db.store_async(reader)
+ istreams = istream_reader.read() # read all
+ assert istream_reader.task().error() is None
+ assert len(istreams) == ni
+
+ for stream in istreams:
+ assert stream.error is None
+ assert len(stream.binsha) == 20
+ assert isinstance(stream, IStream)
+ # END assert each stream
+
+ # test has-object-async - we must have all previously added ones
+ reader = IteratorReader( istream.binsha for istream in istreams )
+ hasobject_reader = db.has_object_async(reader)
+ count = 0
+ for sha, has_object in hasobject_reader:
+ assert has_object
+ count += 1
+ # END for each sha
+ assert count == ni
+
+ # read the objects we have just written
+ reader = IteratorReader( istream.binsha for istream in istreams )
+ ostream_reader = db.stream_async(reader)
+
+ # read items individually to prevent hitting possible sys-limits
+ count = 0
+ for ostream in ostream_reader:
+ assert isinstance(ostream, OStream)
+ count += 1
+ # END for each ostream
+ assert ostream_reader.task().error() is None
+ assert count == ni
+
+ # get info about our items
+ reader = IteratorReader( istream.binsha for istream in istreams )
+ info_reader = db.info_async(reader)
+
+ count = 0
+ for oinfo in info_reader:
+ assert isinstance(oinfo, OInfo)
+ count += 1
+ # END for each oinfo instance
+ assert count == ni
+
+
+ # combined read-write using a converter
+ # add 2500 items, and obtain their output streams
+ nni = 2500
+ reader = IteratorReader(istream_generator(offset=ni, ni=nni))
+ istream_to_sha = lambda istreams: [ istream.binsha for istream in istreams ]
+
+ istream_reader = db.store_async(reader)
+ istream_reader.set_post_cb(istream_to_sha)
+
+ ostream_reader = db.stream_async(istream_reader)
+
+ count = 0
+		# read them individually, otherwise we might run into the ulimit
+ for ostream in ostream_reader:
+ assert isinstance(ostream, OStream)
+ count += 1
+ # END for each ostream
+ assert count == nni
+
+
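Editorial note: the converter chain closing _assert_object_writing_async above is the one non-obvious async pattern in this file - set_post_cb rewrites a reader's output (here, IStream objects into their binary shas) so one async stage can feed the next directly. A condensed sketch of that pattern; the async/gitdb import locations and the 'blob' type string are assumptions (the suite uses str_blob_type), and db stands for any database offering the async interface exercised above:

    from cStringIO import StringIO
    from async import IteratorReader   # assumption: import location of IteratorReader
    from gitdb import IStream          # assumption: import location of IStream

    def store_then_stream(db, n=100):
        # store n small blobs asynchronously ('blob': assumed value of str_blob_type)
        reader = IteratorReader(
            IStream('blob', len(str(i)), StringIO(str(i))) for i in xrange(n))
        istream_reader = db.store_async(reader)
        # post-callback: rewrite the task output, IStream objects -> binary shas
        istream_reader.set_post_cb(lambda istreams: [s.binsha for s in istreams])
        # the sha stream plugs straight into the next stage
        count = 0
        for ostream in db.stream_async(istream_reader):
            count += 1             # read one by one to respect handle limits
        assert count == n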
diff --git a/git/test/db/py/test_base.py b/git/test/db/py/test_base.py
index 6b06bbe9..5d076bb2 100644
--- a/git/test/db/py/test_base.py
+++ b/git/test/db/py/test_base.py
@@ -8,9 +8,9 @@ from git.test.db.base import RepoBase
from git.db.complex import PureCompatibilityGitDB
class TestPyDBBase(RepoBase):
-
- RepoCls = PureCompatibilityGitDB
-
- def test_basics(self):
- pass
-
+
+ RepoCls = PureCompatibilityGitDB
+
+ def test_basics(self):
+ pass
+
diff --git a/git/test/db/py/test_git.py b/git/test/db/py/test_git.py
index ecaa5c8f..4f5b5fb5 100644
--- a/git/test/db/py/test_git.py
+++ b/git/test/db/py/test_git.py
@@ -12,40 +12,40 @@ from git.util import hex_to_bin, bin_to_hex
import os
class TestGitDB(TestDBBase):
- needs_ro_repo = False
-
- def test_reading(self):
- gdb = PureGitODB(os.path.join(rorepo_dir(), 'objects'))
-
-		# we have packs and loose objects; an alternates file doesn't necessarily exist
- assert 1 < len(gdb.databases()) < 4
-
- # access should be possible
- git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655")
- assert isinstance(gdb.info(git_sha), OInfo)
- assert isinstance(gdb.stream(git_sha), OStream)
- assert gdb.size() > 200
- sha_list = list(gdb.sha_iter())
- assert len(sha_list) == gdb.size()
-
-
- # This is actually a test for compound functionality, but it doesn't
- # have a separate test module
- # test partial shas
-		# this one is uneven and quite short
- assert gdb.partial_to_complete_sha_hex('5aebcd') == hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655")
-
- # mix even/uneven hexshas
- for i, binsha in enumerate(sha_list[:50]):
- assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8-(i%2)]) == binsha
- # END for each sha
-
- self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, "0000")
-
- @with_rw_directory
- def test_writing(self, path):
- gdb = PureGitODB(path)
-
-		# it's possible to write objects
- self._assert_object_writing(gdb)
- self._assert_object_writing_async(gdb)
+ needs_ro_repo = False
+
+ def test_reading(self):
+ gdb = PureGitODB(os.path.join(rorepo_dir(), 'objects'))
+
+		# we have packs and loose objects; an alternates file doesn't necessarily exist
+ assert 1 < len(gdb.databases()) < 4
+
+ # access should be possible
+ git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655")
+ assert isinstance(gdb.info(git_sha), OInfo)
+ assert isinstance(gdb.stream(git_sha), OStream)
+ assert gdb.size() > 200
+ sha_list = list(gdb.sha_iter())
+ assert len(sha_list) == gdb.size()
+
+
+ # This is actually a test for compound functionality, but it doesn't
+ # have a separate test module
+ # test partial shas
+		# this one is uneven and quite short
+ assert gdb.partial_to_complete_sha_hex('5aebcd') == hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655")
+
+ # mix even/uneven hexshas
+ for i, binsha in enumerate(sha_list[:50]):
+ assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8-(i%2)]) == binsha
+ # END for each sha
+
+ self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, "0000")
+
+ @with_rw_directory
+ def test_writing(self, path):
+ gdb = PureGitODB(path)
+
+		# it's possible to write objects
+ self._assert_object_writing(gdb)
+ self._assert_object_writing_async(gdb)
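Editorial note: the partial-sha block above is the whole public contract of partial_to_complete_sha_hex - a unique hex prefix resolves to the full 20-byte binary sha, and an unmatched prefix raises BadObject. A sketch; the PureGitODB module path and the repository path are assumptions (only the class name appears in this diff):

    import os
    from git.db.py.complex import PureGitODB      # assumed module path
    from git.util import bin_to_hex
    from git.exc import BadObject

    gdb = PureGitODB(os.path.join('/path/to/repo/.git', 'objects'))  # hypothetical
    binsha = gdb.partial_to_complete_sha_hex('5aebcd')   # unique hex prefix
    assert len(binsha) == 20 and bin_to_hex(binsha).startswith('5aebcd')
    try:
        gdb.partial_to_complete_sha_hex('0000')          # matches nothing
    except BadObject:
        pass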
diff --git a/git/test/db/py/test_loose.py b/git/test/db/py/test_loose.py
index 0c9b4831..cfb0ca3a 100644
--- a/git/test/db/py/test_loose.py
+++ b/git/test/db/py/test_loose.py
@@ -6,31 +6,31 @@ from git.test.db.lib import TestDBBase, with_rw_directory
from git.db.py.loose import PureLooseObjectODB
from git.exc import BadObject
from git.util import bin_to_hex
-
+
class TestLooseDB(TestDBBase):
-
- needs_ro_repo = False
-
- @with_rw_directory
- def test_basics(self, path):
- ldb = PureLooseObjectODB(path)
-
- # write data
- self._assert_object_writing(ldb)
- self._assert_object_writing_async(ldb)
-
- # verify sha iteration and size
- shas = list(ldb.sha_iter())
- assert shas and len(shas[0]) == 20
-
- assert len(shas) == ldb.size()
-
- # verify find short object
- long_sha = bin_to_hex(shas[-1])
- for short_sha in (long_sha[:20], long_sha[:5]):
- assert bin_to_hex(ldb.partial_to_complete_sha_hex(short_sha)) == long_sha
- # END for each sha
-
- self.failUnlessRaises(BadObject, ldb.partial_to_complete_sha_hex, '0000')
-		# raises if no object could be found
-
+
+ needs_ro_repo = False
+
+ @with_rw_directory
+ def test_basics(self, path):
+ ldb = PureLooseObjectODB(path)
+
+ # write data
+ self._assert_object_writing(ldb)
+ self._assert_object_writing_async(ldb)
+
+ # verify sha iteration and size
+ shas = list(ldb.sha_iter())
+ assert shas and len(shas[0]) == 20
+
+ assert len(shas) == ldb.size()
+
+ # verify find short object
+ long_sha = bin_to_hex(shas[-1])
+ for short_sha in (long_sha[:20], long_sha[:5]):
+ assert bin_to_hex(ldb.partial_to_complete_sha_hex(short_sha)) == long_sha
+ # END for each sha
+
+ self.failUnlessRaises(BadObject, ldb.partial_to_complete_sha_hex, '0000')
+		# raises if no object could be found
+
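Editorial note: the _assert_object_writing helper this test calls reduces, in the simple case, to wrapping the payload in an IStream, calling store() (which fills in binsha), then querying by that sha. A sketch; the IStream import location, the 'blob' type string, and the on-disk path are assumptions:

    from cStringIO import StringIO
    from gitdb import IStream                     # assumption: import location
    from git.db.py.loose import PureLooseObjectODB

    ldb = PureLooseObjectODB('/tmp/loose-odb')    # hypothetical directory
    data = '1234\nhello world'
    istream = ldb.store(IStream('blob', len(data), StringIO(data)))
    assert ldb.has_object(istream.binsha)         # store() set the 20-byte binsha
    assert ldb.stream(istream.binsha).read() == data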
diff --git a/git/test/db/py/test_mem.py b/git/test/db/py/test_mem.py
index bc98dc56..bb879554 100644
--- a/git/test/db/py/test_mem.py
+++ b/git/test/db/py/test_mem.py
@@ -5,26 +5,26 @@
from git.test.db.lib import TestDBBase, with_rw_directory
from git.db.py.mem import PureMemoryDB
from git.db.py.loose import PureLooseObjectODB
-
+
class TestPureMemoryDB(TestDBBase):
-
- needs_ro_repo = False
+
+ needs_ro_repo = False
- @with_rw_directory
- def test_writing(self, path):
- mdb = PureMemoryDB()
-
- # write data
- self._assert_object_writing_simple(mdb)
-
- # test stream copy
- ldb = PureLooseObjectODB(path)
- assert ldb.size() == 0
- num_streams_copied = mdb.stream_copy(mdb.sha_iter(), ldb)
- assert num_streams_copied == mdb.size()
-
- assert ldb.size() == mdb.size()
- for sha in mdb.sha_iter():
- assert ldb.has_object(sha)
- assert ldb.stream(sha).read() == mdb.stream(sha).read()
-		# END verify objects were copied and are equal
+ @with_rw_directory
+ def test_writing(self, path):
+ mdb = PureMemoryDB()
+
+ # write data
+ self._assert_object_writing_simple(mdb)
+
+ # test stream copy
+ ldb = PureLooseObjectODB(path)
+ assert ldb.size() == 0
+ num_streams_copied = mdb.stream_copy(mdb.sha_iter(), ldb)
+ assert num_streams_copied == mdb.size()
+
+ assert ldb.size() == mdb.size()
+ for sha in mdb.sha_iter():
+ assert ldb.has_object(sha)
+ assert ldb.stream(sha).read() == mdb.stream(sha).read()
+		# END verify objects were copied and are equal
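Editorial note: stream_copy is the only call here that isn't plain store/read - it drains every object named by the given sha iterator from the source database into the destination and returns the count. The pattern in isolation, with a hypothetical destination path:

    from git.db.py.mem import PureMemoryDB
    from git.db.py.loose import PureLooseObjectODB

    mdb = PureMemoryDB()
    # ... fill mdb via mdb.store(...) as in _assert_object_writing_simple ...
    ldb = PureLooseObjectODB('/tmp/loose-odb')     # hypothetical directory
    copied = mdb.stream_copy(mdb.sha_iter(), ldb)  # source shas -> destination db
    assert copied == mdb.size()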
diff --git a/git/test/db/py/test_pack.py b/git/test/db/py/test_pack.py
index 5043f446..54dc2e2c 100644
--- a/git/test/db/py/test_pack.py
+++ b/git/test/db/py/test_pack.py
@@ -13,64 +13,64 @@ import os
import random
class TestPackDB(TestDBBase):
-
- needs_ro_repo = False
-
- @with_packs_rw
- def test_writing(self, path):
- pdb = PurePackedODB(path)
-
- # on demand, we init our pack cache
- num_packs = len(pdb.entities())
- assert num_packs
- assert pdb._st_mtime != 0
-
- # test pack directory changed:
-		# packs removed - renaming a file should affect the glob
- pack_path = pdb.entities()[0].pack().path()
- new_pack_path = pack_path + "renamed"
- os.rename(pack_path, new_pack_path)
-
- pdb.update_cache(force=True)
- assert len(pdb.entities()) == num_packs - 1
-
- # packs added
- os.rename(new_pack_path, pack_path)
- pdb.update_cache(force=True)
- assert len(pdb.entities()) == num_packs
-
- # bang on the cache
-		# access the Entities directly, as there is no iteration interface
-		# yet (nor is one required for now)
- sha_list = list(pdb.sha_iter())
- assert len(sha_list) == pdb.size()
-
- # hit all packs in random order
- random.shuffle(sha_list)
-
- for sha in sha_list:
- info = pdb.info(sha)
- stream = pdb.stream(sha)
- # END for each sha to query
-
-
- # test short finding - be a bit more brutal here
- max_bytes = 19
- min_bytes = 2
- num_ambiguous = 0
- for i, sha in enumerate(sha_list):
- short_sha = sha[:max((i % max_bytes), min_bytes)]
- try:
- assert pdb.partial_to_complete_sha(short_sha, len(short_sha)*2) == sha
- except AmbiguousObjectName:
- num_ambiguous += 1
- pass # valid, we can have short objects
- # END exception handling
- # END for each sha to find
-
-		# we should see at least one ambiguous result, considering the small sizes,
-		# but in our pack there happens to be no ambiguous prefix ...
- # assert num_ambiguous
-
- # non-existing
- self.failUnlessRaises(BadObject, pdb.partial_to_complete_sha, "\0\0", 4)
+
+ needs_ro_repo = False
+
+ @with_packs_rw
+ def test_writing(self, path):
+ pdb = PurePackedODB(path)
+
+ # on demand, we init our pack cache
+ num_packs = len(pdb.entities())
+ assert num_packs
+ assert pdb._st_mtime != 0
+
+ # test pack directory changed:
+		# packs removed - renaming a file should affect the glob
+ pack_path = pdb.entities()[0].pack().path()
+ new_pack_path = pack_path + "renamed"
+ os.rename(pack_path, new_pack_path)
+
+ pdb.update_cache(force=True)
+ assert len(pdb.entities()) == num_packs - 1
+
+ # packs added
+ os.rename(new_pack_path, pack_path)
+ pdb.update_cache(force=True)
+ assert len(pdb.entities()) == num_packs
+
+ # bang on the cache
+		# access the Entities directly, as there is no iteration interface
+		# yet (nor is one required for now)
+ sha_list = list(pdb.sha_iter())
+ assert len(sha_list) == pdb.size()
+
+ # hit all packs in random order
+ random.shuffle(sha_list)
+
+ for sha in sha_list:
+ info = pdb.info(sha)
+ stream = pdb.stream(sha)
+ # END for each sha to query
+
+
+ # test short finding - be a bit more brutal here
+ max_bytes = 19
+ min_bytes = 2
+ num_ambiguous = 0
+ for i, sha in enumerate(sha_list):
+ short_sha = sha[:max((i % max_bytes), min_bytes)]
+ try:
+ assert pdb.partial_to_complete_sha(short_sha, len(short_sha)*2) == sha
+ except AmbiguousObjectName:
+ num_ambiguous += 1
+ pass # valid, we can have short objects
+ # END exception handling
+ # END for each sha to find
+
+		# we should see at least one ambiguous result, considering the small sizes,
+		# but in our pack there happens to be no ambiguous prefix ...
+ # assert num_ambiguous
+
+ # non-existing
+ self.failUnlessRaises(BadObject, pdb.partial_to_complete_sha, "\0\0", 4)
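Editorial note: the short-sha loop above separates the two failure modes of partial_to_complete_sha - AmbiguousObjectName when a prefix matches several packed objects, BadObject when it matches none. A small wrapper illustrating that contract; the exception import path is an assumption, as the diff truncates this file's imports:

    from git.exc import AmbiguousObjectName       # assumed import path

    def resolve_prefix(pdb, binsha_prefix, hex_len):
        """Return the full binary sha for a prefix, or None if it is ambiguous.
        BadObject (nothing matches at all) is left to propagate."""
        try:
            return pdb.partial_to_complete_sha(binsha_prefix, hex_len)
        except AmbiguousObjectName:
            return None    # several candidates - caller must supply more bytes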
diff --git a/git/test/db/py/test_ref.py b/git/test/db/py/test_ref.py
index c5374dc9..dfaf9644 100644
--- a/git/test/db/py/test_ref.py
+++ b/git/test/db/py/test_ref.py
@@ -6,57 +6,57 @@ from git.test.db.lib import *
from git.db.py.ref import PureReferenceDB
from git.util import (
- NULL_BIN_SHA,
- hex_to_bin
- )
+ NULL_BIN_SHA,
+ hex_to_bin
+ )
import os
-
+
class TestPureReferenceDB(TestDBBase):
-
- needs_ro_repo = False
-
- def make_alt_file(self, alt_path, alt_list):
- """Create an alternates file which contains the given alternates.
- The list can be empty"""
- alt_file = open(alt_path, "wb")
- for alt in alt_list:
- alt_file.write(alt + "\n")
- alt_file.close()
-
- @with_rw_directory
- def test_writing(self, path):
- NULL_BIN_SHA = '\0' * 20
-
- alt_path = os.path.join(path, 'alternates')
- rdb = PureReferenceDB(alt_path)
- assert len(rdb.databases()) == 0
- assert rdb.size() == 0
- assert len(list(rdb.sha_iter())) == 0
-
- # try empty, non-existing
- assert not rdb.has_object(NULL_BIN_SHA)
-
-
- # setup alternate file
- # add two, one is invalid
- own_repo_path = fixture_path('../../../.git/objects') # use own repo
- self.make_alt_file(alt_path, [own_repo_path, "invalid/path"])
- rdb.update_cache()
- assert len(rdb.databases()) == 1
-
- # we should now find a default revision of ours
- git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655")
- assert rdb.has_object(git_sha)
-
- # remove valid
- self.make_alt_file(alt_path, ["just/one/invalid/path"])
- rdb.update_cache()
- assert len(rdb.databases()) == 0
-
- # add valid
- self.make_alt_file(alt_path, [own_repo_path])
- rdb.update_cache()
- assert len(rdb.databases()) == 1
-
-
+
+ needs_ro_repo = False
+
+ def make_alt_file(self, alt_path, alt_list):
+ """Create an alternates file which contains the given alternates.
+ The list can be empty"""
+ alt_file = open(alt_path, "wb")
+ for alt in alt_list:
+ alt_file.write(alt + "\n")
+ alt_file.close()
+
+ @with_rw_directory
+ def test_writing(self, path):
+ NULL_BIN_SHA = '\0' * 20
+
+ alt_path = os.path.join(path, 'alternates')
+ rdb = PureReferenceDB(alt_path)
+ assert len(rdb.databases()) == 0
+ assert rdb.size() == 0
+ assert len(list(rdb.sha_iter())) == 0
+
+ # try empty, non-existing
+ assert not rdb.has_object(NULL_BIN_SHA)
+
+
+ # setup alternate file
+ # add two, one is invalid
+ own_repo_path = fixture_path('../../../.git/objects') # use own repo
+ self.make_alt_file(alt_path, [own_repo_path, "invalid/path"])
+ rdb.update_cache()
+ assert len(rdb.databases()) == 1
+
+ # we should now find a default revision of ours
+ git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655")
+ assert rdb.has_object(git_sha)
+
+ # remove valid
+ self.make_alt_file(alt_path, ["just/one/invalid/path"])
+ rdb.update_cache()
+ assert len(rdb.databases()) == 0
+
+ # add valid
+ self.make_alt_file(alt_path, [own_repo_path])
+ rdb.update_cache()
+ assert len(rdb.databases()) == 1
+
+
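Editorial note: the alternates file driving this test uses the same format git itself reads from .git/objects/info/alternates - one object-directory path per line; update_cache() re-reads it and drops entries that don't exist on disk. The round trip in isolation, with illustrative paths:

    from git.db.py.ref import PureReferenceDB

    alt_path = '/tmp/alternates'                          # hypothetical file
    alt_file = open(alt_path, 'wb')
    alt_file.write('/path/to/some/repo/.git/objects\n')   # one path per line
    alt_file.close()

    rdb = PureReferenceDB(alt_path)
    rdb.update_cache()         # re-read; invalid paths are dropped silently
    assert len(rdb.databases()) <= 1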
diff --git a/git/test/db/pygit2/lib.py b/git/test/db/pygit2/lib.py
index 356df9dc..fab762e7 100644
--- a/git/test/db/pygit2/lib.py
+++ b/git/test/db/pygit2/lib.py
@@ -1,23 +1,23 @@
"""pygit2 specific utilities, as well as all the default ones"""
from git.test.lib import (
- InheritedTestMethodsOverrideWrapperMetaClsAutoMixin,
- needs_module_or_skip
- )
+ InheritedTestMethodsOverrideWrapperMetaClsAutoMixin,
+ needs_module_or_skip
+ )
__all__ = ['needs_pygit2_or_skip', 'Pygit2RequiredMetaMixin']
#{ Decorators
def needs_pygit2_or_skip(func):
- """Skip this test if we have no pygit2 - print warning"""
- return needs_module_or_skip('pygit2')(func)
+ """Skip this test if we have no pygit2 - print warning"""
+ return needs_module_or_skip('pygit2')(func)
#}END decorators
#{ MetaClasses
class Pygit2RequiredMetaMixin(InheritedTestMethodsOverrideWrapperMetaClsAutoMixin):
- decorator = [needs_pygit2_or_skip]
+ decorator = [needs_pygit2_or_skip]
#} END metaclasses
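Editorial note: used manually, the decorator defined above guards a single test; the metaclass mixin exists so derived types don't have to repeat it on every inherited method. A sketch of the manual form - the test type and its body are hypothetical:

    from git.test.db.pygit2.lib import needs_pygit2_or_skip

    class SomePygit2Test(object):      # hypothetical test type
        @needs_pygit2_or_skip          # skips with a warning if pygit2 is absent
        def test_roundtrip(self):
            import pygit2              # safe: the decorator verified availability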
diff --git a/git/test/db/pygit2/test_base.py b/git/test/db/pygit2/test_base.py
index 246a1643..52ee24f5 100644
--- a/git/test/db/pygit2/test_base.py
+++ b/git/test/db/pygit2/test_base.py
@@ -9,24 +9,24 @@ from git.test.db.base import RepoBase
try:
- import pygit2
+ import pygit2
except ImportError:
-	# in this case, all other pygit2 tests will be skipped
- # Need to properly initialize the class though, otherwise it would fail
- from git.db.complex import PureCompatibilityGitDB as Pygit2DB
+	# in this case, all other pygit2 tests will be skipped
+ # Need to properly initialize the class though, otherwise it would fail
+ from git.db.complex import PureCompatibilityGitDB as Pygit2DB
else:
-	# now we know pygit2 is available and can do the further imports
- from git.db.pygit2.complex import Pygit2CompatibilityGitDB as Pygit2DB
-
+	# now we know pygit2 is available and can do the further imports
+ from git.db.pygit2.complex import Pygit2CompatibilityGitDB as Pygit2DB
+
#END handle imports
class TestPyGit2DBBase(RepoBase):
- __metaclass__ = Pygit2RequiredMetaMixin
- RepoCls = Pygit2DB
-
- @needs_pygit2_or_skip
- @with_rw_repo('HEAD', bare=False)
- def test_basics(self, rw_repo):
- db = Pygit2DB(rw_repo.working_tree_dir)
-
-
+ __metaclass__ = Pygit2RequiredMetaMixin
+ RepoCls = Pygit2DB
+
+ @needs_pygit2_or_skip
+ @with_rw_repo('HEAD', bare=False)
+ def test_basics(self, rw_repo):
+ db = Pygit2DB(rw_repo.working_tree_dir)
+
+
diff --git a/git/test/db/test_base.py b/git/test/db/test_base.py
index 2a882d0a..78da9f04 100644
--- a/git/test/db/test_base.py
+++ b/git/test/db/test_base.py
@@ -7,14 +7,14 @@ from git.db import RefSpec
class TestBase(TestDBBase):
- needs_ro_repo = False
+ needs_ro_repo = False
- @with_rw_directory
- def test_basics(self, path):
- self.failUnlessRaises(ValueError, RefSpec, None, None)
- rs = RefSpec(None, "something")
- assert rs.force == False
- assert rs.delete_destination()
- assert rs.source is None
- assert rs.destination == "something"
-
+ @with_rw_directory
+ def test_basics(self, path):
+ self.failUnlessRaises(ValueError, RefSpec, None, None)
+ rs = RefSpec(None, "something")
+ assert rs.force == False
+ assert rs.delete_destination()
+ assert rs.source is None
+ assert rs.destination == "something"
+
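Editorial note: for context, the semantics this test pins down - a RefSpec with a None source marks the destination for deletion on the remote, mirroring git's push syntax ':<dst>', and force defaults to False. In isolation:

    from git.db import RefSpec

    rs = RefSpec(None, 'refs/heads/topic')   # like 'git push origin :topic'
    assert rs.source is None
    assert rs.delete_destination()           # no source means: delete the ref
    assert not rs.force                      # force defaults to False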