diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2014-02-09 21:23:51 +0100 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2014-02-09 21:23:51 +0100 |
commit | 15dd52cd578691930cea194e003fa80dd02f40eb (patch) | |
tree | ef4c9c5f705dd1ca743b7ceefe5b91b11ad15010 /git/test/db | |
parent | 660bdca125aa9dcca7a7730535bec433edb8ba02 (diff) | |
download | gitpython-15dd52cd578691930cea194e003fa80dd02f40eb.tar.gz |
tabs to 4 spaces - overall state of this branch is desolate, but fixable. Needs plenty of work
Diffstat (limited to 'git/test/db')
-rw-r--r-- | git/test/db/base.py | 1192 | ||||
-rw-r--r-- | git/test/db/cmd/test_base.py | 148 | ||||
-rw-r--r-- | git/test/db/dulwich/lib.py | 12 | ||||
-rw-r--r-- | git/test/db/dulwich/test_base.py | 32 | ||||
-rw-r--r-- | git/test/db/lib.py | 456 | ||||
-rw-r--r-- | git/test/db/py/test_base.py | 12 | ||||
-rw-r--r-- | git/test/db/py/test_git.py | 74 | ||||
-rw-r--r-- | git/test/db/py/test_loose.py | 54 | ||||
-rw-r--r-- | git/test/db/py/test_mem.py | 42 | ||||
-rw-r--r-- | git/test/db/py/test_pack.py | 122 | ||||
-rw-r--r-- | git/test/db/py/test_ref.py | 102 | ||||
-rw-r--r-- | git/test/db/pygit2/lib.py | 12 | ||||
-rw-r--r-- | git/test/db/pygit2/test_base.py | 32 | ||||
-rw-r--r-- | git/test/db/test_base.py | 20 |
14 files changed, 1155 insertions, 1155 deletions
diff --git a/git/test/db/base.py b/git/test/db/base.py index 0f5eebe4..202a4353 100644 --- a/git/test/db/base.py +++ b/git/test/db/base.py @@ -23,619 +23,619 @@ from git.db.compat import RepoCompatibilityInterface class RepoGlobalsItemDeletorMetaCls(GlobalsItemDeletorMetaCls): - ModuleToDelete = 'RepoBase' - + ModuleToDelete = 'RepoBase' + class RepoBase(TestDBBase): - """Basic test for everything a fully implemented repository should support""" - __metaclass__ = RepoGlobalsItemDeletorMetaCls - - def test_new_should_raise_on_invalid_repo_location(self): - self.failUnlessRaises(InvalidGitRepositoryError, self.RepoCls, tempfile.gettempdir()) + """Basic test for everything a fully implemented repository should support""" + __metaclass__ = RepoGlobalsItemDeletorMetaCls + + def test_new_should_raise_on_invalid_repo_location(self): + self.failUnlessRaises(InvalidGitRepositoryError, self.RepoCls, tempfile.gettempdir()) - def test_new_should_raise_on_non_existant_path(self): - self.failUnlessRaises(NoSuchPathError, self.RepoCls, "repos/foobar") + def test_new_should_raise_on_non_existant_path(self): + self.failUnlessRaises(NoSuchPathError, self.RepoCls, "repos/foobar") - def test_repo_creation_from_different_paths(self): - r_from_gitdir = self.RepoCls(self.rorepo.git_dir) - assert r_from_gitdir.git_dir == self.rorepo.git_dir - assert r_from_gitdir.git_dir.endswith('.git') - assert not self.rorepo.git.working_dir.endswith('.git') - assert r_from_gitdir.git.working_dir == self.rorepo.git.working_dir + def test_repo_creation_from_different_paths(self): + r_from_gitdir = self.RepoCls(self.rorepo.git_dir) + assert r_from_gitdir.git_dir == self.rorepo.git_dir + assert r_from_gitdir.git_dir.endswith('.git') + assert not self.rorepo.git.working_dir.endswith('.git') + assert r_from_gitdir.git.working_dir == self.rorepo.git.working_dir - def test_description(self): - txt = "Test repository" - self.rorepo.description = txt - assert_equal(self.rorepo.description, txt) + def test_description(self): + txt = "Test repository" + self.rorepo.description = txt + assert_equal(self.rorepo.description, txt) - def test_heads_should_return_array_of_head_objects(self): - for head in self.rorepo.heads: - assert_equal(Head, head.__class__) + def test_heads_should_return_array_of_head_objects(self): + for head in self.rorepo.heads: + assert_equal(Head, head.__class__) - def test_heads_should_populate_head_data(self): - for head in self.rorepo.heads: - assert head.name - assert isinstance(head.commit,Commit) - # END for each head - - assert isinstance(self.rorepo.heads.master, Head) - assert isinstance(self.rorepo.heads['master'], Head) - - def test_tree_from_revision(self): - tree = self.rorepo.tree('0.1.6') - assert len(tree.hexsha) == 40 - assert tree.type == "tree" - assert self.rorepo.tree(tree) == tree - - # try from invalid revision that does not exist - self.failUnlessRaises(BadObject, self.rorepo.tree, 'hello world') - - def test_commit_from_revision(self): - commit = self.rorepo.commit('0.1.4') - assert commit.type == 'commit' - assert self.rorepo.commit(commit) == commit + def test_heads_should_populate_head_data(self): + for head in self.rorepo.heads: + assert head.name + assert isinstance(head.commit,Commit) + # END for each head + + assert isinstance(self.rorepo.heads.master, Head) + assert isinstance(self.rorepo.heads['master'], Head) + + def test_tree_from_revision(self): + tree = self.rorepo.tree('0.1.6') + assert len(tree.hexsha) == 40 + assert tree.type == "tree" + assert self.rorepo.tree(tree) == tree + + # try from invalid revision that does not exist + self.failUnlessRaises(BadObject, self.rorepo.tree, 'hello world') + + def test_commit_from_revision(self): + commit = self.rorepo.commit('0.1.4') + assert commit.type == 'commit' + assert self.rorepo.commit(commit) == commit - def test_commits(self): - mc = 10 - commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc)) - assert len(commits) == mc - - c = commits[0] - assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha) - assert_equal(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents]) - assert_equal("ce41fc29549042f1aa09cc03174896cf23f112e3", c.tree.hexsha) - assert_equal("Michael Trier", c.author.name) - assert_equal("mtrier@gmail.com", c.author.email) - assert_equal(1232829715, c.authored_date) - assert_equal(5*3600, c.author_tz_offset) - assert_equal("Michael Trier", c.committer.name) - assert_equal("mtrier@gmail.com", c.committer.email) - assert_equal(1232829715, c.committed_date) - assert_equal(5*3600, c.committer_tz_offset) - assert_equal("Bumped version 0.1.6\n", c.message) + def test_commits(self): + mc = 10 + commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc)) + assert len(commits) == mc + + c = commits[0] + assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha) + assert_equal(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents]) + assert_equal("ce41fc29549042f1aa09cc03174896cf23f112e3", c.tree.hexsha) + assert_equal("Michael Trier", c.author.name) + assert_equal("mtrier@gmail.com", c.author.email) + assert_equal(1232829715, c.authored_date) + assert_equal(5*3600, c.author_tz_offset) + assert_equal("Michael Trier", c.committer.name) + assert_equal("mtrier@gmail.com", c.committer.email) + assert_equal(1232829715, c.committed_date) + assert_equal(5*3600, c.committer_tz_offset) + assert_equal("Bumped version 0.1.6\n", c.message) - c = commits[1] - assert isinstance(c.parents, tuple) + c = commits[1] + assert isinstance(c.parents, tuple) - def test_trees(self): - mc = 30 - num_trees = 0 - for tree in self.rorepo.iter_trees('0.1.5', max_count=mc): - num_trees += 1 - assert isinstance(tree, Tree) - # END for each tree - assert num_trees == mc + def test_trees(self): + mc = 30 + num_trees = 0 + for tree in self.rorepo.iter_trees('0.1.5', max_count=mc): + num_trees += 1 + assert isinstance(tree, Tree) + # END for each tree + assert num_trees == mc - def _assert_empty_repo(self, repo): - # test all kinds of things with an empty, freshly initialized repo. - # It should throw good errors - - # entries should be empty - assert len(repo.index.entries) == 0 - - # head is accessible - assert repo.head - assert repo.head.ref - assert not repo.head.is_valid() - - # we can change the head to some other ref - head_ref = Head.from_path(repo, Head.to_full_path('some_head')) - assert not head_ref.is_valid() - repo.head.ref = head_ref - - # is_dirty can handle all kwargs - for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)): - assert not repo.is_dirty(*args) - # END for each arg - - # we can add a file to the index ( if we are not bare ) - if not repo.bare: - pass - # END test repos with working tree - + def _assert_empty_repo(self, repo): + # test all kinds of things with an empty, freshly initialized repo. + # It should throw good errors + + # entries should be empty + assert len(repo.index.entries) == 0 + + # head is accessible + assert repo.head + assert repo.head.ref + assert not repo.head.is_valid() + + # we can change the head to some other ref + head_ref = Head.from_path(repo, Head.to_full_path('some_head')) + assert not head_ref.is_valid() + repo.head.ref = head_ref + + # is_dirty can handle all kwargs + for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)): + assert not repo.is_dirty(*args) + # END for each arg + + # we can add a file to the index ( if we are not bare ) + if not repo.bare: + pass + # END test repos with working tree + - def test_init(self): - prev_cwd = os.getcwd() - os.chdir(tempfile.gettempdir()) - git_dir_rela = "repos/foo/bar.git" - del_dir_abs = os.path.abspath("repos") - git_dir_abs = os.path.abspath(git_dir_rela) - try: - # with specific path - for path in (git_dir_rela, git_dir_abs): - r = self.RepoCls.init(path=path, bare=True) - assert isinstance(r, self.RepoCls) - assert r.bare == True - assert os.path.isdir(r.git_dir) - - self._assert_empty_repo(r) - - # test clone - clone_path = path + "_clone" - rc = r.clone(clone_path) - self._assert_empty_repo(rc) - - - try: - shutil.rmtree(clone_path) - except OSError: - # when relative paths are used, the clone may actually be inside - # of the parent directory - pass - # END exception handling - - # try again, this time with the absolute version - rc = self.RepoCls.clone_from(r.git_dir, clone_path) - self._assert_empty_repo(rc) - - shutil.rmtree(git_dir_abs) - try: - shutil.rmtree(clone_path) - except OSError: - # when relative paths are used, the clone may actually be inside - # of the parent directory - pass - # END exception handling - - # END for each path - - os.makedirs(git_dir_rela) - os.chdir(git_dir_rela) - r = self.RepoCls.init(bare=False) - r.bare == False - - self._assert_empty_repo(r) - finally: - try: - shutil.rmtree(del_dir_abs) - except OSError: - pass - os.chdir(prev_cwd) - # END restore previous state - - def test_bare_property(self): - if isinstance(self.rorepo, RepoCompatibilityInterface): - self.rorepo.bare - #END handle compatability - self.rorepo.is_bare + def test_init(self): + prev_cwd = os.getcwd() + os.chdir(tempfile.gettempdir()) + git_dir_rela = "repos/foo/bar.git" + del_dir_abs = os.path.abspath("repos") + git_dir_abs = os.path.abspath(git_dir_rela) + try: + # with specific path + for path in (git_dir_rela, git_dir_abs): + r = self.RepoCls.init(path=path, bare=True) + assert isinstance(r, self.RepoCls) + assert r.bare == True + assert os.path.isdir(r.git_dir) + + self._assert_empty_repo(r) + + # test clone + clone_path = path + "_clone" + rc = r.clone(clone_path) + self._assert_empty_repo(rc) + + + try: + shutil.rmtree(clone_path) + except OSError: + # when relative paths are used, the clone may actually be inside + # of the parent directory + pass + # END exception handling + + # try again, this time with the absolute version + rc = self.RepoCls.clone_from(r.git_dir, clone_path) + self._assert_empty_repo(rc) + + shutil.rmtree(git_dir_abs) + try: + shutil.rmtree(clone_path) + except OSError: + # when relative paths are used, the clone may actually be inside + # of the parent directory + pass + # END exception handling + + # END for each path + + os.makedirs(git_dir_rela) + os.chdir(git_dir_rela) + r = self.RepoCls.init(bare=False) + r.bare == False + + self._assert_empty_repo(r) + finally: + try: + shutil.rmtree(del_dir_abs) + except OSError: + pass + os.chdir(prev_cwd) + # END restore previous state + + def test_bare_property(self): + if isinstance(self.rorepo, RepoCompatibilityInterface): + self.rorepo.bare + #END handle compatability + self.rorepo.is_bare - def test_daemon_export(self): - orig_val = self.rorepo.daemon_export - self.rorepo.daemon_export = not orig_val - assert self.rorepo.daemon_export == ( not orig_val ) - self.rorepo.daemon_export = orig_val - assert self.rorepo.daemon_export == orig_val + def test_daemon_export(self): + orig_val = self.rorepo.daemon_export + self.rorepo.daemon_export = not orig_val + assert self.rorepo.daemon_export == ( not orig_val ) + self.rorepo.daemon_export = orig_val + assert self.rorepo.daemon_export == orig_val - def test_alternates(self): - cur_alternates = self.rorepo.alternates - # empty alternates - self.rorepo.alternates = [] - assert self.rorepo.alternates == [] - alts = [ "other/location", "this/location" ] - self.rorepo.alternates = alts - assert alts == self.rorepo.alternates - self.rorepo.alternates = cur_alternates + def test_alternates(self): + cur_alternates = self.rorepo.alternates + # empty alternates + self.rorepo.alternates = [] + assert self.rorepo.alternates == [] + alts = [ "other/location", "this/location" ] + self.rorepo.alternates = alts + assert alts == self.rorepo.alternates + self.rorepo.alternates = cur_alternates - def test_repr(self): - assert_equal('<git.Repo "%s">' % rorepo_dir(), repr(self.rorepo)) + def test_repr(self): + assert_equal('<git.Repo "%s">' % rorepo_dir(), repr(self.rorepo)) - def test_is_dirty_with_bare_repository(self): - orig_value = self.rorepo._bare - self.rorepo._bare = True - assert_false(self.rorepo.is_dirty()) - self.rorepo._bare = orig_value + def test_is_dirty_with_bare_repository(self): + orig_value = self.rorepo._bare + self.rorepo._bare = True + assert_false(self.rorepo.is_dirty()) + self.rorepo._bare = orig_value - def test_is_dirty(self): - self.rorepo._bare = False - for index in (0,1): - for working_tree in (0,1): - for untracked_files in (0,1): - assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False) - # END untracked files - # END working tree - # END index - orig_val = self.rorepo._bare - self.rorepo._bare = True - assert self.rorepo.is_dirty() == False - self.rorepo._bare = orig_val + def test_is_dirty(self): + self.rorepo._bare = False + for index in (0,1): + for working_tree in (0,1): + for untracked_files in (0,1): + assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False) + # END untracked files + # END working tree + # END index + orig_val = self.rorepo._bare + self.rorepo._bare = True + assert self.rorepo.is_dirty() == False + self.rorepo._bare = orig_val - def test_head(self): - assert self.rorepo.head.reference.object == self.rorepo.active_branch.object + def test_head(self): + assert self.rorepo.head.reference.object == self.rorepo.active_branch.object - def test_index(self): - index = self.rorepo.index - assert isinstance(index, IndexFile) - - def test_tag(self): - assert self.rorepo.tag('0.1.5').commit - assert self.rorepo.tag('refs/tags/0.1.5').commit - - def test_archive(self): - tmpfile = os.tmpfile() - self.rorepo.archive(tmpfile, '0.1.5') - assert tmpfile.tell() - - @patch_object(Git, '_call_process') - def test_should_display_blame_information(self, git): - git.return_value = fixture('blame') - b = self.rorepo.blame( 'master', 'lib/git.py') - assert_equal(13, len(b)) - assert_equal( 2, len(b[0]) ) - # assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b)) - assert_equal(hash(b[0][0]), hash(b[9][0])) - c = b[0][0] - assert_true(git.called) - assert_equal(git.call_args, (('blame', 'master', '--', 'lib/git.py'), {'p': True})) - - assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.hexsha) - assert_equal('Tom Preston-Werner', c.author.name) - assert_equal('tom@mojombo.com', c.author.email) - assert_equal(1191997100, c.authored_date) - assert_equal('Tom Preston-Werner', c.committer.name) - assert_equal('tom@mojombo.com', c.committer.email) - assert_equal(1191997100, c.committed_date) - assert_equal('initial grit setup', c.message) - - # test the 'lines per commit' entries - tlist = b[0][1] - assert_true( tlist ) - assert_true( isinstance( tlist[0], basestring ) ) - assert_true( len( tlist ) < sum( len(t) for t in tlist ) ) # test for single-char bug - - def test_blame_real(self): - c = 0 - for item in self.rorepo.head.commit.tree.traverse( - predicate=lambda i, d: i.type == 'blob' and i.path.endswith('.py')): - c += 1 - b = self.rorepo.blame(self.rorepo.head, item.path) - #END for each item to traverse - assert c - - def test_untracked_files(self): - base = self.rorepo.working_tree_dir - files = ( join_path_native(base, "__test_myfile"), - join_path_native(base, "__test_other_file") ) - num_recently_untracked = 0 - try: - for fpath in files: - fd = open(fpath,"wb") - fd.close() - # END for each filename - untracked_files = self.rorepo.untracked_files - num_recently_untracked = len(untracked_files) - - # assure we have all names - they are relative to the git-dir - num_test_untracked = 0 - for utfile in untracked_files: - num_test_untracked += join_path_native(base, utfile) in files - assert len(files) == num_test_untracked - finally: - for fpath in files: - if os.path.isfile(fpath): - os.remove(fpath) - # END handle files - - assert len(self.rorepo.untracked_files) == (num_recently_untracked - len(files)) - - def test_config_reader(self): - reader = self.rorepo.config_reader() # all config files - assert reader.read_only - reader = self.rorepo.config_reader("repository") # single config file - assert reader.read_only - - def test_config_writer(self): - for config_level in self.rorepo.config_level: - try: - writer = self.rorepo.config_writer(config_level) - assert not writer.read_only - except IOError: - # its okay not to get a writer for some configuration files if we - # have no permissions - pass - # END for each config level - - def test_creation_deletion(self): - # just a very quick test to assure it generally works. There are - # specialized cases in the test_refs module - head = self.rorepo.create_head("new_head", "HEAD~1") - self.rorepo.delete_head(head) - - tag = self.rorepo.create_tag("new_tag", "HEAD~2") - self.rorepo.delete_tag(tag) - self.rorepo.config_writer() - remote = self.rorepo.create_remote("new_remote", "git@server:repo.git") - self.rorepo.delete_remote(remote) - - def test_comparison_and_hash(self): - # this is only a preliminary test, more testing done in test_index - assert self.rorepo == self.rorepo and not (self.rorepo != self.rorepo) - assert len(set((self.rorepo, self.rorepo))) == 1 - - def test_git_cmd(self): - # test CatFileContentStream, just to be very sure we have no fencepost errors - # last \n is the terminating newline that it expects - l1 = "0123456789\n" - l2 = "abcdefghijklmnopqrstxy\n" - l3 = "z\n" - d = "%s%s%s\n" % (l1, l2, l3) - - l1p = l1[:5] - - # full size - # size is without terminating newline - def mkfull(): - return Git.CatFileContentStream(len(d)-1, StringIO(d)) - - ts = 5 - def mktiny(): - return Git.CatFileContentStream(ts, StringIO(d)) - - # readlines no limit - s = mkfull() - lines = s.readlines() - assert len(lines) == 3 and lines[-1].endswith('\n') - assert s._stream.tell() == len(d) # must have scrubbed to the end - - # realines line limit - s = mkfull() - lines = s.readlines(5) - assert len(lines) == 1 - - # readlines on tiny sections - s = mktiny() - lines = s.readlines() - assert len(lines) == 1 and lines[0] == l1p - assert s._stream.tell() == ts+1 - - # readline no limit - s = mkfull() - assert s.readline() == l1 - assert s.readline() == l2 - assert s.readline() == l3 - assert s.readline() == '' - assert s._stream.tell() == len(d) - - # readline limit - s = mkfull() - assert s.readline(5) == l1p - assert s.readline() == l1[5:] - - # readline on tiny section - s = mktiny() - assert s.readline() == l1p - assert s.readline() == '' - assert s._stream.tell() == ts+1 - - # read no limit - s = mkfull() - assert s.read() == d[:-1] - assert s.read() == '' - assert s._stream.tell() == len(d) - - # read limit - s = mkfull() - assert s.read(5) == l1p - assert s.read(6) == l1[5:] - assert s._stream.tell() == 5 + 6 # its not yet done - - # read tiny - s = mktiny() - assert s.read(2) == l1[:2] - assert s._stream.tell() == 2 - assert s.read() == l1[2:ts] - assert s._stream.tell() == ts+1 - - def _assert_rev_parse_types(self, name, rev_obj): - rev_parse = self.rorepo.rev_parse - - if rev_obj.type == 'tag': - rev_obj = rev_obj.object - - # tree and blob type - obj = rev_parse(name + '^{tree}') - assert obj == rev_obj.tree - - obj = rev_parse(name + ':CHANGES') - assert obj.type == 'blob' and obj.path == 'CHANGES' - assert rev_obj.tree['CHANGES'] == obj - - - def _assert_rev_parse(self, name): - """tries multiple different rev-parse syntaxes with the given name - :return: parsed object""" - rev_parse = self.rorepo.rev_parse - orig_obj = rev_parse(name) - if orig_obj.type == 'tag': - obj = orig_obj.object - else: - obj = orig_obj - # END deref tags by default - - # try history - rev = name + "~" - obj2 = rev_parse(rev) - assert obj2 == obj.parents[0] - self._assert_rev_parse_types(rev, obj2) - - # history with number - ni = 11 - history = [obj.parents[0]] - for pn in range(ni): - history.append(history[-1].parents[0]) - # END get given amount of commits - - for pn in range(11): - rev = name + "~%i" % (pn+1) - obj2 = rev_parse(rev) - assert obj2 == history[pn] - self._assert_rev_parse_types(rev, obj2) - # END history check - - # parent ( default ) - rev = name + "^" - obj2 = rev_parse(rev) - assert obj2 == obj.parents[0] - self._assert_rev_parse_types(rev, obj2) - - # parent with number - for pn, parent in enumerate(obj.parents): - rev = name + "^%i" % (pn+1) - assert rev_parse(rev) == parent - self._assert_rev_parse_types(rev, parent) - # END for each parent - - return orig_obj - - @with_rw_repo('HEAD', bare=False) - def test_rw_rev_parse(self, rwrepo): - # verify it does not confuse branches with hexsha ids - ahead = rwrepo.create_head('aaaaaaaa') - assert(rwrepo.rev_parse(str(ahead)) == ahead.commit) - - def test_rev_parse(self): - rev_parse = self.rorepo.rev_parse - - # try special case: This one failed at some point, make sure its fixed - assert rev_parse("33ebe").hexsha == "33ebe7acec14b25c5f84f35a664803fcab2f7781" - - # start from reference - num_resolved = 0 - - for ref in Reference.iter_items(self.rorepo): - path_tokens = ref.path.split("/") - for pt in range(len(path_tokens)): - path_section = '/'.join(path_tokens[-(pt+1):]) - try: - obj = self._assert_rev_parse(path_section) - assert obj.type == ref.object.type - num_resolved += 1 - except BadObject: - print "failed on %s" % path_section - # is fine, in case we have something like 112, which belongs to remotes/rname/merge-requests/112 - pass - # END exception handling - # END for each token - # END for each reference - assert num_resolved - - # it works with tags ! - tag = self._assert_rev_parse('0.1.4') - assert tag.type == 'tag' - - # try full sha directly ( including type conversion ) - assert tag.object == rev_parse(tag.object.hexsha) - self._assert_rev_parse_types(tag.object.hexsha, tag.object) - - - # multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES - rev = '0.1.4^{tree}^{tree}' - assert rev_parse(rev) == tag.object.tree - assert rev_parse(rev+':CHANGES') == tag.object.tree['CHANGES'] - - - # try to get parents from first revision - it should fail as no such revision - # exists - first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781" - commit = rev_parse(first_rev) - assert len(commit.parents) == 0 - assert commit.hexsha == first_rev - self.failUnlessRaises(BadObject, rev_parse, first_rev+"~") - self.failUnlessRaises(BadObject, rev_parse, first_rev+"^") - - # short SHA1 - commit2 = rev_parse(first_rev[:20]) - assert commit2 == commit - commit2 = rev_parse(first_rev[:5]) - assert commit2 == commit - - - # todo: dereference tag into a blob 0.1.7^{blob} - quite a special one - # needs a tag which points to a blob - - - # ref^0 returns commit being pointed to, same with ref~0, and ^{} - tag = rev_parse('0.1.4') - for token in (('~0', '^0', '^{}')): - assert tag.object == rev_parse('0.1.4%s' % token) - # END handle multiple tokens - - # try partial parsing - max_items = 40 - for i, binsha in enumerate(self.rorepo.odb.sha_iter()): - assert rev_parse(bin_to_hex(binsha)[:8-(i%2)]).binsha == binsha - if i > max_items: - # this is rather slow currently, as rev_parse returns an object - # which requires accessing packs, it has some additional overhead - break - # END for each binsha in repo - - # missing closing brace commit^{tree - self.failUnlessRaises(ValueError, rev_parse, '0.1.4^{tree') - - # missing starting brace - self.failUnlessRaises(ValueError, rev_parse, '0.1.4^tree}') - - # REVLOG - ####### - head = self.rorepo.head - - # need to specify a ref when using the @ syntax - self.failUnlessRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha) - - # uses HEAD.ref by default - assert rev_parse('@{0}') == head.commit - if not head.is_detached: - refspec = '%s@{0}' % head.ref.name - assert rev_parse(refspec) == head.ref.commit - # all additional specs work as well - assert rev_parse(refspec+"^{tree}") == head.commit.tree - assert rev_parse(refspec+":CHANGES").type == 'blob' - #END operate on non-detached head - - # the most recent previous position of the currently checked out branch - - try: - assert rev_parse('@{1}') != head.commit - except IndexError: - # on new checkouts, there isn't even a single past branch position - # in the log - pass - #END handle fresh checkouts - - # position doesn't exist - self.failUnlessRaises(IndexError, rev_parse, '@{10000}') - - # currently, nothing more is supported - self.failUnlessRaises(NotImplementedError, rev_parse, "@{1 week ago}") - - def test_submodules(self): - assert len(self.rorepo.submodules) == 2 # non-recursive - # in previous configurations, we had recursive repositories so this would compare to 2 - # now there is only one left, as gitdb was merged, but we have smmap instead - assert len(list(self.rorepo.iter_submodules())) == 2 - - assert isinstance(self.rorepo.submodule("async"), Submodule) - self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist") - - @with_rw_repo('HEAD', bare=False) - def test_submodule_update(self, rwrepo): - # fails in bare mode - rwrepo._bare = True - # special handling: there are repo implementations which have a bare attribute. IN that case, set it directly - if not rwrepo.bare: - rwrepo.bare = True - self.failUnlessRaises(InvalidGitRepositoryError, rwrepo.submodule_update) - rwrepo._bare = False - if rwrepo.bare: - rwrepo.bare = False - #END special repo handling - - # test create submodule - sm = rwrepo.submodules[0] - sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path)) - assert isinstance(sm, Submodule) - - # note: the rest of this functionality is tested in test_submodule - - + def test_index(self): + index = self.rorepo.index + assert isinstance(index, IndexFile) + + def test_tag(self): + assert self.rorepo.tag('0.1.5').commit + assert self.rorepo.tag('refs/tags/0.1.5').commit + + def test_archive(self): + tmpfile = os.tmpfile() + self.rorepo.archive(tmpfile, '0.1.5') + assert tmpfile.tell() + + @patch_object(Git, '_call_process') + def test_should_display_blame_information(self, git): + git.return_value = fixture('blame') + b = self.rorepo.blame( 'master', 'lib/git.py') + assert_equal(13, len(b)) + assert_equal( 2, len(b[0]) ) + # assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b)) + assert_equal(hash(b[0][0]), hash(b[9][0])) + c = b[0][0] + assert_true(git.called) + assert_equal(git.call_args, (('blame', 'master', '--', 'lib/git.py'), {'p': True})) + + assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.hexsha) + assert_equal('Tom Preston-Werner', c.author.name) + assert_equal('tom@mojombo.com', c.author.email) + assert_equal(1191997100, c.authored_date) + assert_equal('Tom Preston-Werner', c.committer.name) + assert_equal('tom@mojombo.com', c.committer.email) + assert_equal(1191997100, c.committed_date) + assert_equal('initial grit setup', c.message) + + # test the 'lines per commit' entries + tlist = b[0][1] + assert_true( tlist ) + assert_true( isinstance( tlist[0], basestring ) ) + assert_true( len( tlist ) < sum( len(t) for t in tlist ) ) # test for single-char bug + + def test_blame_real(self): + c = 0 + for item in self.rorepo.head.commit.tree.traverse( + predicate=lambda i, d: i.type == 'blob' and i.path.endswith('.py')): + c += 1 + b = self.rorepo.blame(self.rorepo.head, item.path) + #END for each item to traverse + assert c + + def test_untracked_files(self): + base = self.rorepo.working_tree_dir + files = ( join_path_native(base, "__test_myfile"), + join_path_native(base, "__test_other_file") ) + num_recently_untracked = 0 + try: + for fpath in files: + fd = open(fpath,"wb") + fd.close() + # END for each filename + untracked_files = self.rorepo.untracked_files + num_recently_untracked = len(untracked_files) + + # assure we have all names - they are relative to the git-dir + num_test_untracked = 0 + for utfile in untracked_files: + num_test_untracked += join_path_native(base, utfile) in files + assert len(files) == num_test_untracked + finally: + for fpath in files: + if os.path.isfile(fpath): + os.remove(fpath) + # END handle files + + assert len(self.rorepo.untracked_files) == (num_recently_untracked - len(files)) + + def test_config_reader(self): + reader = self.rorepo.config_reader() # all config files + assert reader.read_only + reader = self.rorepo.config_reader("repository") # single config file + assert reader.read_only + + def test_config_writer(self): + for config_level in self.rorepo.config_level: + try: + writer = self.rorepo.config_writer(config_level) + assert not writer.read_only + except IOError: + # its okay not to get a writer for some configuration files if we + # have no permissions + pass + # END for each config level + + def test_creation_deletion(self): + # just a very quick test to assure it generally works. There are + # specialized cases in the test_refs module + head = self.rorepo.create_head("new_head", "HEAD~1") + self.rorepo.delete_head(head) + + tag = self.rorepo.create_tag("new_tag", "HEAD~2") + self.rorepo.delete_tag(tag) + self.rorepo.config_writer() + remote = self.rorepo.create_remote("new_remote", "git@server:repo.git") + self.rorepo.delete_remote(remote) + + def test_comparison_and_hash(self): + # this is only a preliminary test, more testing done in test_index + assert self.rorepo == self.rorepo and not (self.rorepo != self.rorepo) + assert len(set((self.rorepo, self.rorepo))) == 1 + + def test_git_cmd(self): + # test CatFileContentStream, just to be very sure we have no fencepost errors + # last \n is the terminating newline that it expects + l1 = "0123456789\n" + l2 = "abcdefghijklmnopqrstxy\n" + l3 = "z\n" + d = "%s%s%s\n" % (l1, l2, l3) + + l1p = l1[:5] + + # full size + # size is without terminating newline + def mkfull(): + return Git.CatFileContentStream(len(d)-1, StringIO(d)) + + ts = 5 + def mktiny(): + return Git.CatFileContentStream(ts, StringIO(d)) + + # readlines no limit + s = mkfull() + lines = s.readlines() + assert len(lines) == 3 and lines[-1].endswith('\n') + assert s._stream.tell() == len(d) # must have scrubbed to the end + + # realines line limit + s = mkfull() + lines = s.readlines(5) + assert len(lines) == 1 + + # readlines on tiny sections + s = mktiny() + lines = s.readlines() + assert len(lines) == 1 and lines[0] == l1p + assert s._stream.tell() == ts+1 + + # readline no limit + s = mkfull() + assert s.readline() == l1 + assert s.readline() == l2 + assert s.readline() == l3 + assert s.readline() == '' + assert s._stream.tell() == len(d) + + # readline limit + s = mkfull() + assert s.readline(5) == l1p + assert s.readline() == l1[5:] + + # readline on tiny section + s = mktiny() + assert s.readline() == l1p + assert s.readline() == '' + assert s._stream.tell() == ts+1 + + # read no limit + s = mkfull() + assert s.read() == d[:-1] + assert s.read() == '' + assert s._stream.tell() == len(d) + + # read limit + s = mkfull() + assert s.read(5) == l1p + assert s.read(6) == l1[5:] + assert s._stream.tell() == 5 + 6 # its not yet done + + # read tiny + s = mktiny() + assert s.read(2) == l1[:2] + assert s._stream.tell() == 2 + assert s.read() == l1[2:ts] + assert s._stream.tell() == ts+1 + + def _assert_rev_parse_types(self, name, rev_obj): + rev_parse = self.rorepo.rev_parse + + if rev_obj.type == 'tag': + rev_obj = rev_obj.object + + # tree and blob type + obj = rev_parse(name + '^{tree}') + assert obj == rev_obj.tree + + obj = rev_parse(name + ':CHANGES') + assert obj.type == 'blob' and obj.path == 'CHANGES' + assert rev_obj.tree['CHANGES'] == obj + + + def _assert_rev_parse(self, name): + """tries multiple different rev-parse syntaxes with the given name + :return: parsed object""" + rev_parse = self.rorepo.rev_parse + orig_obj = rev_parse(name) + if orig_obj.type == 'tag': + obj = orig_obj.object + else: + obj = orig_obj + # END deref tags by default + + # try history + rev = name + "~" + obj2 = rev_parse(rev) + assert obj2 == obj.parents[0] + self._assert_rev_parse_types(rev, obj2) + + # history with number + ni = 11 + history = [obj.parents[0]] + for pn in range(ni): + history.append(history[-1].parents[0]) + # END get given amount of commits + + for pn in range(11): + rev = name + "~%i" % (pn+1) + obj2 = rev_parse(rev) + assert obj2 == history[pn] + self._assert_rev_parse_types(rev, obj2) + # END history check + + # parent ( default ) + rev = name + "^" + obj2 = rev_parse(rev) + assert obj2 == obj.parents[0] + self._assert_rev_parse_types(rev, obj2) + + # parent with number + for pn, parent in enumerate(obj.parents): + rev = name + "^%i" % (pn+1) + assert rev_parse(rev) == parent + self._assert_rev_parse_types(rev, parent) + # END for each parent + + return orig_obj + + @with_rw_repo('HEAD', bare=False) + def test_rw_rev_parse(self, rwrepo): + # verify it does not confuse branches with hexsha ids + ahead = rwrepo.create_head('aaaaaaaa') + assert(rwrepo.rev_parse(str(ahead)) == ahead.commit) + + def test_rev_parse(self): + rev_parse = self.rorepo.rev_parse + + # try special case: This one failed at some point, make sure its fixed + assert rev_parse("33ebe").hexsha == "33ebe7acec14b25c5f84f35a664803fcab2f7781" + + # start from reference + num_resolved = 0 + + for ref in Reference.iter_items(self.rorepo): + path_tokens = ref.path.split("/") + for pt in range(len(path_tokens)): + path_section = '/'.join(path_tokens[-(pt+1):]) + try: + obj = self._assert_rev_parse(path_section) + assert obj.type == ref.object.type + num_resolved += 1 + except BadObject: + print "failed on %s" % path_section + # is fine, in case we have something like 112, which belongs to remotes/rname/merge-requests/112 + pass + # END exception handling + # END for each token + # END for each reference + assert num_resolved + + # it works with tags ! + tag = self._assert_rev_parse('0.1.4') + assert tag.type == 'tag' + + # try full sha directly ( including type conversion ) + assert tag.object == rev_parse(tag.object.hexsha) + self._assert_rev_parse_types(tag.object.hexsha, tag.object) + + + # multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES + rev = '0.1.4^{tree}^{tree}' + assert rev_parse(rev) == tag.object.tree + assert rev_parse(rev+':CHANGES') == tag.object.tree['CHANGES'] + + + # try to get parents from first revision - it should fail as no such revision + # exists + first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781" + commit = rev_parse(first_rev) + assert len(commit.parents) == 0 + assert commit.hexsha == first_rev + self.failUnlessRaises(BadObject, rev_parse, first_rev+"~") + self.failUnlessRaises(BadObject, rev_parse, first_rev+"^") + + # short SHA1 + commit2 = rev_parse(first_rev[:20]) + assert commit2 == commit + commit2 = rev_parse(first_rev[:5]) + assert commit2 == commit + + + # todo: dereference tag into a blob 0.1.7^{blob} - quite a special one + # needs a tag which points to a blob + + + # ref^0 returns commit being pointed to, same with ref~0, and ^{} + tag = rev_parse('0.1.4') + for token in (('~0', '^0', '^{}')): + assert tag.object == rev_parse('0.1.4%s' % token) + # END handle multiple tokens + + # try partial parsing + max_items = 40 + for i, binsha in enumerate(self.rorepo.odb.sha_iter()): + assert rev_parse(bin_to_hex(binsha)[:8-(i%2)]).binsha == binsha + if i > max_items: + # this is rather slow currently, as rev_parse returns an object + # which requires accessing packs, it has some additional overhead + break + # END for each binsha in repo + + # missing closing brace commit^{tree + self.failUnlessRaises(ValueError, rev_parse, '0.1.4^{tree') + + # missing starting brace + self.failUnlessRaises(ValueError, rev_parse, '0.1.4^tree}') + + # REVLOG + ####### + head = self.rorepo.head + + # need to specify a ref when using the @ syntax + self.failUnlessRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha) + + # uses HEAD.ref by default + assert rev_parse('@{0}') == head.commit + if not head.is_detached: + refspec = '%s@{0}' % head.ref.name + assert rev_parse(refspec) == head.ref.commit + # all additional specs work as well + assert rev_parse(refspec+"^{tree}") == head.commit.tree + assert rev_parse(refspec+":CHANGES").type == 'blob' + #END operate on non-detached head + + # the most recent previous position of the currently checked out branch + + try: + assert rev_parse('@{1}') != head.commit + except IndexError: + # on new checkouts, there isn't even a single past branch position + # in the log + pass + #END handle fresh checkouts + + # position doesn't exist + self.failUnlessRaises(IndexError, rev_parse, '@{10000}') + + # currently, nothing more is supported + self.failUnlessRaises(NotImplementedError, rev_parse, "@{1 week ago}") + + def test_submodules(self): + assert len(self.rorepo.submodules) == 2 # non-recursive + # in previous configurations, we had recursive repositories so this would compare to 2 + # now there is only one left, as gitdb was merged, but we have smmap instead + assert len(list(self.rorepo.iter_submodules())) == 2 + + assert isinstance(self.rorepo.submodule("async"), Submodule) + self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist") + + @with_rw_repo('HEAD', bare=False) + def test_submodule_update(self, rwrepo): + # fails in bare mode + rwrepo._bare = True + # special handling: there are repo implementations which have a bare attribute. IN that case, set it directly + if not rwrepo.bare: + rwrepo.bare = True + self.failUnlessRaises(InvalidGitRepositoryError, rwrepo.submodule_update) + rwrepo._bare = False + if rwrepo.bare: + rwrepo.bare = False + #END special repo handling + + # test create submodule + sm = rwrepo.submodules[0] + sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path)) + assert isinstance(sm, Submodule) + + # note: the rest of this functionality is tested in test_submodule + + diff --git a/git/test/db/cmd/test_base.py b/git/test/db/cmd/test_base.py index cbb4a339..890c0232 100644 --- a/git/test/db/cmd/test_base.py +++ b/git/test/db/cmd/test_base.py @@ -14,78 +14,78 @@ from git.db.cmd.base import * from git.refs import TagReference, Reference, RemoteReference class TestBase(RepoBase): - RepoCls = CmdCompatibilityGitDB + RepoCls = CmdCompatibilityGitDB - def test_basics(self): - gdb = self.rorepo - - # partial to complete - works with everything - hexsha = bin_to_hex(gdb.partial_to_complete_sha_hex("0.1.6")) - assert len(hexsha) == 40 - - assert bin_to_hex(gdb.partial_to_complete_sha_hex(hexsha[:20])) == hexsha - - # fails with BadObject - for invalid_rev in ("0000", "bad/ref", "super bad"): - self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, invalid_rev) - - def test_fetch_info(self): - self.failUnlessRaises(ValueError, CmdCmdFetchInfo._from_line, self.rorepo, "nonsense", '') - self.failUnlessRaises(ValueError, CmdCmdFetchInfo._from_line, self.rorepo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '') - - - def test_fetch_info(self): - # assure we can handle remote-tracking branches - fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of git://github.com/gitpython-developers/GitPython" - remote_info_line_fmt = "* [new branch] nomatter -> %s" - fi = CmdFetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "local/master", - fetch_info_line_fmt % 'remote-tracking branch') - - # we wouldn't be here if it wouldn't have worked - - # handles non-default refspecs: One can specify a different path in refs/remotes - # or a special path just in refs/something for instance - - fi = CmdFetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "subdir/tagname", - fetch_info_line_fmt % 'tag') - - assert isinstance(fi.ref, TagReference) - assert fi.ref.path.startswith('refs/tags') - - # it could be in a remote direcftory though - fi = CmdFetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "remotename/tags/tagname", - fetch_info_line_fmt % 'tag') - - assert isinstance(fi.ref, TagReference) - assert fi.ref.path.startswith('refs/remotes/') - - # it can also be anywhere ! - tag_path = "refs/something/remotename/tags/tagname" - fi = CmdFetchInfo._from_line(self.rorepo, - remote_info_line_fmt % tag_path, - fetch_info_line_fmt % 'tag') - - assert isinstance(fi.ref, TagReference) - assert fi.ref.path == tag_path - - # branches default to refs/remotes - fi = CmdFetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "remotename/branch", - fetch_info_line_fmt % 'branch') - - assert isinstance(fi.ref, RemoteReference) - assert fi.ref.remote_name == 'remotename' - - # but you can force it anywhere, in which case we only have a references - fi = CmdFetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "refs/something/branch", - fetch_info_line_fmt % 'branch') - - assert type(fi.ref) is Reference - assert fi.ref.path == "refs/something/branch" - - - + def test_basics(self): + gdb = self.rorepo + + # partial to complete - works with everything + hexsha = bin_to_hex(gdb.partial_to_complete_sha_hex("0.1.6")) + assert len(hexsha) == 40 + + assert bin_to_hex(gdb.partial_to_complete_sha_hex(hexsha[:20])) == hexsha + + # fails with BadObject + for invalid_rev in ("0000", "bad/ref", "super bad"): + self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, invalid_rev) + + def test_fetch_info(self): + self.failUnlessRaises(ValueError, CmdCmdFetchInfo._from_line, self.rorepo, "nonsense", '') + self.failUnlessRaises(ValueError, CmdCmdFetchInfo._from_line, self.rorepo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '') + + + def test_fetch_info(self): + # assure we can handle remote-tracking branches + fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of git://github.com/gitpython-developers/GitPython" + remote_info_line_fmt = "* [new branch] nomatter -> %s" + fi = CmdFetchInfo._from_line(self.rorepo, + remote_info_line_fmt % "local/master", + fetch_info_line_fmt % 'remote-tracking branch') + + # we wouldn't be here if it wouldn't have worked + + # handles non-default refspecs: One can specify a different path in refs/remotes + # or a special path just in refs/something for instance + + fi = CmdFetchInfo._from_line(self.rorepo, + remote_info_line_fmt % "subdir/tagname", + fetch_info_line_fmt % 'tag') + + assert isinstance(fi.ref, TagReference) + assert fi.ref.path.startswith('refs/tags') + + # it could be in a remote direcftory though + fi = CmdFetchInfo._from_line(self.rorepo, + remote_info_line_fmt % "remotename/tags/tagname", + fetch_info_line_fmt % 'tag') + + assert isinstance(fi.ref, TagReference) + assert fi.ref.path.startswith('refs/remotes/') + + # it can also be anywhere ! + tag_path = "refs/something/remotename/tags/tagname" + fi = CmdFetchInfo._from_line(self.rorepo, + remote_info_line_fmt % tag_path, + fetch_info_line_fmt % 'tag') + + assert isinstance(fi.ref, TagReference) + assert fi.ref.path == tag_path + + # branches default to refs/remotes + fi = CmdFetchInfo._from_line(self.rorepo, + remote_info_line_fmt % "remotename/branch", + fetch_info_line_fmt % 'branch') + + assert isinstance(fi.ref, RemoteReference) + assert fi.ref.remote_name == 'remotename' + + # but you can force it anywhere, in which case we only have a references + fi = CmdFetchInfo._from_line(self.rorepo, + remote_info_line_fmt % "refs/something/branch", + fetch_info_line_fmt % 'branch') + + assert type(fi.ref) is Reference + assert fi.ref.path == "refs/something/branch" + + + diff --git a/git/test/db/dulwich/lib.py b/git/test/db/dulwich/lib.py index 56734064..a58469f1 100644 --- a/git/test/db/dulwich/lib.py +++ b/git/test/db/dulwich/lib.py @@ -1,23 +1,23 @@ """dulwich specific utilities, as well as all the default ones""" from git.test.lib import ( - InheritedTestMethodsOverrideWrapperMetaClsAutoMixin, - needs_module_or_skip - ) + InheritedTestMethodsOverrideWrapperMetaClsAutoMixin, + needs_module_or_skip + ) __all__ = ['needs_dulwich_or_skip', 'DulwichRequiredMetaMixin'] #{ Decoorators def needs_dulwich_or_skip(func): - """Skip this test if we have no dulwich - print warning""" - return needs_module_or_skip('dulwich')(func) + """Skip this test if we have no dulwich - print warning""" + return needs_module_or_skip('dulwich')(func) #}END decorators #{ MetaClasses class DulwichRequiredMetaMixin(InheritedTestMethodsOverrideWrapperMetaClsAutoMixin): - decorator = [needs_dulwich_or_skip] + decorator = [needs_dulwich_or_skip] #} END metaclasses diff --git a/git/test/db/dulwich/test_base.py b/git/test/db/dulwich/test_base.py index 78416518..ed2f8975 100644 --- a/git/test/db/dulwich/test_base.py +++ b/git/test/db/dulwich/test_base.py @@ -9,24 +9,24 @@ from git.test.db.base import RepoBase try: - import dulwich + import dulwich except ImportError: - # om this case, all other dulwich tests will be skipped - # Need to properly initialize the class though, otherwise it would fail - from git.db.complex import PureCompatibilityGitDB as DulwichDB + # om this case, all other dulwich tests will be skipped + # Need to properly initialize the class though, otherwise it would fail + from git.db.complex import PureCompatibilityGitDB as DulwichDB else: - # now we know dulwich is available, to do futher imports - from git.db.dulwich.complex import DulwichCompatibilityGitDB as DulwichDB - + # now we know dulwich is available, to do futher imports + from git.db.dulwich.complex import DulwichCompatibilityGitDB as DulwichDB + #END handle imports class TestDulwichDBBase(RepoBase): - __metaclass__ = DulwichRequiredMetaMixin - RepoCls = DulwichDB - - @needs_dulwich_or_skip - @with_rw_repo('HEAD', bare=False) - def test_basics(self, rw_repo): - db = DulwichDB(rw_repo.working_tree_dir) - - + __metaclass__ = DulwichRequiredMetaMixin + RepoCls = DulwichDB + + @needs_dulwich_or_skip + @with_rw_repo('HEAD', bare=False) + def test_basics(self, rw_repo): + db = DulwichDB(rw_repo.working_tree_dir) + + diff --git a/git/test/db/lib.py b/git/test/db/lib.py index 2b3ddde5..df9fec76 100644 --- a/git/test/db/lib.py +++ b/git/test/db/lib.py @@ -4,21 +4,21 @@ # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Base classes for object db testing""" from git.test.lib import ( - with_rw_directory, - with_packs_rw, - ZippedStoreShaWriter, - fixture_path, - TestBase, - rorepo_dir, - ) + with_rw_directory, + with_packs_rw, + ZippedStoreShaWriter, + fixture_path, + TestBase, + rorepo_dir, + ) from git.stream import Sha1Writer from git.base import ( - IStream, - OStream, - OInfo - ) - + IStream, + OStream, + OInfo + ) + from git.exc import BadObject from git.typ import str_blob_type @@ -28,220 +28,220 @@ from struct import pack __all__ = ('TestDBBase', 'with_rw_directory', 'with_packs_rw', 'fixture_path') - + class TestDBBase(TestBase): - """Base Class providing default functionality to all tests such as: - - - Utility functions provided by the TestCase base of the unittest method such as:: - self.fail("todo") - self.failUnlessRaises(...) - - - Class level repository which is considered read-only as it is shared among - all test cases in your type. - Access it using:: - self.rorepo # 'ro' stands for read-only - - The rorepo is in fact your current project's git repo. If you refer to specific - shas for your objects, be sure you choose some that are part of the immutable portion - of the project history ( to assure tests don't fail for others ). - - Derived types can override the default repository type to create a different - read-only repo, allowing to test their specific type - """ - - # data - two_lines = "1234\nhello world" - all_data = (two_lines, ) - - #{ Configuration - # The repository type to instantiate. It takes at least a path to operate upon - # during instantiation. - RepoCls = None - - # if True, a read-only repo will be provided and RepoCls must be set. - # Otherwise it may remain unset - needs_ro_repo = True - #} END configuration - - @classmethod - def setUpAll(cls): - """ - Dynamically add a read-only repository to our actual type. This way - each test type has its own repository - """ - if cls.needs_ro_repo: - if cls is not TestDBBase: - assert cls.RepoCls is not None, "RepoCls class member must be set in %s" % cls - cls.rorepo = cls.RepoCls(rorepo_dir()) - #END handle rorepo - - def _assert_object_writing_simple(self, db): - # write a bunch of objects and query their streams and info - null_objs = db.size() - ni = 250 - for i in xrange(ni): - data = pack(">L", i) - istream = IStream(str_blob_type, len(data), StringIO(data)) - new_istream = db.store(istream) - assert new_istream is istream - assert db.has_object(istream.binsha) - - info = db.info(istream.binsha) - assert isinstance(info, OInfo) - assert info.type == istream.type and info.size == istream.size - - stream = db.stream(istream.binsha) - assert isinstance(stream, OStream) - assert stream.binsha == info.binsha and stream.type == info.type - assert stream.read() == data - # END for each item - - assert db.size() == null_objs + ni - shas = list(db.sha_iter()) - assert len(shas) == db.size() - assert len(shas[0]) == 20 - - - def _assert_object_writing(self, db): - """General tests to verify object writing, compatible to ObjectDBW - :note: requires write access to the database""" - # start in 'dry-run' mode, using a simple sha1 writer - ostreams = (ZippedStoreShaWriter, None) - for ostreamcls in ostreams: - for data in self.all_data: - dry_run = ostreamcls is not None - ostream = None - if ostreamcls is not None: - ostream = ostreamcls() - assert isinstance(ostream, Sha1Writer) - # END create ostream - - prev_ostream = db.set_ostream(ostream) - assert type(prev_ostream) in ostreams or prev_ostream in ostreams - - istream = IStream(str_blob_type, len(data), StringIO(data)) - - # store returns same istream instance, with new sha set - my_istream = db.store(istream) - sha = istream.binsha - assert my_istream is istream - assert db.has_object(sha) != dry_run - assert len(sha) == 20 - - # verify data - the slow way, we want to run code - if not dry_run: - info = db.info(sha) - assert str_blob_type == info.type - assert info.size == len(data) - - ostream = db.stream(sha) - assert ostream.read() == data - assert ostream.type == str_blob_type - assert ostream.size == len(data) - else: - self.failUnlessRaises(BadObject, db.info, sha) - self.failUnlessRaises(BadObject, db.stream, sha) - - # DIRECT STREAM COPY - # our data hase been written in object format to the StringIO - # we pasesd as output stream. No physical database representation - # was created. - # Test direct stream copy of object streams, the result must be - # identical to what we fed in - ostream.seek(0) - istream.stream = ostream - assert istream.binsha is not None - prev_sha = istream.binsha - - db.set_ostream(ZippedStoreShaWriter()) - db.store(istream) - assert istream.binsha == prev_sha - new_ostream = db.ostream() - - # note: only works as long our store write uses the same compression - # level, which is zip_best - assert ostream.getvalue() == new_ostream.getvalue() - # END for each data set - # END for each dry_run mode - - def _assert_object_writing_async(self, db): - """Test generic object writing using asynchronous access""" - ni = 5000 - def istream_generator(offset=0, ni=ni): - for data_src in xrange(ni): - data = str(data_src + offset) - yield IStream(str_blob_type, len(data), StringIO(data)) - # END for each item - # END generator utility - - # for now, we are very trusty here as we expect it to work if it worked - # in the single-stream case - - # write objects - reader = IteratorReader(istream_generator()) - istream_reader = db.store_async(reader) - istreams = istream_reader.read() # read all - assert istream_reader.task().error() is None - assert len(istreams) == ni - - for stream in istreams: - assert stream.error is None - assert len(stream.binsha) == 20 - assert isinstance(stream, IStream) - # END assert each stream - - # test has-object-async - we must have all previously added ones - reader = IteratorReader( istream.binsha for istream in istreams ) - hasobject_reader = db.has_object_async(reader) - count = 0 - for sha, has_object in hasobject_reader: - assert has_object - count += 1 - # END for each sha - assert count == ni - - # read the objects we have just written - reader = IteratorReader( istream.binsha for istream in istreams ) - ostream_reader = db.stream_async(reader) - - # read items individually to prevent hitting possible sys-limits - count = 0 - for ostream in ostream_reader: - assert isinstance(ostream, OStream) - count += 1 - # END for each ostream - assert ostream_reader.task().error() is None - assert count == ni - - # get info about our items - reader = IteratorReader( istream.binsha for istream in istreams ) - info_reader = db.info_async(reader) - - count = 0 - for oinfo in info_reader: - assert isinstance(oinfo, OInfo) - count += 1 - # END for each oinfo instance - assert count == ni - - - # combined read-write using a converter - # add 2500 items, and obtain their output streams - nni = 2500 - reader = IteratorReader(istream_generator(offset=ni, ni=nni)) - istream_to_sha = lambda istreams: [ istream.binsha for istream in istreams ] - - istream_reader = db.store_async(reader) - istream_reader.set_post_cb(istream_to_sha) - - ostream_reader = db.stream_async(istream_reader) - - count = 0 - # read it individually, otherwise we might run into the ulimit - for ostream in ostream_reader: - assert isinstance(ostream, OStream) - count += 1 - # END for each ostream - assert count == nni - - + """Base Class providing default functionality to all tests such as: + + - Utility functions provided by the TestCase base of the unittest method such as:: + self.fail("todo") + self.failUnlessRaises(...) + + - Class level repository which is considered read-only as it is shared among + all test cases in your type. + Access it using:: + self.rorepo # 'ro' stands for read-only + + The rorepo is in fact your current project's git repo. If you refer to specific + shas for your objects, be sure you choose some that are part of the immutable portion + of the project history ( to assure tests don't fail for others ). + + Derived types can override the default repository type to create a different + read-only repo, allowing to test their specific type + """ + + # data + two_lines = "1234\nhello world" + all_data = (two_lines, ) + + #{ Configuration + # The repository type to instantiate. It takes at least a path to operate upon + # during instantiation. + RepoCls = None + + # if True, a read-only repo will be provided and RepoCls must be set. + # Otherwise it may remain unset + needs_ro_repo = True + #} END configuration + + @classmethod + def setUpAll(cls): + """ + Dynamically add a read-only repository to our actual type. This way + each test type has its own repository + """ + if cls.needs_ro_repo: + if cls is not TestDBBase: + assert cls.RepoCls is not None, "RepoCls class member must be set in %s" % cls + cls.rorepo = cls.RepoCls(rorepo_dir()) + #END handle rorepo + + def _assert_object_writing_simple(self, db): + # write a bunch of objects and query their streams and info + null_objs = db.size() + ni = 250 + for i in xrange(ni): + data = pack(">L", i) + istream = IStream(str_blob_type, len(data), StringIO(data)) + new_istream = db.store(istream) + assert new_istream is istream + assert db.has_object(istream.binsha) + + info = db.info(istream.binsha) + assert isinstance(info, OInfo) + assert info.type == istream.type and info.size == istream.size + + stream = db.stream(istream.binsha) + assert isinstance(stream, OStream) + assert stream.binsha == info.binsha and stream.type == info.type + assert stream.read() == data + # END for each item + + assert db.size() == null_objs + ni + shas = list(db.sha_iter()) + assert len(shas) == db.size() + assert len(shas[0]) == 20 + + + def _assert_object_writing(self, db): + """General tests to verify object writing, compatible to ObjectDBW + :note: requires write access to the database""" + # start in 'dry-run' mode, using a simple sha1 writer + ostreams = (ZippedStoreShaWriter, None) + for ostreamcls in ostreams: + for data in self.all_data: + dry_run = ostreamcls is not None + ostream = None + if ostreamcls is not None: + ostream = ostreamcls() + assert isinstance(ostream, Sha1Writer) + # END create ostream + + prev_ostream = db.set_ostream(ostream) + assert type(prev_ostream) in ostreams or prev_ostream in ostreams + + istream = IStream(str_blob_type, len(data), StringIO(data)) + + # store returns same istream instance, with new sha set + my_istream = db.store(istream) + sha = istream.binsha + assert my_istream is istream + assert db.has_object(sha) != dry_run + assert len(sha) == 20 + + # verify data - the slow way, we want to run code + if not dry_run: + info = db.info(sha) + assert str_blob_type == info.type + assert info.size == len(data) + + ostream = db.stream(sha) + assert ostream.read() == data + assert ostream.type == str_blob_type + assert ostream.size == len(data) + else: + self.failUnlessRaises(BadObject, db.info, sha) + self.failUnlessRaises(BadObject, db.stream, sha) + + # DIRECT STREAM COPY + # our data hase been written in object format to the StringIO + # we pasesd as output stream. No physical database representation + # was created. + # Test direct stream copy of object streams, the result must be + # identical to what we fed in + ostream.seek(0) + istream.stream = ostream + assert istream.binsha is not None + prev_sha = istream.binsha + + db.set_ostream(ZippedStoreShaWriter()) + db.store(istream) + assert istream.binsha == prev_sha + new_ostream = db.ostream() + + # note: only works as long our store write uses the same compression + # level, which is zip_best + assert ostream.getvalue() == new_ostream.getvalue() + # END for each data set + # END for each dry_run mode + + def _assert_object_writing_async(self, db): + """Test generic object writing using asynchronous access""" + ni = 5000 + def istream_generator(offset=0, ni=ni): + for data_src in xrange(ni): + data = str(data_src + offset) + yield IStream(str_blob_type, len(data), StringIO(data)) + # END for each item + # END generator utility + + # for now, we are very trusty here as we expect it to work if it worked + # in the single-stream case + + # write objects + reader = IteratorReader(istream_generator()) + istream_reader = db.store_async(reader) + istreams = istream_reader.read() # read all + assert istream_reader.task().error() is None + assert len(istreams) == ni + + for stream in istreams: + assert stream.error is None + assert len(stream.binsha) == 20 + assert isinstance(stream, IStream) + # END assert each stream + + # test has-object-async - we must have all previously added ones + reader = IteratorReader( istream.binsha for istream in istreams ) + hasobject_reader = db.has_object_async(reader) + count = 0 + for sha, has_object in hasobject_reader: + assert has_object + count += 1 + # END for each sha + assert count == ni + + # read the objects we have just written + reader = IteratorReader( istream.binsha for istream in istreams ) + ostream_reader = db.stream_async(reader) + + # read items individually to prevent hitting possible sys-limits + count = 0 + for ostream in ostream_reader: + assert isinstance(ostream, OStream) + count += 1 + # END for each ostream + assert ostream_reader.task().error() is None + assert count == ni + + # get info about our items + reader = IteratorReader( istream.binsha for istream in istreams ) + info_reader = db.info_async(reader) + + count = 0 + for oinfo in info_reader: + assert isinstance(oinfo, OInfo) + count += 1 + # END for each oinfo instance + assert count == ni + + + # combined read-write using a converter + # add 2500 items, and obtain their output streams + nni = 2500 + reader = IteratorReader(istream_generator(offset=ni, ni=nni)) + istream_to_sha = lambda istreams: [ istream.binsha for istream in istreams ] + + istream_reader = db.store_async(reader) + istream_reader.set_post_cb(istream_to_sha) + + ostream_reader = db.stream_async(istream_reader) + + count = 0 + # read it individually, otherwise we might run into the ulimit + for ostream in ostream_reader: + assert isinstance(ostream, OStream) + count += 1 + # END for each ostream + assert count == nni + + diff --git a/git/test/db/py/test_base.py b/git/test/db/py/test_base.py index 6b06bbe9..5d076bb2 100644 --- a/git/test/db/py/test_base.py +++ b/git/test/db/py/test_base.py @@ -8,9 +8,9 @@ from git.test.db.base import RepoBase from git.db.complex import PureCompatibilityGitDB class TestPyDBBase(RepoBase): - - RepoCls = PureCompatibilityGitDB - - def test_basics(self): - pass - + + RepoCls = PureCompatibilityGitDB + + def test_basics(self): + pass + diff --git a/git/test/db/py/test_git.py b/git/test/db/py/test_git.py index ecaa5c8f..4f5b5fb5 100644 --- a/git/test/db/py/test_git.py +++ b/git/test/db/py/test_git.py @@ -12,40 +12,40 @@ from git.util import hex_to_bin, bin_to_hex import os class TestGitDB(TestDBBase): - needs_ro_repo = False - - def test_reading(self): - gdb = PureGitODB(os.path.join(rorepo_dir(), 'objects')) - - # we have packs and loose objects, alternates doesn't necessarily exist - assert 1 < len(gdb.databases()) < 4 - - # access should be possible - git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655") - assert isinstance(gdb.info(git_sha), OInfo) - assert isinstance(gdb.stream(git_sha), OStream) - assert gdb.size() > 200 - sha_list = list(gdb.sha_iter()) - assert len(sha_list) == gdb.size() - - - # This is actually a test for compound functionality, but it doesn't - # have a separate test module - # test partial shas - # this one as uneven and quite short - assert gdb.partial_to_complete_sha_hex('5aebcd') == hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655") - - # mix even/uneven hexshas - for i, binsha in enumerate(sha_list[:50]): - assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8-(i%2)]) == binsha - # END for each sha - - self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, "0000") - - @with_rw_directory - def test_writing(self, path): - gdb = PureGitODB(path) - - # its possible to write objects - self._assert_object_writing(gdb) - self._assert_object_writing_async(gdb) + needs_ro_repo = False + + def test_reading(self): + gdb = PureGitODB(os.path.join(rorepo_dir(), 'objects')) + + # we have packs and loose objects, alternates doesn't necessarily exist + assert 1 < len(gdb.databases()) < 4 + + # access should be possible + git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655") + assert isinstance(gdb.info(git_sha), OInfo) + assert isinstance(gdb.stream(git_sha), OStream) + assert gdb.size() > 200 + sha_list = list(gdb.sha_iter()) + assert len(sha_list) == gdb.size() + + + # This is actually a test for compound functionality, but it doesn't + # have a separate test module + # test partial shas + # this one as uneven and quite short + assert gdb.partial_to_complete_sha_hex('5aebcd') == hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655") + + # mix even/uneven hexshas + for i, binsha in enumerate(sha_list[:50]): + assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8-(i%2)]) == binsha + # END for each sha + + self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, "0000") + + @with_rw_directory + def test_writing(self, path): + gdb = PureGitODB(path) + + # its possible to write objects + self._assert_object_writing(gdb) + self._assert_object_writing_async(gdb) diff --git a/git/test/db/py/test_loose.py b/git/test/db/py/test_loose.py index 0c9b4831..cfb0ca3a 100644 --- a/git/test/db/py/test_loose.py +++ b/git/test/db/py/test_loose.py @@ -6,31 +6,31 @@ from git.test.db.lib import TestDBBase, with_rw_directory from git.db.py.loose import PureLooseObjectODB from git.exc import BadObject from git.util import bin_to_hex - + class TestLooseDB(TestDBBase): - - needs_ro_repo = False - - @with_rw_directory - def test_basics(self, path): - ldb = PureLooseObjectODB(path) - - # write data - self._assert_object_writing(ldb) - self._assert_object_writing_async(ldb) - - # verify sha iteration and size - shas = list(ldb.sha_iter()) - assert shas and len(shas[0]) == 20 - - assert len(shas) == ldb.size() - - # verify find short object - long_sha = bin_to_hex(shas[-1]) - for short_sha in (long_sha[:20], long_sha[:5]): - assert bin_to_hex(ldb.partial_to_complete_sha_hex(short_sha)) == long_sha - # END for each sha - - self.failUnlessRaises(BadObject, ldb.partial_to_complete_sha_hex, '0000') - # raises if no object could be foudn - + + needs_ro_repo = False + + @with_rw_directory + def test_basics(self, path): + ldb = PureLooseObjectODB(path) + + # write data + self._assert_object_writing(ldb) + self._assert_object_writing_async(ldb) + + # verify sha iteration and size + shas = list(ldb.sha_iter()) + assert shas and len(shas[0]) == 20 + + assert len(shas) == ldb.size() + + # verify find short object + long_sha = bin_to_hex(shas[-1]) + for short_sha in (long_sha[:20], long_sha[:5]): + assert bin_to_hex(ldb.partial_to_complete_sha_hex(short_sha)) == long_sha + # END for each sha + + self.failUnlessRaises(BadObject, ldb.partial_to_complete_sha_hex, '0000') + # raises if no object could be foudn + diff --git a/git/test/db/py/test_mem.py b/git/test/db/py/test_mem.py index bc98dc56..bb879554 100644 --- a/git/test/db/py/test_mem.py +++ b/git/test/db/py/test_mem.py @@ -5,26 +5,26 @@ from git.test.db.lib import TestDBBase, with_rw_directory from git.db.py.mem import PureMemoryDB from git.db.py.loose import PureLooseObjectODB - + class TestPureMemoryDB(TestDBBase): - - needs_ro_repo = False + + needs_ro_repo = False - @with_rw_directory - def test_writing(self, path): - mdb = PureMemoryDB() - - # write data - self._assert_object_writing_simple(mdb) - - # test stream copy - ldb = PureLooseObjectODB(path) - assert ldb.size() == 0 - num_streams_copied = mdb.stream_copy(mdb.sha_iter(), ldb) - assert num_streams_copied == mdb.size() - - assert ldb.size() == mdb.size() - for sha in mdb.sha_iter(): - assert ldb.has_object(sha) - assert ldb.stream(sha).read() == mdb.stream(sha).read() - # END verify objects where copied and are equal + @with_rw_directory + def test_writing(self, path): + mdb = PureMemoryDB() + + # write data + self._assert_object_writing_simple(mdb) + + # test stream copy + ldb = PureLooseObjectODB(path) + assert ldb.size() == 0 + num_streams_copied = mdb.stream_copy(mdb.sha_iter(), ldb) + assert num_streams_copied == mdb.size() + + assert ldb.size() == mdb.size() + for sha in mdb.sha_iter(): + assert ldb.has_object(sha) + assert ldb.stream(sha).read() == mdb.stream(sha).read() + # END verify objects where copied and are equal diff --git a/git/test/db/py/test_pack.py b/git/test/db/py/test_pack.py index 5043f446..54dc2e2c 100644 --- a/git/test/db/py/test_pack.py +++ b/git/test/db/py/test_pack.py @@ -13,64 +13,64 @@ import os import random class TestPackDB(TestDBBase): - - needs_ro_repo = False - - @with_packs_rw - def test_writing(self, path): - pdb = PurePackedODB(path) - - # on demand, we init our pack cache - num_packs = len(pdb.entities()) - assert num_packs - assert pdb._st_mtime != 0 - - # test pack directory changed: - # packs removed - rename a file, should affect the glob - pack_path = pdb.entities()[0].pack().path() - new_pack_path = pack_path + "renamed" - os.rename(pack_path, new_pack_path) - - pdb.update_cache(force=True) - assert len(pdb.entities()) == num_packs - 1 - - # packs added - os.rename(new_pack_path, pack_path) - pdb.update_cache(force=True) - assert len(pdb.entities()) == num_packs - - # bang on the cache - # access the Entities directly, as there is no iteration interface - # yet ( or required for now ) - sha_list = list(pdb.sha_iter()) - assert len(sha_list) == pdb.size() - - # hit all packs in random order - random.shuffle(sha_list) - - for sha in sha_list: - info = pdb.info(sha) - stream = pdb.stream(sha) - # END for each sha to query - - - # test short finding - be a bit more brutal here - max_bytes = 19 - min_bytes = 2 - num_ambiguous = 0 - for i, sha in enumerate(sha_list): - short_sha = sha[:max((i % max_bytes), min_bytes)] - try: - assert pdb.partial_to_complete_sha(short_sha, len(short_sha)*2) == sha - except AmbiguousObjectName: - num_ambiguous += 1 - pass # valid, we can have short objects - # END exception handling - # END for each sha to find - - # we should have at least one ambiguous, considering the small sizes - # but in our pack, there is no ambigious ... - # assert num_ambiguous - - # non-existing - self.failUnlessRaises(BadObject, pdb.partial_to_complete_sha, "\0\0", 4) + + needs_ro_repo = False + + @with_packs_rw + def test_writing(self, path): + pdb = PurePackedODB(path) + + # on demand, we init our pack cache + num_packs = len(pdb.entities()) + assert num_packs + assert pdb._st_mtime != 0 + + # test pack directory changed: + # packs removed - rename a file, should affect the glob + pack_path = pdb.entities()[0].pack().path() + new_pack_path = pack_path + "renamed" + os.rename(pack_path, new_pack_path) + + pdb.update_cache(force=True) + assert len(pdb.entities()) == num_packs - 1 + + # packs added + os.rename(new_pack_path, pack_path) + pdb.update_cache(force=True) + assert len(pdb.entities()) == num_packs + + # bang on the cache + # access the Entities directly, as there is no iteration interface + # yet ( or required for now ) + sha_list = list(pdb.sha_iter()) + assert len(sha_list) == pdb.size() + + # hit all packs in random order + random.shuffle(sha_list) + + for sha in sha_list: + info = pdb.info(sha) + stream = pdb.stream(sha) + # END for each sha to query + + + # test short finding - be a bit more brutal here + max_bytes = 19 + min_bytes = 2 + num_ambiguous = 0 + for i, sha in enumerate(sha_list): + short_sha = sha[:max((i % max_bytes), min_bytes)] + try: + assert pdb.partial_to_complete_sha(short_sha, len(short_sha)*2) == sha + except AmbiguousObjectName: + num_ambiguous += 1 + pass # valid, we can have short objects + # END exception handling + # END for each sha to find + + # we should have at least one ambiguous, considering the small sizes + # but in our pack, there is no ambigious ... + # assert num_ambiguous + + # non-existing + self.failUnlessRaises(BadObject, pdb.partial_to_complete_sha, "\0\0", 4) diff --git a/git/test/db/py/test_ref.py b/git/test/db/py/test_ref.py index c5374dc9..dfaf9644 100644 --- a/git/test/db/py/test_ref.py +++ b/git/test/db/py/test_ref.py @@ -6,57 +6,57 @@ from git.test.db.lib import * from git.db.py.ref import PureReferenceDB from git.util import ( - NULL_BIN_SHA, - hex_to_bin - ) + NULL_BIN_SHA, + hex_to_bin + ) import os - + class TestPureReferenceDB(TestDBBase): - - needs_ro_repo = False - - def make_alt_file(self, alt_path, alt_list): - """Create an alternates file which contains the given alternates. - The list can be empty""" - alt_file = open(alt_path, "wb") - for alt in alt_list: - alt_file.write(alt + "\n") - alt_file.close() - - @with_rw_directory - def test_writing(self, path): - NULL_BIN_SHA = '\0' * 20 - - alt_path = os.path.join(path, 'alternates') - rdb = PureReferenceDB(alt_path) - assert len(rdb.databases()) == 0 - assert rdb.size() == 0 - assert len(list(rdb.sha_iter())) == 0 - - # try empty, non-existing - assert not rdb.has_object(NULL_BIN_SHA) - - - # setup alternate file - # add two, one is invalid - own_repo_path = fixture_path('../../../.git/objects') # use own repo - self.make_alt_file(alt_path, [own_repo_path, "invalid/path"]) - rdb.update_cache() - assert len(rdb.databases()) == 1 - - # we should now find a default revision of ours - git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655") - assert rdb.has_object(git_sha) - - # remove valid - self.make_alt_file(alt_path, ["just/one/invalid/path"]) - rdb.update_cache() - assert len(rdb.databases()) == 0 - - # add valid - self.make_alt_file(alt_path, [own_repo_path]) - rdb.update_cache() - assert len(rdb.databases()) == 1 - - + + needs_ro_repo = False + + def make_alt_file(self, alt_path, alt_list): + """Create an alternates file which contains the given alternates. + The list can be empty""" + alt_file = open(alt_path, "wb") + for alt in alt_list: + alt_file.write(alt + "\n") + alt_file.close() + + @with_rw_directory + def test_writing(self, path): + NULL_BIN_SHA = '\0' * 20 + + alt_path = os.path.join(path, 'alternates') + rdb = PureReferenceDB(alt_path) + assert len(rdb.databases()) == 0 + assert rdb.size() == 0 + assert len(list(rdb.sha_iter())) == 0 + + # try empty, non-existing + assert not rdb.has_object(NULL_BIN_SHA) + + + # setup alternate file + # add two, one is invalid + own_repo_path = fixture_path('../../../.git/objects') # use own repo + self.make_alt_file(alt_path, [own_repo_path, "invalid/path"]) + rdb.update_cache() + assert len(rdb.databases()) == 1 + + # we should now find a default revision of ours + git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655") + assert rdb.has_object(git_sha) + + # remove valid + self.make_alt_file(alt_path, ["just/one/invalid/path"]) + rdb.update_cache() + assert len(rdb.databases()) == 0 + + # add valid + self.make_alt_file(alt_path, [own_repo_path]) + rdb.update_cache() + assert len(rdb.databases()) == 1 + + diff --git a/git/test/db/pygit2/lib.py b/git/test/db/pygit2/lib.py index 356df9dc..fab762e7 100644 --- a/git/test/db/pygit2/lib.py +++ b/git/test/db/pygit2/lib.py @@ -1,23 +1,23 @@ """pygit2 specific utilities, as well as all the default ones""" from git.test.lib import ( - InheritedTestMethodsOverrideWrapperMetaClsAutoMixin, - needs_module_or_skip - ) + InheritedTestMethodsOverrideWrapperMetaClsAutoMixin, + needs_module_or_skip + ) __all__ = ['needs_pygit2_or_skip', 'Pygit2RequiredMetaMixin'] #{ Decoorators def needs_pygit2_or_skip(func): - """Skip this test if we have no pygit2 - print warning""" - return needs_module_or_skip('pygit2')(func) + """Skip this test if we have no pygit2 - print warning""" + return needs_module_or_skip('pygit2')(func) #}END decorators #{ MetaClasses class Pygit2RequiredMetaMixin(InheritedTestMethodsOverrideWrapperMetaClsAutoMixin): - decorator = [needs_pygit2_or_skip] + decorator = [needs_pygit2_or_skip] #} END metaclasses diff --git a/git/test/db/pygit2/test_base.py b/git/test/db/pygit2/test_base.py index 246a1643..52ee24f5 100644 --- a/git/test/db/pygit2/test_base.py +++ b/git/test/db/pygit2/test_base.py @@ -9,24 +9,24 @@ from git.test.db.base import RepoBase try: - import pygit2 + import pygit2 except ImportError: - # om this case, all other pygit2 tests will be skipped - # Need to properly initialize the class though, otherwise it would fail - from git.db.complex import PureCompatibilityGitDB as Pygit2DB + # om this case, all other pygit2 tests will be skipped + # Need to properly initialize the class though, otherwise it would fail + from git.db.complex import PureCompatibilityGitDB as Pygit2DB else: - # now we know pygit2 is available, to do futher imports - from git.db.pygit2.complex import Pygit2CompatibilityGitDB as Pygit2DB - + # now we know pygit2 is available, to do futher imports + from git.db.pygit2.complex import Pygit2CompatibilityGitDB as Pygit2DB + #END handle imports class TestPyGit2DBBase(RepoBase): - __metaclass__ = Pygit2RequiredMetaMixin - RepoCls = Pygit2DB - - @needs_pygit2_or_skip - @with_rw_repo('HEAD', bare=False) - def test_basics(self, rw_repo): - db = Pygit2DB(rw_repo.working_tree_dir) - - + __metaclass__ = Pygit2RequiredMetaMixin + RepoCls = Pygit2DB + + @needs_pygit2_or_skip + @with_rw_repo('HEAD', bare=False) + def test_basics(self, rw_repo): + db = Pygit2DB(rw_repo.working_tree_dir) + + diff --git a/git/test/db/test_base.py b/git/test/db/test_base.py index 2a882d0a..78da9f04 100644 --- a/git/test/db/test_base.py +++ b/git/test/db/test_base.py @@ -7,14 +7,14 @@ from git.db import RefSpec class TestBase(TestDBBase): - needs_ro_repo = False + needs_ro_repo = False - @with_rw_directory - def test_basics(self, path): - self.failUnlessRaises(ValueError, RefSpec, None, None) - rs = RefSpec(None, "something") - assert rs.force == False - assert rs.delete_destination() - assert rs.source is None - assert rs.destination == "something" - + @with_rw_directory + def test_basics(self, path): + self.failUnlessRaises(ValueError, RefSpec, None, None) + rs = RefSpec(None, "something") + assert rs.force == False + assert rs.delete_destination() + assert rs.source is None + assert rs.destination == "something" + |