diff options
Diffstat (limited to 'git/test/lib')
-rw-r--r-- | git/test/lib/__init__.py | 6 | ||||
-rw-r--r-- | git/test/lib/asserts.py | 18 | ||||
-rw-r--r-- | git/test/lib/base.py | 76 | ||||
-rw-r--r-- | git/test/lib/helper.py | 139 |
4 files changed, 137 insertions, 102 deletions
diff --git a/git/test/lib/__init__.py b/git/test/lib/__init__.py index a0656438..a94b6617 100644 --- a/git/test/lib/__init__.py +++ b/git/test/lib/__init__.py @@ -5,7 +5,7 @@ # the BSD License: http://www.opensource.org/licenses/bsd-license.php import inspect -# TODO: Separate names - they do repeat unfortunately. Also deduplicate it, +# TODO: Separate names - they do repeat unfortunately. Also deduplicate it, # redesign decorators to support multiple database types in succession. from base import * @@ -14,5 +14,5 @@ from asserts import * from helper import * -__all__ = [ name for name, obj in locals().items() - if not (name.startswith('_') or inspect.ismodule(obj)) ] +__all__ = [name for name, obj in locals().items() + if not (name.startswith('_') or inspect.ismodule(obj))] diff --git a/git/test/lib/asserts.py b/git/test/lib/asserts.py index fa754b92..351901dc 100644 --- a/git/test/lib/asserts.py +++ b/git/test/lib/asserts.py @@ -10,41 +10,49 @@ from nose import tools from nose.tools import * import stat -__all__ = ['assert_instance_of', 'assert_not_instance_of', +__all__ = ['assert_instance_of', 'assert_not_instance_of', 'assert_none', 'assert_not_none', 'assert_match', 'assert_not_match', 'assert_mode_644', 'assert_mode_755'] + tools.__all__ + def assert_instance_of(expected, actual, msg=None): """Verify that object is an instance of expected """ assert isinstance(actual, expected), msg + def assert_not_instance_of(expected, actual, msg=None): """Verify that object is not an instance of expected """ assert not isinstance(actual, expected, msg) - + + def assert_none(actual, msg=None): """verify that item is None""" assert actual is None, msg + def assert_not_none(actual, msg=None): """verify that item is None""" assert actual is not None, msg + def assert_match(pattern, string, msg=None): """verify that the pattern matches the string""" assert_not_none(re.search(pattern, string), msg) + def assert_not_match(pattern, string, msg=None): """verify that the pattern does not match the string""" assert_none(re.search(pattern, string), msg) - + + def assert_mode_644(mode): """Verify given mode is 644""" - assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP) + assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP) assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and not (mode & stat.S_IXUSR) + def assert_mode_755(mode): """Verify given mode is 755""" assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP) and (mode & stat.S_IXOTH) and (mode & stat.S_IXGRP) - assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and (mode & stat.S_IXUSR)
\ No newline at end of file + assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and (mode & stat.S_IXUSR) diff --git a/git/test/lib/base.py b/git/test/lib/base.py index 298e8e05..39bc9b73 100644 --- a/git/test/lib/base.py +++ b/git/test/lib/base.py @@ -4,15 +4,15 @@ # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Utilities used in ODB testing""" from git.base import OStream -from git.stream import ( - Sha1Writer, - ZippedStoreShaWriter - ) +from git.stream import ( + Sha1Writer, + ZippedStoreShaWriter +) from git.util import ( - zlib, - dirname - ) + zlib, + dirname +) import sys import random @@ -32,6 +32,7 @@ import gc def with_rw_directory(func): """Create a temporary directory which can be written to, remove it if the test suceeds, but leave it otherwise to aid additional debugging""" + def wrapper(self): path = maketemp(prefix=func.__name__) os.mkdir(path) @@ -45,7 +46,7 @@ def with_rw_directory(func): raise finally: # Need to collect here to be sure all handles have been closed. It appears - # a windows-only issue. In fact things should be deleted, as well as + # a windows-only issue. In fact things should be deleted, as well as # memory maps closed, once objects go out of scope. For some reason # though this is not the case here unless we collect explicitly. if not keep: @@ -53,7 +54,7 @@ def with_rw_directory(func): shutil.rmtree(path) # END handle exception # END wrapper - + wrapper.__name__ = func.__name__ return wrapper @@ -65,6 +66,7 @@ def with_rw_repo(func): being on a certain branch or on anything really except for the default tags that should exist Wrapped function obtains a git repository """ + def wrapper(self, path): src_dir = dirname(dirname(dirname(__file__))) assert(os.path.isdir(path)) @@ -73,24 +75,24 @@ def with_rw_repo(func): target_gitdir = os.path.join(path, '.git') assert os.path.isdir(target_gitdir) return func(self, self.RepoCls(target_gitdir)) - #END wrapper + # END wrapper wrapper.__name__ = func.__name__ return with_rw_directory(wrapper) - def with_packs_rw(func): """Function that provides a path into which the packs for testing should be copied. Will pass on the path to the actual function afterwards - + :note: needs with_rw_directory wrapped around it""" + def wrapper(self, path): src_pack_glob = fixture_path('packs/*') print src_pack_glob copy_files_globbed(src_pack_glob, path, hard_link_ok=True) return func(self, path) # END wrapper - + wrapper.__name__ = func.__name__ return with_rw_directory(wrapper) @@ -98,6 +100,7 @@ def with_packs_rw(func): #{ Routines + def rorepo_dir(): """:return: path to our own repository, being our own .git directory. :note: doesn't work in bare repositories""" @@ -105,6 +108,7 @@ def rorepo_dir(): assert os.path.isdir(base) return base + def maketemp(*args, **kwargs): """Wrapper around default tempfile.mktemp to fix an osx issue""" tdir = tempfile.mktemp(*args, **kwargs) @@ -112,19 +116,23 @@ def maketemp(*args, **kwargs): tdir = '/private' + tdir return tdir + def fixture_path(relapath=''): """:return: absolute path into the fixture directory :param relapath: relative path into the fixtures directory, or '' to obtain the fixture directory itself""" test_dir = os.path.dirname(os.path.dirname(__file__)) return os.path.join(test_dir, "fixtures", relapath) - + + def fixture(name): return open(fixture_path(name), 'rb').read() + def absolute_project_path(): return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) + def copy_files_globbed(source_glob, target_dir, hard_link_ok=False): """Copy all files found according to the given source glob into the target directory :param hard_link_ok: if True, hard links will be created if possible. Otherwise @@ -141,7 +149,7 @@ def copy_files_globbed(source_glob, target_dir, hard_link_ok=False): shutil.copy(src_file, target_dir) # END try hard link # END for each file to copy - + def make_bytes(size_in_bytes, randomize=False): """:return: string with given size in bytes @@ -155,11 +163,13 @@ def make_bytes(size_in_bytes, randomize=False): a = array('i', producer) return a.tostring() + def make_object(type, data): """:return: bytes resembling an uncompressed object""" odata = "blob %i\0" % len(data) return odata + data - + + def make_memory_file(size_in_bytes, randomize=False): """:return: tuple(size_of_stream, stream) :param randomize: try to produce a very random stream""" @@ -170,31 +180,33 @@ def make_memory_file(size_in_bytes, randomize=False): #{ Stream Utilities + class DummyStream(object): - def __init__(self): - self.was_read = False - self.bytes = 0 - self.closed = False - - def read(self, size): - self.was_read = True - self.bytes = size - - def close(self): - self.closed = True - - def _assert(self): - assert self.was_read + + def __init__(self): + self.was_read = False + self.bytes = 0 + self.closed = False + + def read(self, size): + self.was_read = True + self.bytes = size + + def close(self): + self.closed = True + + def _assert(self): + assert self.was_read class DeriveTest(OStream): + def __init__(self, sha, type, size, stream, *args, **kwargs): self.myarg = kwargs.pop('myarg') self.args = args - + def _assert(self): assert self.args assert self.myarg #} END stream utilitiess - diff --git a/git/test/lib/helper.py b/git/test/lib/helper.py index bb17745c..db6eb121 100644 --- a/git/test/lib/helper.py +++ b/git/test/lib/helper.py @@ -16,36 +16,37 @@ import warnings from nose import SkipTest from base import ( - maketemp, - rorepo_dir - ) + maketemp, + rorepo_dir +) __all__ = ( - 'StringProcessAdapter', 'GlobalsItemDeletorMetaCls', 'InheritedTestMethodsOverrideWrapperMetaClsAutoMixin', - 'with_rw_repo', 'with_rw_and_rw_remote_repo', 'TestBase', 'TestCase', 'needs_module_or_skip' - ) + 'StringProcessAdapter', 'GlobalsItemDeletorMetaCls', 'InheritedTestMethodsOverrideWrapperMetaClsAutoMixin', + 'with_rw_repo', 'with_rw_and_rw_remote_repo', 'TestBase', 'TestCase', 'needs_module_or_skip' +) - -#{ Adapters - +#{ Adapters + class StringProcessAdapter(object): + """Allows to use strings as Process object as returned by SubProcess.Popen. Its tailored to work with the test system only""" - + def __init__(self, input_string): self.stdout = cStringIO.StringIO(input_string) self.stderr = cStringIO.StringIO() - + def wait(self): return 0 - + poll = wait - + #} END adapters -#{ Decorators +#{ Decorators + def _rmtree_onerror(osremove, fullpath, exec_info): """ @@ -54,35 +55,37 @@ def _rmtree_onerror(osremove, fullpath, exec_info): """ if os.name != 'nt' or osremove is not os.remove: raise - + os.chmod(fullpath, 0777) os.remove(fullpath) + def with_rw_repo(working_tree_ref, bare=False): """ Same as with_bare_repo, but clones the rorepo as non-bare repository, checking out the working tree at the given working_tree_ref. - + This repository type is more costly due to the working copy checkout. - + To make working with relative paths easier, the cwd will be set to the working dir of the repository. """ assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout" + def argument_passer(func): def repo_creator(self): prefix = 'non_' if bare: prefix = '' - #END handle prefix + # END handle prefix repo_dir = maketemp("%sbare_%s" % (prefix, func.__name__)) rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=bare, n=True) - + rw_repo.head.commit = rw_repo.commit(working_tree_ref) if not bare: rw_repo.head.reference.checkout() # END handle checkout - + prev_cwd = os.getcwd() os.chdir(rw_repo.working_dir) try: @@ -104,7 +107,8 @@ def with_rw_repo(working_tree_ref, bare=False): return repo_creator # END argument passer return argument_passer - + + def with_rw_and_rw_remote_repo(working_tree_ref): """ Same as with_rw_repo, but also provides a writable remote repository from which the @@ -112,36 +116,38 @@ def with_rw_and_rw_remote_repo(working_tree_ref): run the remote_repo. The remote repository was cloned as bare repository from the rorepo, wheras the rw repo has a working tree and was cloned from the remote repository. - + remote_repo has two remotes: origin and daemon_origin. One uses a local url, the other uses a server url. The daemon setup must be done on system level and should be an inetd service that serves tempdir.gettempdir() and all directories in it. - + The following scetch demonstrates this:: rorepo ---<bare clone>---> rw_remote_repo ---<clone>---> rw_repo - + The test case needs to support the following signature:: def case(self, rw_repo, rw_remote_repo) - + This setup allows you to test push and pull scenarios and hooks nicely. - + See working dir info in with_rw_repo """ assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout" + def argument_passer(func): def remote_repo_creator(self): remote_repo_dir = maketemp("remote_repo_%s" % func.__name__) repo_dir = maketemp("remote_clone_non_bare_repo") - + rw_remote_repo = self.rorepo.clone(remote_repo_dir, shared=True, bare=True) - rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True) # recursive alternates info ? + # recursive alternates info ? + rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True) rw_repo.head.commit = working_tree_ref rw_repo.head.reference.checkout() - + # prepare for git-daemon rw_remote_repo.daemon_export = True - + # this thing is just annoying ! crw = rw_remote_repo.config_writer() section = "daemon" @@ -152,28 +158,30 @@ def with_rw_and_rw_remote_repo(working_tree_ref): crw.set(section, "receivepack", True) # release lock del(crw) - - # initialize the remote - first do it as local remote and pull, then + + # initialize the remote - first do it as local remote and pull, then # we change the url to point to the daemon. The daemon should be started # by the user, not by us d_remote = Remote.create(rw_repo, "daemon_origin", remote_repo_dir) d_remote.fetch() remote_repo_url = "git://localhost%s" % remote_repo_dir - + d_remote.config_writer.set('url', remote_repo_url) - + # try to list remotes to diagnoes whether the server is up try: rw_repo.git.ls_remote(d_remote) - except GitCommandError,e: + except GitCommandError, e: print str(e) if os.name == 'nt': - raise AssertionError('git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"' % os.path.dirname(_mktemp())) + raise AssertionError( + 'git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"' % os.path.dirname(_mktemp())) else: - raise AssertionError('Please start a git-daemon to run this test, execute: git-daemon "%s"' % os.path.dirname(_mktemp())) + raise AssertionError( + 'Please start a git-daemon to run this test, execute: git-daemon "%s"' % os.path.dirname(_mktemp())) # END make assertion - #END catch ls remote error - + # END catch ls remote error + # adjust working dir prev_cwd = os.getcwd() os.chdir(rw_repo.working_dir) @@ -191,9 +199,10 @@ def with_rw_and_rw_remote_repo(working_tree_ref): return remote_repo_creator # END remote repo creator # END argument parsser - + return argument_passer - + + def needs_module_or_skip(module): """Decorator to be used for test cases only. Print a warning if the given module could not be imported, and skip the test. @@ -207,25 +216,28 @@ def needs_module_or_skip(module): msg = "Module %r is required to run this test - skipping" % module warnings.warn(msg) raise SkipTest(msg) - #END check import + # END check import return func(self, *args, **kwargs) - #END wrapper + # END wrapper wrapper.__name__ = func.__name__ return wrapper - #END argpasser + # END argpasser return argpasser - + #} END decorators #{ Meta Classes + + class GlobalsItemDeletorMetaCls(type): + """Utiltiy to prevent the RepoBase to be picked up by nose as the metacls will delete the instance from the globals""" #{ Configuration # Set this to a string name of the module to delete ModuleToDelete = None #} END configuration - + def __new__(metacls, name, bases, clsdict): assert metacls.ModuleToDelete is not None, "Invalid metaclass configuration" new_type = super(GlobalsItemDeletorMetaCls, metacls).__new__(metacls, name, bases, clsdict) @@ -235,22 +247,23 @@ class GlobalsItemDeletorMetaCls(type): delattr(mod, metacls.ModuleToDelete) except AttributeError: pass - #END skip case that people import our base without actually using it - #END handle deletion + # END skip case that people import our base without actually using it + # END handle deletion return new_type - - + + class InheritedTestMethodsOverrideWrapperMetaClsAutoMixin(object): + """Automatically picks up the actual metaclass of the the type to be created, that is the one inherited by one of the bases, and patch up its __new__ to use the InheritedTestMethodsOverrideWrapperInstanceDecorator with our configured decorator""" - + #{ Configuration # decorator function to use when wrapping the inherited methods. Put it into a list as first member # to hide it from being created as class method decorator = [] #}END configuration - + @classmethod def _find_metacls(metacls, bases): """emulate pythons lookup""" @@ -259,9 +272,9 @@ class InheritedTestMethodsOverrideWrapperMetaClsAutoMixin(object): if hasattr(base, mcls_attr): return getattr(base, mcls_attr) return metacls._find_metacls(base.__bases__) - #END for each base + # END for each base raise AssertionError("base class had not metaclass attached") - + @classmethod def _patch_methods_recursive(metacls, bases, clsdict): """depth-first patching of methods""" @@ -270,33 +283,35 @@ class InheritedTestMethodsOverrideWrapperMetaClsAutoMixin(object): for name, item in base.__dict__.iteritems(): if not name.startswith('test_'): continue - #END skip non-tests + # END skip non-tests clsdict[name] = metacls.decorator[0](item) - #END for each item - #END for each base - + # END for each item + # END for each base + def __new__(metacls, name, bases, clsdict): assert metacls.decorator, "'decorator' member needs to be set in subclass" base_metacls = metacls._find_metacls(bases) metacls._patch_methods_recursive(bases, clsdict) return base_metacls.__new__(base_metacls, name, bases, clsdict) - + #} END meta classes - + + class TestBase(TestCase): + """ Base Class providing default functionality to all tests such as: - Utility functions provided by the TestCase base of the unittest method such as:: self.fail("todo") self.failUnlessRaises(...) """ - + @classmethod def setUp(cls): """This method is only called to provide the most basic functionality Subclasses may just override it or implement it differently""" cls.rorepo = Repo(rorepo_dir()) - + def _make_file(self, rela_path, data, repo=None): """ Create a file at the given path relative to our repository, filled |