summaryrefslogtreecommitdiff
path: root/git/test/lib
diff options
context:
space:
mode:
Diffstat (limited to 'git/test/lib')
-rw-r--r--git/test/lib/__init__.py6
-rw-r--r--git/test/lib/asserts.py18
-rw-r--r--git/test/lib/base.py76
-rw-r--r--git/test/lib/helper.py139
4 files changed, 137 insertions, 102 deletions
diff --git a/git/test/lib/__init__.py b/git/test/lib/__init__.py
index a0656438..a94b6617 100644
--- a/git/test/lib/__init__.py
+++ b/git/test/lib/__init__.py
@@ -5,7 +5,7 @@
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
import inspect
-# TODO: Separate names - they do repeat unfortunately. Also deduplicate it,
+# TODO: Separate names - they do repeat unfortunately. Also deduplicate it,
# redesign decorators to support multiple database types in succession.
from base import *
@@ -14,5 +14,5 @@ from asserts import *
from helper import *
-__all__ = [ name for name, obj in locals().items()
- if not (name.startswith('_') or inspect.ismodule(obj)) ]
+__all__ = [name for name, obj in locals().items()
+ if not (name.startswith('_') or inspect.ismodule(obj))]
diff --git a/git/test/lib/asserts.py b/git/test/lib/asserts.py
index fa754b92..351901dc 100644
--- a/git/test/lib/asserts.py
+++ b/git/test/lib/asserts.py
@@ -10,41 +10,49 @@ from nose import tools
from nose.tools import *
import stat
-__all__ = ['assert_instance_of', 'assert_not_instance_of',
+__all__ = ['assert_instance_of', 'assert_not_instance_of',
'assert_none', 'assert_not_none',
'assert_match', 'assert_not_match', 'assert_mode_644',
'assert_mode_755'] + tools.__all__
+
def assert_instance_of(expected, actual, msg=None):
"""Verify that object is an instance of expected """
assert isinstance(actual, expected), msg
+
def assert_not_instance_of(expected, actual, msg=None):
"""Verify that object is not an instance of expected """
assert not isinstance(actual, expected, msg)
-
+
+
def assert_none(actual, msg=None):
"""verify that item is None"""
assert actual is None, msg
+
def assert_not_none(actual, msg=None):
"""verify that item is None"""
assert actual is not None, msg
+
def assert_match(pattern, string, msg=None):
"""verify that the pattern matches the string"""
assert_not_none(re.search(pattern, string), msg)
+
def assert_not_match(pattern, string, msg=None):
"""verify that the pattern does not match the string"""
assert_none(re.search(pattern, string), msg)
-
+
+
def assert_mode_644(mode):
"""Verify given mode is 644"""
- assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP)
+ assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP)
assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and not (mode & stat.S_IXUSR)
+
def assert_mode_755(mode):
"""Verify given mode is 755"""
assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP) and (mode & stat.S_IXOTH) and (mode & stat.S_IXGRP)
- assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and (mode & stat.S_IXUSR) \ No newline at end of file
+ assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and (mode & stat.S_IXUSR)
diff --git a/git/test/lib/base.py b/git/test/lib/base.py
index 298e8e05..39bc9b73 100644
--- a/git/test/lib/base.py
+++ b/git/test/lib/base.py
@@ -4,15 +4,15 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Utilities used in ODB testing"""
from git.base import OStream
-from git.stream import (
- Sha1Writer,
- ZippedStoreShaWriter
- )
+from git.stream import (
+ Sha1Writer,
+ ZippedStoreShaWriter
+)
from git.util import (
- zlib,
- dirname
- )
+ zlib,
+ dirname
+)
import sys
import random
@@ -32,6 +32,7 @@ import gc
def with_rw_directory(func):
"""Create a temporary directory which can be written to, remove it if the
test suceeds, but leave it otherwise to aid additional debugging"""
+
def wrapper(self):
path = maketemp(prefix=func.__name__)
os.mkdir(path)
@@ -45,7 +46,7 @@ def with_rw_directory(func):
raise
finally:
# Need to collect here to be sure all handles have been closed. It appears
- # a windows-only issue. In fact things should be deleted, as well as
+ # a windows-only issue. In fact things should be deleted, as well as
# memory maps closed, once objects go out of scope. For some reason
# though this is not the case here unless we collect explicitly.
if not keep:
@@ -53,7 +54,7 @@ def with_rw_directory(func):
shutil.rmtree(path)
# END handle exception
# END wrapper
-
+
wrapper.__name__ = func.__name__
return wrapper
@@ -65,6 +66,7 @@ def with_rw_repo(func):
being on a certain branch or on anything really except for the default tags
that should exist
Wrapped function obtains a git repository """
+
def wrapper(self, path):
src_dir = dirname(dirname(dirname(__file__)))
assert(os.path.isdir(path))
@@ -73,24 +75,24 @@ def with_rw_repo(func):
target_gitdir = os.path.join(path, '.git')
assert os.path.isdir(target_gitdir)
return func(self, self.RepoCls(target_gitdir))
- #END wrapper
+ # END wrapper
wrapper.__name__ = func.__name__
return with_rw_directory(wrapper)
-
def with_packs_rw(func):
"""Function that provides a path into which the packs for testing should be
copied. Will pass on the path to the actual function afterwards
-
+
:note: needs with_rw_directory wrapped around it"""
+
def wrapper(self, path):
src_pack_glob = fixture_path('packs/*')
print src_pack_glob
copy_files_globbed(src_pack_glob, path, hard_link_ok=True)
return func(self, path)
# END wrapper
-
+
wrapper.__name__ = func.__name__
return with_rw_directory(wrapper)
@@ -98,6 +100,7 @@ def with_packs_rw(func):
#{ Routines
+
def rorepo_dir():
""":return: path to our own repository, being our own .git directory.
:note: doesn't work in bare repositories"""
@@ -105,6 +108,7 @@ def rorepo_dir():
assert os.path.isdir(base)
return base
+
def maketemp(*args, **kwargs):
"""Wrapper around default tempfile.mktemp to fix an osx issue"""
tdir = tempfile.mktemp(*args, **kwargs)
@@ -112,19 +116,23 @@ def maketemp(*args, **kwargs):
tdir = '/private' + tdir
return tdir
+
def fixture_path(relapath=''):
""":return: absolute path into the fixture directory
:param relapath: relative path into the fixtures directory, or ''
to obtain the fixture directory itself"""
test_dir = os.path.dirname(os.path.dirname(__file__))
return os.path.join(test_dir, "fixtures", relapath)
-
+
+
def fixture(name):
return open(fixture_path(name), 'rb').read()
+
def absolute_project_path():
return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+
def copy_files_globbed(source_glob, target_dir, hard_link_ok=False):
"""Copy all files found according to the given source glob into the target directory
:param hard_link_ok: if True, hard links will be created if possible. Otherwise
@@ -141,7 +149,7 @@ def copy_files_globbed(source_glob, target_dir, hard_link_ok=False):
shutil.copy(src_file, target_dir)
# END try hard link
# END for each file to copy
-
+
def make_bytes(size_in_bytes, randomize=False):
""":return: string with given size in bytes
@@ -155,11 +163,13 @@ def make_bytes(size_in_bytes, randomize=False):
a = array('i', producer)
return a.tostring()
+
def make_object(type, data):
""":return: bytes resembling an uncompressed object"""
odata = "blob %i\0" % len(data)
return odata + data
-
+
+
def make_memory_file(size_in_bytes, randomize=False):
""":return: tuple(size_of_stream, stream)
:param randomize: try to produce a very random stream"""
@@ -170,31 +180,33 @@ def make_memory_file(size_in_bytes, randomize=False):
#{ Stream Utilities
+
class DummyStream(object):
- def __init__(self):
- self.was_read = False
- self.bytes = 0
- self.closed = False
-
- def read(self, size):
- self.was_read = True
- self.bytes = size
-
- def close(self):
- self.closed = True
-
- def _assert(self):
- assert self.was_read
+
+ def __init__(self):
+ self.was_read = False
+ self.bytes = 0
+ self.closed = False
+
+ def read(self, size):
+ self.was_read = True
+ self.bytes = size
+
+ def close(self):
+ self.closed = True
+
+ def _assert(self):
+ assert self.was_read
class DeriveTest(OStream):
+
def __init__(self, sha, type, size, stream, *args, **kwargs):
self.myarg = kwargs.pop('myarg')
self.args = args
-
+
def _assert(self):
assert self.args
assert self.myarg
#} END stream utilitiess
-
diff --git a/git/test/lib/helper.py b/git/test/lib/helper.py
index bb17745c..db6eb121 100644
--- a/git/test/lib/helper.py
+++ b/git/test/lib/helper.py
@@ -16,36 +16,37 @@ import warnings
from nose import SkipTest
from base import (
- maketemp,
- rorepo_dir
- )
+ maketemp,
+ rorepo_dir
+)
__all__ = (
- 'StringProcessAdapter', 'GlobalsItemDeletorMetaCls', 'InheritedTestMethodsOverrideWrapperMetaClsAutoMixin',
- 'with_rw_repo', 'with_rw_and_rw_remote_repo', 'TestBase', 'TestCase', 'needs_module_or_skip'
- )
+ 'StringProcessAdapter', 'GlobalsItemDeletorMetaCls', 'InheritedTestMethodsOverrideWrapperMetaClsAutoMixin',
+ 'with_rw_repo', 'with_rw_and_rw_remote_repo', 'TestBase', 'TestCase', 'needs_module_or_skip'
+)
-
-#{ Adapters
-
+#{ Adapters
+
class StringProcessAdapter(object):
+
"""Allows to use strings as Process object as returned by SubProcess.Popen.
Its tailored to work with the test system only"""
-
+
def __init__(self, input_string):
self.stdout = cStringIO.StringIO(input_string)
self.stderr = cStringIO.StringIO()
-
+
def wait(self):
return 0
-
+
poll = wait
-
+
#} END adapters
-#{ Decorators
+#{ Decorators
+
def _rmtree_onerror(osremove, fullpath, exec_info):
"""
@@ -54,35 +55,37 @@ def _rmtree_onerror(osremove, fullpath, exec_info):
"""
if os.name != 'nt' or osremove is not os.remove:
raise
-
+
os.chmod(fullpath, 0777)
os.remove(fullpath)
+
def with_rw_repo(working_tree_ref, bare=False):
"""
Same as with_bare_repo, but clones the rorepo as non-bare repository, checking
out the working tree at the given working_tree_ref.
-
+
This repository type is more costly due to the working copy checkout.
-
+
To make working with relative paths easier, the cwd will be set to the working
dir of the repository.
"""
assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout"
+
def argument_passer(func):
def repo_creator(self):
prefix = 'non_'
if bare:
prefix = ''
- #END handle prefix
+ # END handle prefix
repo_dir = maketemp("%sbare_%s" % (prefix, func.__name__))
rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=bare, n=True)
-
+
rw_repo.head.commit = rw_repo.commit(working_tree_ref)
if not bare:
rw_repo.head.reference.checkout()
# END handle checkout
-
+
prev_cwd = os.getcwd()
os.chdir(rw_repo.working_dir)
try:
@@ -104,7 +107,8 @@ def with_rw_repo(working_tree_ref, bare=False):
return repo_creator
# END argument passer
return argument_passer
-
+
+
def with_rw_and_rw_remote_repo(working_tree_ref):
"""
Same as with_rw_repo, but also provides a writable remote repository from which the
@@ -112,36 +116,38 @@ def with_rw_and_rw_remote_repo(working_tree_ref):
run the remote_repo.
The remote repository was cloned as bare repository from the rorepo, wheras
the rw repo has a working tree and was cloned from the remote repository.
-
+
remote_repo has two remotes: origin and daemon_origin. One uses a local url,
the other uses a server url. The daemon setup must be done on system level
and should be an inetd service that serves tempdir.gettempdir() and all
directories in it.
-
+
The following scetch demonstrates this::
rorepo ---<bare clone>---> rw_remote_repo ---<clone>---> rw_repo
-
+
The test case needs to support the following signature::
def case(self, rw_repo, rw_remote_repo)
-
+
This setup allows you to test push and pull scenarios and hooks nicely.
-
+
See working dir info in with_rw_repo
"""
assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout"
+
def argument_passer(func):
def remote_repo_creator(self):
remote_repo_dir = maketemp("remote_repo_%s" % func.__name__)
repo_dir = maketemp("remote_clone_non_bare_repo")
-
+
rw_remote_repo = self.rorepo.clone(remote_repo_dir, shared=True, bare=True)
- rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True) # recursive alternates info ?
+ # recursive alternates info ?
+ rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True)
rw_repo.head.commit = working_tree_ref
rw_repo.head.reference.checkout()
-
+
# prepare for git-daemon
rw_remote_repo.daemon_export = True
-
+
# this thing is just annoying !
crw = rw_remote_repo.config_writer()
section = "daemon"
@@ -152,28 +158,30 @@ def with_rw_and_rw_remote_repo(working_tree_ref):
crw.set(section, "receivepack", True)
# release lock
del(crw)
-
- # initialize the remote - first do it as local remote and pull, then
+
+ # initialize the remote - first do it as local remote and pull, then
# we change the url to point to the daemon. The daemon should be started
# by the user, not by us
d_remote = Remote.create(rw_repo, "daemon_origin", remote_repo_dir)
d_remote.fetch()
remote_repo_url = "git://localhost%s" % remote_repo_dir
-
+
d_remote.config_writer.set('url', remote_repo_url)
-
+
# try to list remotes to diagnoes whether the server is up
try:
rw_repo.git.ls_remote(d_remote)
- except GitCommandError,e:
+ except GitCommandError, e:
print str(e)
if os.name == 'nt':
- raise AssertionError('git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"' % os.path.dirname(_mktemp()))
+ raise AssertionError(
+ 'git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"' % os.path.dirname(_mktemp()))
else:
- raise AssertionError('Please start a git-daemon to run this test, execute: git-daemon "%s"' % os.path.dirname(_mktemp()))
+ raise AssertionError(
+ 'Please start a git-daemon to run this test, execute: git-daemon "%s"' % os.path.dirname(_mktemp()))
# END make assertion
- #END catch ls remote error
-
+ # END catch ls remote error
+
# adjust working dir
prev_cwd = os.getcwd()
os.chdir(rw_repo.working_dir)
@@ -191,9 +199,10 @@ def with_rw_and_rw_remote_repo(working_tree_ref):
return remote_repo_creator
# END remote repo creator
# END argument parsser
-
+
return argument_passer
-
+
+
def needs_module_or_skip(module):
"""Decorator to be used for test cases only.
Print a warning if the given module could not be imported, and skip the test.
@@ -207,25 +216,28 @@ def needs_module_or_skip(module):
msg = "Module %r is required to run this test - skipping" % module
warnings.warn(msg)
raise SkipTest(msg)
- #END check import
+ # END check import
return func(self, *args, **kwargs)
- #END wrapper
+ # END wrapper
wrapper.__name__ = func.__name__
return wrapper
- #END argpasser
+ # END argpasser
return argpasser
-
+
#} END decorators
#{ Meta Classes
+
+
class GlobalsItemDeletorMetaCls(type):
+
"""Utiltiy to prevent the RepoBase to be picked up by nose as the metacls
will delete the instance from the globals"""
#{ Configuration
# Set this to a string name of the module to delete
ModuleToDelete = None
#} END configuration
-
+
def __new__(metacls, name, bases, clsdict):
assert metacls.ModuleToDelete is not None, "Invalid metaclass configuration"
new_type = super(GlobalsItemDeletorMetaCls, metacls).__new__(metacls, name, bases, clsdict)
@@ -235,22 +247,23 @@ class GlobalsItemDeletorMetaCls(type):
delattr(mod, metacls.ModuleToDelete)
except AttributeError:
pass
- #END skip case that people import our base without actually using it
- #END handle deletion
+ # END skip case that people import our base without actually using it
+ # END handle deletion
return new_type
-
-
+
+
class InheritedTestMethodsOverrideWrapperMetaClsAutoMixin(object):
+
"""Automatically picks up the actual metaclass of the the type to be created,
that is the one inherited by one of the bases, and patch up its __new__ to use
the InheritedTestMethodsOverrideWrapperInstanceDecorator with our configured decorator"""
-
+
#{ Configuration
# decorator function to use when wrapping the inherited methods. Put it into a list as first member
# to hide it from being created as class method
decorator = []
#}END configuration
-
+
@classmethod
def _find_metacls(metacls, bases):
"""emulate pythons lookup"""
@@ -259,9 +272,9 @@ class InheritedTestMethodsOverrideWrapperMetaClsAutoMixin(object):
if hasattr(base, mcls_attr):
return getattr(base, mcls_attr)
return metacls._find_metacls(base.__bases__)
- #END for each base
+ # END for each base
raise AssertionError("base class had not metaclass attached")
-
+
@classmethod
def _patch_methods_recursive(metacls, bases, clsdict):
"""depth-first patching of methods"""
@@ -270,33 +283,35 @@ class InheritedTestMethodsOverrideWrapperMetaClsAutoMixin(object):
for name, item in base.__dict__.iteritems():
if not name.startswith('test_'):
continue
- #END skip non-tests
+ # END skip non-tests
clsdict[name] = metacls.decorator[0](item)
- #END for each item
- #END for each base
-
+ # END for each item
+ # END for each base
+
def __new__(metacls, name, bases, clsdict):
assert metacls.decorator, "'decorator' member needs to be set in subclass"
base_metacls = metacls._find_metacls(bases)
metacls._patch_methods_recursive(bases, clsdict)
return base_metacls.__new__(base_metacls, name, bases, clsdict)
-
+
#} END meta classes
-
+
+
class TestBase(TestCase):
+
"""
Base Class providing default functionality to all tests such as:
- Utility functions provided by the TestCase base of the unittest method such as::
self.fail("todo")
self.failUnlessRaises(...)
"""
-
+
@classmethod
def setUp(cls):
"""This method is only called to provide the most basic functionality
Subclasses may just override it or implement it differently"""
cls.rorepo = Repo(rorepo_dir())
-
+
def _make_file(self, rela_path, data, repo=None):
"""
Create a file at the given path relative to our repository, filled