Diffstat (limited to 'git/test/db')
-rw-r--r--  git/test/db/base.py               285
-rw-r--r--  git/test/db/cmd/test_base.py       66
-rw-r--r--  git/test/db/dulwich/lib.py          8
-rw-r--r--  git/test/db/dulwich/test_base.py   10
-rw-r--r--  git/test/db/lib.py                105
-rw-r--r--  git/test/db/py/test_base.py         6
-rw-r--r--  git/test/db/py/test_git.py         22
-rw-r--r--  git/test/db/py/test_loose.py       18
-rw-r--r--  git/test/db/py/test_mem.py         13
-rw-r--r--  git/test/db/py/test_pack.py        36
-rw-r--r--  git/test/db/py/test_ref.py         32
-rw-r--r--  git/test/db/pygit2/lib.py           8
-rw-r--r--  git/test/db/pygit2/test_base.py    10
-rw-r--r--  git/test/db/test_base.py            2
14 files changed, 305 insertions(+), 316 deletions(-)
diff --git a/git/test/db/base.py b/git/test/db/base.py
index 80cb9ebb..dd8e9d8f 100644
--- a/git/test/db/base.py
+++ b/git/test/db/base.py
@@ -14,7 +14,8 @@ from git.util import join_path_native
from git.exc import BadObject
from git.util import hex_to_bin, bin_to_hex
-import os, sys
+import os
+import sys
import tempfile
import shutil
from cStringIO import StringIO
@@ -24,12 +25,13 @@ from git.db.compat import RepoCompatibilityInterface
class RepoGlobalsItemDeletorMetaCls(GlobalsItemDeletorMetaCls):
ModuleToDelete = 'RepoBase'
-
+
class RepoBase(TestDBBase):
+
"""Basic test for everything a fully implemented repository should support"""
__metaclass__ = RepoGlobalsItemDeletorMetaCls
-
+
def test_new_should_raise_on_invalid_repo_location(self):
self.failUnlessRaises(InvalidGitRepositoryError, self.RepoCls, tempfile.gettempdir())
@@ -55,21 +57,21 @@ class RepoBase(TestDBBase):
def test_heads_should_populate_head_data(self):
for head in self.rorepo.heads:
assert head.name
- assert isinstance(head.commit,Commit)
- # END for each head
-
+ assert isinstance(head.commit, Commit)
+ # END for each head
+
assert isinstance(self.rorepo.heads.master, Head)
assert isinstance(self.rorepo.heads['master'], Head)
-
+
def test_tree_from_revision(self):
tree = self.rorepo.tree('0.1.6')
- assert len(tree.hexsha) == 40
+ assert len(tree.hexsha) == 40
assert tree.type == "tree"
assert self.rorepo.tree(tree) == tree
-
+
# try from invalid revision that does not exist
self.failUnlessRaises(BadObject, self.rorepo.tree, 'hello world')
-
+
def test_commit_from_revision(self):
commit = self.rorepo.commit('0.1.4')
assert commit.type == 'commit'
@@ -79,7 +81,7 @@ class RepoBase(TestDBBase):
mc = 10
commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc))
assert len(commits) == mc
-
+
c = commits[0]
assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha)
assert_equal(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents])
@@ -87,11 +89,11 @@ class RepoBase(TestDBBase):
assert_equal("Michael Trier", c.author.name)
assert_equal("mtrier@gmail.com", c.author.email)
assert_equal(1232829715, c.authored_date)
- assert_equal(5*3600, c.author_tz_offset)
+ assert_equal(5 * 3600, c.author_tz_offset)
assert_equal("Michael Trier", c.committer.name)
assert_equal("mtrier@gmail.com", c.committer.email)
assert_equal(1232829715, c.committed_date)
- assert_equal(5*3600, c.committer_tz_offset)
+ assert_equal(5 * 3600, c.committer_tz_offset)
assert_equal("Bumped version 0.1.6\n", c.message)
c = commits[1]
@@ -106,34 +108,32 @@ class RepoBase(TestDBBase):
# END for each tree
assert num_trees == mc
-
def _assert_empty_repo(self, repo):
- # test all kinds of things with an empty, freshly initialized repo.
+ # test all kinds of things with an empty, freshly initialized repo.
# It should throw good errors
-
+
# entries should be empty
assert len(repo.index.entries) == 0
-
+
# head is accessible
assert repo.head
assert repo.head.ref
assert not repo.head.is_valid()
-
+
# we can change the head to some other ref
head_ref = Head.from_path(repo, Head.to_full_path('some_head'))
assert not head_ref.is_valid()
repo.head.ref = head_ref
-
+
# is_dirty can handle all kwargs
for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)):
assert not repo.is_dirty(*args)
- # END for each arg
-
+ # END for each arg
+
# we can add a file to the index ( if we are not bare )
if not repo.bare:
pass
# END test repos with working tree
-
def test_init(self):
prev_cwd = os.getcwd()
@@ -148,15 +148,14 @@ class RepoBase(TestDBBase):
assert isinstance(r, self.RepoCls)
assert r.bare == True
assert os.path.isdir(r.git_dir)
-
+
self._assert_empty_repo(r)
-
+
# test clone
clone_path = path + "_clone"
rc = r.clone(clone_path)
self._assert_empty_repo(rc)
-
-
+
try:
shutil.rmtree(clone_path)
except OSError:
@@ -164,11 +163,11 @@ class RepoBase(TestDBBase):
# of the parent directory
pass
# END exception handling
-
+
# try again, this time with the absolute version
rc = self.RepoCls.clone_from(r.git_dir, clone_path)
self._assert_empty_repo(rc)
-
+
shutil.rmtree(git_dir_abs)
try:
shutil.rmtree(clone_path)
@@ -177,14 +176,14 @@ class RepoBase(TestDBBase):
# of the parent directory
pass
# END exception handling
-
+
# END for each path
-
+
os.makedirs(git_dir_rela)
os.chdir(git_dir_rela)
r = self.RepoCls.init(bare=False)
r.bare == False
-
+
self._assert_empty_repo(r)
finally:
try:
@@ -193,26 +192,26 @@ class RepoBase(TestDBBase):
pass
os.chdir(prev_cwd)
# END restore previous state
-
+
def test_bare_property(self):
if isinstance(self.rorepo, RepoCompatibilityInterface):
self.rorepo.bare
- #END handle compatability
+ # END handle compatability
self.rorepo.is_bare
def test_daemon_export(self):
orig_val = self.rorepo.daemon_export
self.rorepo.daemon_export = not orig_val
- assert self.rorepo.daemon_export == ( not orig_val )
+ assert self.rorepo.daemon_export == (not orig_val)
self.rorepo.daemon_export = orig_val
assert self.rorepo.daemon_export == orig_val
-
+
def test_alternates(self):
cur_alternates = self.rorepo.alternates
# empty alternates
self.rorepo.alternates = []
assert self.rorepo.alternates == []
- alts = [ "other/location", "this/location" ]
+ alts = ["other/location", "this/location"]
self.rorepo.alternates = alts
assert alts == self.rorepo.alternates
self.rorepo.alternates = cur_alternates
@@ -224,13 +223,13 @@ class RepoBase(TestDBBase):
orig_value = self.rorepo._bare
self.rorepo._bare = True
assert_false(self.rorepo.is_dirty())
- self.rorepo._bare = orig_value
+ self.rorepo._bare = orig_value
def test_is_dirty(self):
self.rorepo._bare = False
- for index in (0,1):
- for working_tree in (0,1):
- for untracked_files in (0,1):
+ for index in (0, 1):
+ for working_tree in (0, 1):
+ for untracked_files in (0, 1):
assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False)
# END untracked files
# END working tree
@@ -246,28 +245,28 @@ class RepoBase(TestDBBase):
def test_index(self):
index = self.rorepo.index
assert isinstance(index, IndexFile)
-
+
def test_tag(self):
assert self.rorepo.tag('0.1.5').commit
assert self.rorepo.tag('refs/tags/0.1.5').commit
-
+
def test_archive(self):
tmpfile = os.tmpfile()
self.rorepo.archive(tmpfile, '0.1.5')
assert tmpfile.tell()
-
+
@patch.object(Git, '_call_process')
def test_should_display_blame_information(self, git):
git.return_value = fixture('blame')
- b = self.rorepo.blame( 'master', 'lib/git.py')
+ b = self.rorepo.blame('master', 'lib/git.py')
assert_equal(13, len(b))
- assert_equal( 2, len(b[0]) )
+ assert_equal(2, len(b[0]))
# assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b))
assert_equal(hash(b[0][0]), hash(b[9][0]))
c = b[0][0]
assert_true(git.called)
assert_equal(git.call_args, (('blame', 'master', '--', 'lib/git.py'), {'p': True}))
-
+
assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.hexsha)
assert_equal('Tom Preston-Werner', c.author.name)
assert_equal('tom@mojombo.com', c.author.email)
@@ -276,35 +275,35 @@ class RepoBase(TestDBBase):
assert_equal('tom@mojombo.com', c.committer.email)
assert_equal(1191997100, c.committed_date)
assert_equal('initial grit setup', c.message)
-
+
# test the 'lines per commit' entries
tlist = b[0][1]
- assert_true( tlist )
- assert_true( isinstance( tlist[0], basestring ) )
- assert_true( len( tlist ) < sum( len(t) for t in tlist ) ) # test for single-char bug
-
+ assert_true(tlist)
+ assert_true(isinstance(tlist[0], basestring))
+ assert_true(len(tlist) < sum(len(t) for t in tlist)) # test for single-char bug
+
def test_blame_real(self):
c = 0
for item in self.rorepo.head.commit.tree.traverse(
- predicate=lambda i, d: i.type == 'blob' and i.path.endswith('.py')):
+ predicate=lambda i, d: i.type == 'blob' and i.path.endswith('.py')):
c += 1
b = self.rorepo.blame(self.rorepo.head, item.path)
- #END for each item to traverse
+ # END for each item to traverse
assert c
-
+
def test_untracked_files(self):
base = self.rorepo.working_tree_dir
- files = ( join_path_native(base, "__test_myfile"),
- join_path_native(base, "__test_other_file") )
+ files = (join_path_native(base, "__test_myfile"),
+ join_path_native(base, "__test_other_file"))
num_recently_untracked = 0
try:
for fpath in files:
- fd = open(fpath,"wb")
+ fd = open(fpath, "wb")
fd.close()
# END for each filename
untracked_files = self.rorepo.untracked_files
num_recently_untracked = len(untracked_files)
-
+
# assure we have all names - they are relative to the git-dir
num_test_untracked = 0
for utfile in untracked_files:
@@ -314,80 +313,81 @@ class RepoBase(TestDBBase):
for fpath in files:
if os.path.isfile(fpath):
os.remove(fpath)
- # END handle files
-
+ # END handle files
+
assert len(self.rorepo.untracked_files) == (num_recently_untracked - len(files))
-
+
def test_config_reader(self):
- reader = self.rorepo.config_reader() # all config files
+ reader = self.rorepo.config_reader() # all config files
assert reader.read_only
reader = self.rorepo.config_reader("repository") # single config file
assert reader.read_only
-
+
def test_config_writer(self):
for config_level in self.rorepo.config_level:
try:
writer = self.rorepo.config_writer(config_level)
assert not writer.read_only
except IOError:
- # its okay not to get a writer for some configuration files if we
+ # its okay not to get a writer for some configuration files if we
# have no permissions
- pass
- # END for each config level
-
+ pass
+ # END for each config level
+
def test_creation_deletion(self):
- # just a very quick test to assure it generally works. There are
+ # just a very quick test to assure it generally works. There are
# specialized cases in the test_refs module
head = self.rorepo.create_head("new_head", "HEAD~1")
self.rorepo.delete_head(head)
-
+
tag = self.rorepo.create_tag("new_tag", "HEAD~2")
self.rorepo.delete_tag(tag)
self.rorepo.config_writer()
remote = self.rorepo.create_remote("new_remote", "git@server:repo.git")
self.rorepo.delete_remote(remote)
-
+
def test_comparison_and_hash(self):
# this is only a preliminary test, more testing done in test_index
assert self.rorepo == self.rorepo and not (self.rorepo != self.rorepo)
assert len(set((self.rorepo, self.rorepo))) == 1
-
+
def test_git_cmd(self):
# test CatFileContentStream, just to be very sure we have no fencepost errors
# last \n is the terminating newline that it expects
l1 = "0123456789\n"
l2 = "abcdefghijklmnopqrstxy\n"
- l3 = "z\n"
+ l3 = "z\n"
d = "%s%s%s\n" % (l1, l2, l3)
-
+
l1p = l1[:5]
-
+
# full size
# size is without terminating newline
def mkfull():
- return Git.CatFileContentStream(len(d)-1, StringIO(d))
-
+ return Git.CatFileContentStream(len(d) - 1, StringIO(d))
+
ts = 5
+
def mktiny():
return Git.CatFileContentStream(ts, StringIO(d))
-
+
# readlines no limit
s = mkfull()
lines = s.readlines()
assert len(lines) == 3 and lines[-1].endswith('\n')
assert s._stream.tell() == len(d) # must have scrubbed to the end
-
+
# realines line limit
s = mkfull()
lines = s.readlines(5)
assert len(lines) == 1
-
+
# readlines on tiny sections
s = mktiny()
lines = s.readlines()
assert len(lines) == 1 and lines[0] == l1p
- assert s._stream.tell() == ts+1
-
+ assert s._stream.tell() == ts + 1
+
# readline no limit
s = mkfull()
assert s.readline() == l1
@@ -395,52 +395,51 @@ class RepoBase(TestDBBase):
assert s.readline() == l3
assert s.readline() == ''
assert s._stream.tell() == len(d)
-
+
# readline limit
s = mkfull()
assert s.readline(5) == l1p
assert s.readline() == l1[5:]
-
+
# readline on tiny section
s = mktiny()
assert s.readline() == l1p
assert s.readline() == ''
- assert s._stream.tell() == ts+1
-
+ assert s._stream.tell() == ts + 1
+
# read no limit
s = mkfull()
assert s.read() == d[:-1]
assert s.read() == ''
assert s._stream.tell() == len(d)
-
+
# read limit
s = mkfull()
assert s.read(5) == l1p
assert s.read(6) == l1[5:]
assert s._stream.tell() == 5 + 6 # its not yet done
-
+
# read tiny
s = mktiny()
assert s.read(2) == l1[:2]
assert s._stream.tell() == 2
assert s.read() == l1[2:ts]
- assert s._stream.tell() == ts+1
-
+ assert s._stream.tell() == ts + 1
+
def _assert_rev_parse_types(self, name, rev_obj):
rev_parse = self.rorepo.rev_parse
-
+
if rev_obj.type == 'tag':
rev_obj = rev_obj.object
-
+
# tree and blob type
obj = rev_parse(name + '^{tree}')
assert obj == rev_obj.tree
-
+
obj = rev_parse(name + ':CHANGES')
assert obj.type == 'blob' and obj.path == 'CHANGES'
assert rev_obj.tree['CHANGES'] == obj
-
-
+
def _assert_rev_parse(self, name):
"""tries multiple different rev-parse syntaxes with the given name
:return: parsed object"""
@@ -450,62 +449,62 @@ class RepoBase(TestDBBase):
obj = orig_obj.object
else:
obj = orig_obj
- # END deref tags by default
-
+ # END deref tags by default
+
# try history
rev = name + "~"
obj2 = rev_parse(rev)
assert obj2 == obj.parents[0]
self._assert_rev_parse_types(rev, obj2)
-
+
# history with number
ni = 11
history = [obj.parents[0]]
for pn in range(ni):
history.append(history[-1].parents[0])
# END get given amount of commits
-
+
for pn in range(11):
- rev = name + "~%i" % (pn+1)
+ rev = name + "~%i" % (pn + 1)
obj2 = rev_parse(rev)
assert obj2 == history[pn]
self._assert_rev_parse_types(rev, obj2)
# END history check
-
+
# parent ( default )
rev = name + "^"
obj2 = rev_parse(rev)
assert obj2 == obj.parents[0]
self._assert_rev_parse_types(rev, obj2)
-
+
# parent with number
for pn, parent in enumerate(obj.parents):
- rev = name + "^%i" % (pn+1)
+ rev = name + "^%i" % (pn + 1)
assert rev_parse(rev) == parent
self._assert_rev_parse_types(rev, parent)
# END for each parent
-
+
return orig_obj
-
+
@with_rw_repo('HEAD', bare=False)
def test_rw_rev_parse(self, rwrepo):
# verify it does not confuse branches with hexsha ids
ahead = rwrepo.create_head('aaaaaaaa')
assert(rwrepo.rev_parse(str(ahead)) == ahead.commit)
-
+
def test_rev_parse(self):
rev_parse = self.rorepo.rev_parse
-
+
# try special case: This one failed at some point, make sure its fixed
assert rev_parse("33ebe").hexsha == "33ebe7acec14b25c5f84f35a664803fcab2f7781"
-
+
# start from reference
num_resolved = 0
-
+
for ref in Reference.iter_items(self.rorepo):
path_tokens = ref.path.split("/")
for pt in range(len(path_tokens)):
- path_section = '/'.join(path_tokens[-(pt+1):])
+ path_section = '/'.join(path_tokens[-(pt + 1):])
try:
obj = self._assert_rev_parse(path_section)
assert obj.type == ref.object.type
@@ -518,106 +517,102 @@ class RepoBase(TestDBBase):
# END for each token
# END for each reference
assert num_resolved
-
+
# it works with tags !
tag = self._assert_rev_parse('0.1.4')
assert tag.type == 'tag'
-
+
# try full sha directly ( including type conversion )
assert tag.object == rev_parse(tag.object.hexsha)
self._assert_rev_parse_types(tag.object.hexsha, tag.object)
-
-
+
# multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES
rev = '0.1.4^{tree}^{tree}'
assert rev_parse(rev) == tag.object.tree
- assert rev_parse(rev+':CHANGES') == tag.object.tree['CHANGES']
-
-
+ assert rev_parse(rev + ':CHANGES') == tag.object.tree['CHANGES']
+
# try to get parents from first revision - it should fail as no such revision
# exists
first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781"
commit = rev_parse(first_rev)
assert len(commit.parents) == 0
assert commit.hexsha == first_rev
- self.failUnlessRaises(BadObject, rev_parse, first_rev+"~")
- self.failUnlessRaises(BadObject, rev_parse, first_rev+"^")
-
+ self.failUnlessRaises(BadObject, rev_parse, first_rev + "~")
+ self.failUnlessRaises(BadObject, rev_parse, first_rev + "^")
+
# short SHA1
commit2 = rev_parse(first_rev[:20])
assert commit2 == commit
commit2 = rev_parse(first_rev[:5])
assert commit2 == commit
-
-
+
# todo: dereference tag into a blob 0.1.7^{blob} - quite a special one
# needs a tag which points to a blob
-
-
+
# ref^0 returns commit being pointed to, same with ref~0, and ^{}
tag = rev_parse('0.1.4')
for token in (('~0', '^0', '^{}')):
assert tag.object == rev_parse('0.1.4%s' % token)
# END handle multiple tokens
-
+
# try partial parsing
max_items = 40
for i, binsha in enumerate(self.rorepo.odb.sha_iter()):
- assert rev_parse(bin_to_hex(binsha)[:8-(i%2)]).binsha == binsha
+ assert rev_parse(bin_to_hex(binsha)[:8 - (i % 2)]).binsha == binsha
if i > max_items:
# this is rather slow currently, as rev_parse returns an object
# which requires accessing packs, it has some additional overhead
break
# END for each binsha in repo
-
+
# missing closing brace commit^{tree
self.failUnlessRaises(ValueError, rev_parse, '0.1.4^{tree')
-
+
# missing starting brace
self.failUnlessRaises(ValueError, rev_parse, '0.1.4^tree}')
-
+
# REVLOG
#######
head = self.rorepo.head
-
+
# need to specify a ref when using the @ syntax
self.failUnlessRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha)
-
+
# uses HEAD.ref by default
assert rev_parse('@{0}') == head.commit
if not head.is_detached:
refspec = '%s@{0}' % head.ref.name
assert rev_parse(refspec) == head.ref.commit
# all additional specs work as well
- assert rev_parse(refspec+"^{tree}") == head.commit.tree
- assert rev_parse(refspec+":CHANGES").type == 'blob'
- #END operate on non-detached head
-
+ assert rev_parse(refspec + "^{tree}") == head.commit.tree
+ assert rev_parse(refspec + ":CHANGES").type == 'blob'
+ # END operate on non-detached head
+
# the most recent previous position of the currently checked out branch
-
+
try:
assert rev_parse('@{1}') != head.commit
except IndexError:
# on new checkouts, there isn't even a single past branch position
# in the log
pass
- #END handle fresh checkouts
-
+ # END handle fresh checkouts
+
# position doesn't exist
self.failUnlessRaises(IndexError, rev_parse, '@{10000}')
-
+
# currently, nothing more is supported
self.failUnlessRaises(NotImplementedError, rev_parse, "@{1 week ago}")
-
+
def test_submodules(self):
assert len(self.rorepo.submodules) == 2 # non-recursive
# in previous configurations, we had recursive repositories so this would compare to 2
# now there is only one left, as gitdb was merged, but we have smmap instead
assert len(list(self.rorepo.iter_submodules())) == 2
-
+
assert isinstance(self.rorepo.submodule("async"), Submodule)
self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist")
-
+
@with_rw_repo('HEAD', bare=False)
def test_submodule_update(self, rwrepo):
# fails in bare mode
@@ -629,13 +624,11 @@ class RepoBase(TestDBBase):
rwrepo._bare = False
if rwrepo.bare:
rwrepo.bare = False
- #END special repo handling
-
+ # END special repo handling
+
# test create submodule
sm = rwrepo.submodules[0]
sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path))
assert isinstance(sm, Submodule)
-
+
# note: the rest of this functionality is tested in test_submodule
-
-
diff --git a/git/test/db/cmd/test_base.py b/git/test/db/cmd/test_base.py
index 890c0232..9eee7223 100644
--- a/git/test/db/cmd/test_base.py
+++ b/git/test/db/cmd/test_base.py
@@ -13,79 +13,77 @@ from git.db.cmd.base import *
from git.refs import TagReference, Reference, RemoteReference
+
class TestBase(RepoBase):
RepoCls = CmdCompatibilityGitDB
def test_basics(self):
gdb = self.rorepo
-
+
# partial to complete - works with everything
hexsha = bin_to_hex(gdb.partial_to_complete_sha_hex("0.1.6"))
assert len(hexsha) == 40
-
+
assert bin_to_hex(gdb.partial_to_complete_sha_hex(hexsha[:20])) == hexsha
-
+
# fails with BadObject
for invalid_rev in ("0000", "bad/ref", "super bad"):
self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, invalid_rev)
-
+
def test_fetch_info(self):
self.failUnlessRaises(ValueError, CmdCmdFetchInfo._from_line, self.rorepo, "nonsense", '')
- self.failUnlessRaises(ValueError, CmdCmdFetchInfo._from_line, self.rorepo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '')
-
-
+ self.failUnlessRaises(ValueError, CmdCmdFetchInfo._from_line, self.rorepo,
+ "? [up to date] 0.1.7RC -> origin/0.1.7RC", '')
+
def test_fetch_info(self):
# assure we can handle remote-tracking branches
fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of git://github.com/gitpython-developers/GitPython"
remote_info_line_fmt = "* [new branch] nomatter -> %s"
fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "local/master",
- fetch_info_line_fmt % 'remote-tracking branch')
-
+ remote_info_line_fmt % "local/master",
+ fetch_info_line_fmt % 'remote-tracking branch')
+
# we wouldn't be here if it wouldn't have worked
-
+
# handles non-default refspecs: One can specify a different path in refs/remotes
# or a special path just in refs/something for instance
-
+
fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "subdir/tagname",
- fetch_info_line_fmt % 'tag')
-
+ remote_info_line_fmt % "subdir/tagname",
+ fetch_info_line_fmt % 'tag')
+
assert isinstance(fi.ref, TagReference)
assert fi.ref.path.startswith('refs/tags')
-
+
# it could be in a remote direcftory though
fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "remotename/tags/tagname",
- fetch_info_line_fmt % 'tag')
-
+ remote_info_line_fmt % "remotename/tags/tagname",
+ fetch_info_line_fmt % 'tag')
+
assert isinstance(fi.ref, TagReference)
assert fi.ref.path.startswith('refs/remotes/')
-
+
# it can also be anywhere !
tag_path = "refs/something/remotename/tags/tagname"
fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % tag_path,
- fetch_info_line_fmt % 'tag')
-
+ remote_info_line_fmt % tag_path,
+ fetch_info_line_fmt % 'tag')
+
assert isinstance(fi.ref, TagReference)
assert fi.ref.path == tag_path
-
+
# branches default to refs/remotes
fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "remotename/branch",
- fetch_info_line_fmt % 'branch')
-
+ remote_info_line_fmt % "remotename/branch",
+ fetch_info_line_fmt % 'branch')
+
assert isinstance(fi.ref, RemoteReference)
assert fi.ref.remote_name == 'remotename'
-
+
# but you can force it anywhere, in which case we only have a references
fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "refs/something/branch",
- fetch_info_line_fmt % 'branch')
-
+ remote_info_line_fmt % "refs/something/branch",
+ fetch_info_line_fmt % 'branch')
+
assert type(fi.ref) is Reference
assert fi.ref.path == "refs/something/branch"
-
-
-
diff --git a/git/test/db/dulwich/lib.py b/git/test/db/dulwich/lib.py
index a58469f1..bd6a0564 100644
--- a/git/test/db/dulwich/lib.py
+++ b/git/test/db/dulwich/lib.py
@@ -1,14 +1,15 @@
"""dulwich specific utilities, as well as all the default ones"""
from git.test.lib import (
- InheritedTestMethodsOverrideWrapperMetaClsAutoMixin,
- needs_module_or_skip
- )
+ InheritedTestMethodsOverrideWrapperMetaClsAutoMixin,
+ needs_module_or_skip
+)
__all__ = ['needs_dulwich_or_skip', 'DulwichRequiredMetaMixin']
#{ Decoorators
+
def needs_dulwich_or_skip(func):
"""Skip this test if we have no dulwich - print warning"""
return needs_module_or_skip('dulwich')(func)
@@ -17,6 +18,7 @@ def needs_dulwich_or_skip(func):
#{ MetaClasses
+
class DulwichRequiredMetaMixin(InheritedTestMethodsOverrideWrapperMetaClsAutoMixin):
decorator = [needs_dulwich_or_skip]
diff --git a/git/test/db/dulwich/test_base.py b/git/test/db/dulwich/test_base.py
index ed2f8975..82713103 100644
--- a/git/test/db/dulwich/test_base.py
+++ b/git/test/db/dulwich/test_base.py
@@ -7,7 +7,6 @@ from git.test.lib import TestBase, with_rw_repo
from git.test.db.base import RepoBase
-
try:
import dulwich
except ImportError:
@@ -17,16 +16,15 @@ except ImportError:
else:
# now we know dulwich is available, to do futher imports
from git.db.dulwich.complex import DulwichCompatibilityGitDB as DulwichDB
-
-#END handle imports
+
+# END handle imports
+
class TestDulwichDBBase(RepoBase):
__metaclass__ = DulwichRequiredMetaMixin
RepoCls = DulwichDB
-
+
@needs_dulwich_or_skip
@with_rw_repo('HEAD', bare=False)
def test_basics(self, rw_repo):
db = DulwichDB(rw_repo.working_tree_dir)
-
-
diff --git a/git/test/db/lib.py b/git/test/db/lib.py
index d406382a..74a6509e 100644
--- a/git/test/db/lib.py
+++ b/git/test/db/lib.py
@@ -10,15 +10,15 @@ from git.test.lib import (
fixture_path,
TestBase,
rorepo_dir,
- )
+)
from git.stream import Sha1Writer
from git.base import (
- IStream,
- OStream,
- OInfo
- )
-
+ IStream,
+ OStream,
+ OInfo
+)
+
from git.exc import BadObject
from git.typ import str_blob_type
@@ -28,41 +28,43 @@ from struct import pack
__all__ = ('TestDBBase', 'with_rw_directory', 'with_packs_rw', 'fixture_path')
-
+
+
class TestDBBase(TestBase):
+
"""Base Class providing default functionality to all tests such as:
-
+
- Utility functions provided by the TestCase base of the unittest method such as::
self.fail("todo")
self.failUnlessRaises(...)
-
+
- Class level repository which is considered read-only as it is shared among
all test cases in your type.
Access it using::
self.rorepo # 'ro' stands for read-only
-
+
The rorepo is in fact your current project's git repo. If you refer to specific
shas for your objects, be sure you choose some that are part of the immutable portion
of the project history ( to assure tests don't fail for others ).
-
+
Derived types can override the default repository type to create a different
read-only repo, allowing to test their specific type
"""
-
+
# data
two_lines = "1234\nhello world"
all_data = (two_lines, )
-
+
#{ Configuration
# The repository type to instantiate. It takes at least a path to operate upon
# during instantiation.
RepoCls = None
-
+
# if True, a read-only repo will be provided and RepoCls must be set.
# Otherwise it may remain unset
needs_ro_repo = True
#} END configuration
-
+
@classmethod
def setUp(cls):
"""
@@ -73,8 +75,8 @@ class TestDBBase(TestBase):
if cls is not TestDBBase:
assert cls.RepoCls is not None, "RepoCls class member must be set in %s" % cls
cls.rorepo = cls.RepoCls(rorepo_dir())
- #END handle rorepo
-
+ # END handle rorepo
+
def _assert_object_writing_simple(self, db):
# write a bunch of objects and query their streams and info
null_objs = db.size()
@@ -85,23 +87,22 @@ class TestDBBase(TestBase):
new_istream = db.store(istream)
assert new_istream is istream
assert db.has_object(istream.binsha)
-
+
info = db.info(istream.binsha)
assert isinstance(info, OInfo)
assert info.type == istream.type and info.size == istream.size
-
+
stream = db.stream(istream.binsha)
assert isinstance(stream, OStream)
assert stream.binsha == info.binsha and stream.type == info.type
assert stream.read() == data
# END for each item
-
+
assert db.size() == null_objs + ni
shas = list(db.sha_iter())
assert len(shas) == db.size()
assert len(shas[0]) == 20
-
-
+
def _assert_object_writing(self, db):
"""General tests to verify object writing, compatible to ObjectDBW
:note: requires write access to the database"""
@@ -115,25 +116,25 @@ class TestDBBase(TestBase):
ostream = ostreamcls()
assert isinstance(ostream, Sha1Writer)
# END create ostream
-
+
prev_ostream = db.set_ostream(ostream)
- assert type(prev_ostream) in ostreams or prev_ostream in ostreams
-
+ assert type(prev_ostream) in ostreams or prev_ostream in ostreams
+
istream = IStream(str_blob_type, len(data), StringIO(data))
-
+
# store returns same istream instance, with new sha set
my_istream = db.store(istream)
sha = istream.binsha
assert my_istream is istream
assert db.has_object(sha) != dry_run
- assert len(sha) == 20
-
+ assert len(sha) == 20
+
# verify data - the slow way, we want to run code
if not dry_run:
info = db.info(sha)
assert str_blob_type == info.type
assert info.size == len(data)
-
+
ostream = db.stream(sha)
assert ostream.read() == data
assert ostream.type == str_blob_type
@@ -141,57 +142,58 @@ class TestDBBase(TestBase):
else:
self.failUnlessRaises(BadObject, db.info, sha)
self.failUnlessRaises(BadObject, db.stream, sha)
-
+
# DIRECT STREAM COPY
# our data hase been written in object format to the StringIO
# we pasesd as output stream. No physical database representation
# was created.
- # Test direct stream copy of object streams, the result must be
+ # Test direct stream copy of object streams, the result must be
# identical to what we fed in
ostream.seek(0)
istream.stream = ostream
assert istream.binsha is not None
prev_sha = istream.binsha
-
+
db.set_ostream(ZippedStoreShaWriter())
db.store(istream)
assert istream.binsha == prev_sha
new_ostream = db.ostream()
-
+
# note: only works as long our store write uses the same compression
# level, which is zip_best
assert ostream.getvalue() == new_ostream.getvalue()
# END for each data set
# END for each dry_run mode
-
+
def _assert_object_writing_async(self, db):
"""Test generic object writing using asynchronous access"""
ni = 5000
+
def istream_generator(offset=0, ni=ni):
for data_src in xrange(ni):
data = str(data_src + offset)
yield IStream(str_blob_type, len(data), StringIO(data))
# END for each item
# END generator utility
-
+
# for now, we are very trusty here as we expect it to work if it worked
# in the single-stream case
-
+
# write objects
reader = IteratorReader(istream_generator())
istream_reader = db.store_async(reader)
istreams = istream_reader.read() # read all
assert istream_reader.task().error() is None
assert len(istreams) == ni
-
+
for stream in istreams:
assert stream.error is None
assert len(stream.binsha) == 20
assert isinstance(stream, IStream)
# END assert each stream
-
+
# test has-object-async - we must have all previously added ones
- reader = IteratorReader( istream.binsha for istream in istreams )
+ reader = IteratorReader(istream.binsha for istream in istreams)
hasobject_reader = db.has_object_async(reader)
count = 0
for sha, has_object in hasobject_reader:
@@ -199,11 +201,11 @@ class TestDBBase(TestBase):
count += 1
# END for each sha
assert count == ni
-
+
# read the objects we have just written
- reader = IteratorReader( istream.binsha for istream in istreams )
+ reader = IteratorReader(istream.binsha for istream in istreams)
ostream_reader = db.stream_async(reader)
-
+
# read items individually to prevent hitting possible sys-limits
count = 0
for ostream in ostream_reader:
@@ -212,30 +214,29 @@ class TestDBBase(TestBase):
# END for each ostream
assert ostream_reader.task().error() is None
assert count == ni
-
+
# get info about our items
- reader = IteratorReader( istream.binsha for istream in istreams )
+ reader = IteratorReader(istream.binsha for istream in istreams)
info_reader = db.info_async(reader)
-
+
count = 0
for oinfo in info_reader:
assert isinstance(oinfo, OInfo)
count += 1
# END for each oinfo instance
assert count == ni
-
-
+
# combined read-write using a converter
# add 2500 items, and obtain their output streams
nni = 2500
reader = IteratorReader(istream_generator(offset=ni, ni=nni))
- istream_to_sha = lambda istreams: [ istream.binsha for istream in istreams ]
-
+ istream_to_sha = lambda istreams: [istream.binsha for istream in istreams]
+
istream_reader = db.store_async(reader)
istream_reader.set_post_cb(istream_to_sha)
-
+
ostream_reader = db.stream_async(istream_reader)
-
+
count = 0
# read it individually, otherwise we might run into the ulimit
for ostream in ostream_reader:
@@ -243,5 +244,3 @@ class TestDBBase(TestBase):
count += 1
# END for each ostream
assert count == nni
-
-
diff --git a/git/test/db/py/test_base.py b/git/test/db/py/test_base.py
index 5d076bb2..cd1bed0f 100644
--- a/git/test/db/py/test_base.py
+++ b/git/test/db/py/test_base.py
@@ -7,10 +7,10 @@ from git.test.db.base import RepoBase
from git.db.complex import PureCompatibilityGitDB
+
class TestPyDBBase(RepoBase):
-
+
RepoCls = PureCompatibilityGitDB
-
+
def test_basics(self):
pass
-
diff --git a/git/test/db/py/test_git.py b/git/test/db/py/test_git.py
index 4f5b5fb5..207d2864 100644
--- a/git/test/db/py/test_git.py
+++ b/git/test/db/py/test_git.py
@@ -11,15 +11,16 @@ from git.util import hex_to_bin, bin_to_hex
import os
+
class TestGitDB(TestDBBase):
needs_ro_repo = False
-
+
def test_reading(self):
gdb = PureGitODB(os.path.join(rorepo_dir(), 'objects'))
-
+
# we have packs and loose objects, alternates doesn't necessarily exist
assert 1 < len(gdb.databases()) < 4
-
+
# access should be possible
git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655")
assert isinstance(gdb.info(git_sha), OInfo)
@@ -27,25 +28,24 @@ class TestGitDB(TestDBBase):
assert gdb.size() > 200
sha_list = list(gdb.sha_iter())
assert len(sha_list) == gdb.size()
-
-
- # This is actually a test for compound functionality, but it doesn't
+
+ # This is actually a test for compound functionality, but it doesn't
# have a separate test module
# test partial shas
# this one as uneven and quite short
assert gdb.partial_to_complete_sha_hex('5aebcd') == hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655")
-
+
# mix even/uneven hexshas
for i, binsha in enumerate(sha_list[:50]):
- assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8-(i%2)]) == binsha
+ assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8 - (i % 2)]) == binsha
# END for each sha
-
+
self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, "0000")
-
+
@with_rw_directory
def test_writing(self, path):
gdb = PureGitODB(path)
-
+
# its possible to write objects
self._assert_object_writing(gdb)
self._assert_object_writing_async(gdb)
diff --git a/git/test/db/py/test_loose.py b/git/test/db/py/test_loose.py
index cfb0ca3a..b3ffb64f 100644
--- a/git/test/db/py/test_loose.py
+++ b/git/test/db/py/test_loose.py
@@ -6,31 +6,31 @@ from git.test.db.lib import TestDBBase, with_rw_directory
from git.db.py.loose import PureLooseObjectODB
from git.exc import BadObject
from git.util import bin_to_hex
-
+
+
class TestLooseDB(TestDBBase):
-
+
needs_ro_repo = False
-
+
@with_rw_directory
def test_basics(self, path):
ldb = PureLooseObjectODB(path)
-
+
# write data
self._assert_object_writing(ldb)
self._assert_object_writing_async(ldb)
-
+
# verify sha iteration and size
shas = list(ldb.sha_iter())
assert shas and len(shas[0]) == 20
-
+
assert len(shas) == ldb.size()
-
+
# verify find short object
long_sha = bin_to_hex(shas[-1])
for short_sha in (long_sha[:20], long_sha[:5]):
assert bin_to_hex(ldb.partial_to_complete_sha_hex(short_sha)) == long_sha
# END for each sha
-
+
self.failUnlessRaises(BadObject, ldb.partial_to_complete_sha_hex, '0000')
# raises if no object could be foudn
-
diff --git a/git/test/db/py/test_mem.py b/git/test/db/py/test_mem.py
index bb879554..0468b8af 100644
--- a/git/test/db/py/test_mem.py
+++ b/git/test/db/py/test_mem.py
@@ -5,26 +5,27 @@
from git.test.db.lib import TestDBBase, with_rw_directory
from git.db.py.mem import PureMemoryDB
from git.db.py.loose import PureLooseObjectODB
-
+
+
class TestPureMemoryDB(TestDBBase):
-
+
needs_ro_repo = False
@with_rw_directory
def test_writing(self, path):
mdb = PureMemoryDB()
-
+
# write data
self._assert_object_writing_simple(mdb)
-
+
# test stream copy
ldb = PureLooseObjectODB(path)
assert ldb.size() == 0
num_streams_copied = mdb.stream_copy(mdb.sha_iter(), ldb)
assert num_streams_copied == mdb.size()
-
+
assert ldb.size() == mdb.size()
for sha in mdb.sha_iter():
assert ldb.has_object(sha)
- assert ldb.stream(sha).read() == mdb.stream(sha).read()
+ assert ldb.stream(sha).read() == mdb.stream(sha).read()
# END verify objects where copied and are equal
diff --git a/git/test/db/py/test_pack.py b/git/test/db/py/test_pack.py
index 54dc2e2c..2cb7ea70 100644
--- a/git/test/db/py/test_pack.py
+++ b/git/test/db/py/test_pack.py
@@ -12,48 +12,48 @@ from git.exc import BadObject, AmbiguousObjectName
import os
import random
+
class TestPackDB(TestDBBase):
-
- needs_ro_repo = False
-
+
+ needs_ro_repo = False
+
@with_packs_rw
def test_writing(self, path):
pdb = PurePackedODB(path)
-
+
# on demand, we init our pack cache
num_packs = len(pdb.entities())
assert num_packs
assert pdb._st_mtime != 0
-
- # test pack directory changed:
+
+ # test pack directory changed:
# packs removed - rename a file, should affect the glob
pack_path = pdb.entities()[0].pack().path()
new_pack_path = pack_path + "renamed"
os.rename(pack_path, new_pack_path)
-
+
pdb.update_cache(force=True)
assert len(pdb.entities()) == num_packs - 1
-
+
# packs added
os.rename(new_pack_path, pack_path)
pdb.update_cache(force=True)
assert len(pdb.entities()) == num_packs
-
+
# bang on the cache
# access the Entities directly, as there is no iteration interface
# yet ( or required for now )
sha_list = list(pdb.sha_iter())
assert len(sha_list) == pdb.size()
-
+
# hit all packs in random order
random.shuffle(sha_list)
-
+
for sha in sha_list:
info = pdb.info(sha)
stream = pdb.stream(sha)
# END for each sha to query
-
-
+
# test short finding - be a bit more brutal here
max_bytes = 19
min_bytes = 2
@@ -61,16 +61,16 @@ class TestPackDB(TestDBBase):
for i, sha in enumerate(sha_list):
short_sha = sha[:max((i % max_bytes), min_bytes)]
try:
- assert pdb.partial_to_complete_sha(short_sha, len(short_sha)*2) == sha
+ assert pdb.partial_to_complete_sha(short_sha, len(short_sha) * 2) == sha
except AmbiguousObjectName:
num_ambiguous += 1
- pass # valid, we can have short objects
+ pass # valid, we can have short objects
# END exception handling
# END for each sha to find
-
+
# we should have at least one ambiguous, considering the small sizes
- # but in our pack, there is no ambigious ...
+ # but in our pack, there is no ambigious ...
# assert num_ambiguous
-
+
# non-existing
self.failUnlessRaises(BadObject, pdb.partial_to_complete_sha, "\0\0", 4)
diff --git a/git/test/db/py/test_ref.py b/git/test/db/py/test_ref.py
index dfaf9644..4b5dd134 100644
--- a/git/test/db/py/test_ref.py
+++ b/git/test/db/py/test_ref.py
@@ -6,16 +6,17 @@ from git.test.db.lib import *
from git.db.py.ref import PureReferenceDB
from git.util import (
- NULL_BIN_SHA,
- hex_to_bin
- )
+ NULL_BIN_SHA,
+ hex_to_bin
+)
import os
-
+
+
class TestPureReferenceDB(TestDBBase):
-
+
needs_ro_repo = False
-
+
def make_alt_file(self, alt_path, alt_list):
"""Create an alternates file which contains the given alternates.
The list can be empty"""
@@ -23,40 +24,37 @@ class TestPureReferenceDB(TestDBBase):
for alt in alt_list:
alt_file.write(alt + "\n")
alt_file.close()
-
+
@with_rw_directory
def test_writing(self, path):
- NULL_BIN_SHA = '\0' * 20
-
+ NULL_BIN_SHA = '\0' * 20
+
alt_path = os.path.join(path, 'alternates')
rdb = PureReferenceDB(alt_path)
assert len(rdb.databases()) == 0
assert rdb.size() == 0
assert len(list(rdb.sha_iter())) == 0
-
+
# try empty, non-existing
assert not rdb.has_object(NULL_BIN_SHA)
-
-
+
# setup alternate file
# add two, one is invalid
own_repo_path = fixture_path('../../../.git/objects') # use own repo
self.make_alt_file(alt_path, [own_repo_path, "invalid/path"])
rdb.update_cache()
assert len(rdb.databases()) == 1
-
+
# we should now find a default revision of ours
git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655")
assert rdb.has_object(git_sha)
-
+
# remove valid
self.make_alt_file(alt_path, ["just/one/invalid/path"])
rdb.update_cache()
assert len(rdb.databases()) == 0
-
+
# add valid
self.make_alt_file(alt_path, [own_repo_path])
rdb.update_cache()
assert len(rdb.databases()) == 1
-
-
diff --git a/git/test/db/pygit2/lib.py b/git/test/db/pygit2/lib.py
index fab762e7..76441333 100644
--- a/git/test/db/pygit2/lib.py
+++ b/git/test/db/pygit2/lib.py
@@ -1,14 +1,15 @@
"""pygit2 specific utilities, as well as all the default ones"""
from git.test.lib import (
- InheritedTestMethodsOverrideWrapperMetaClsAutoMixin,
- needs_module_or_skip
- )
+ InheritedTestMethodsOverrideWrapperMetaClsAutoMixin,
+ needs_module_or_skip
+)
__all__ = ['needs_pygit2_or_skip', 'Pygit2RequiredMetaMixin']
#{ Decoorators
+
def needs_pygit2_or_skip(func):
"""Skip this test if we have no pygit2 - print warning"""
return needs_module_or_skip('pygit2')(func)
@@ -17,6 +18,7 @@ def needs_pygit2_or_skip(func):
#{ MetaClasses
+
class Pygit2RequiredMetaMixin(InheritedTestMethodsOverrideWrapperMetaClsAutoMixin):
decorator = [needs_pygit2_or_skip]
diff --git a/git/test/db/pygit2/test_base.py b/git/test/db/pygit2/test_base.py
index 52ee24f5..dc1b0ac5 100644
--- a/git/test/db/pygit2/test_base.py
+++ b/git/test/db/pygit2/test_base.py
@@ -7,7 +7,6 @@ from git.test.lib import TestBase, with_rw_repo
from git.test.db.base import RepoBase
-
try:
import pygit2
except ImportError:
@@ -17,16 +16,15 @@ except ImportError:
else:
# now we know pygit2 is available, to do futher imports
from git.db.pygit2.complex import Pygit2CompatibilityGitDB as Pygit2DB
-
-#END handle imports
+
+# END handle imports
+
class TestPyGit2DBBase(RepoBase):
__metaclass__ = Pygit2RequiredMetaMixin
RepoCls = Pygit2DB
-
+
@needs_pygit2_or_skip
@with_rw_repo('HEAD', bare=False)
def test_basics(self, rw_repo):
db = Pygit2DB(rw_repo.working_tree_dir)
-
-
diff --git a/git/test/db/test_base.py b/git/test/db/test_base.py
index 78da9f04..39c935a6 100644
--- a/git/test/db/test_base.py
+++ b/git/test/db/test_base.py
@@ -5,6 +5,7 @@
from lib import *
from git.db import RefSpec
+
class TestBase(TestDBBase):
needs_ro_repo = False
@@ -17,4 +18,3 @@ class TestBase(TestDBBase):
assert rs.delete_destination()
assert rs.source is None
assert rs.destination == "something"
-