diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 12:38:02 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 12:40:07 +0200 |
commit | 7a0b79ee574999ecbc76696506352e4a5a0d7159 (patch) | |
tree | e0e2aa63b7dc649083858366eaedb6ac4cc5739b | |
parent | 1d8a577ffc6ad7ce1465001ddebdc157aecc1617 (diff) | |
download | gitpython-7a0b79ee574999ecbc76696506352e4a5a0d7159.tar.gz |
task: improved naming of task types, improved pool test to be less dependent on starting with just the main thread
-rw-r--r-- | lib/git/async/task.py | 26 | ||||
-rw-r--r-- | test/git/async/task.py | 26 | ||||
-rw-r--r-- | test/git/async/test_performance.py | 4 | ||||
-rw-r--r-- | test/git/async/test_pool.py | 15 |
4 files changed, 42 insertions, 29 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py index b7b5e699..ac948dc0 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -8,21 +8,27 @@ import weakref import sys import new -__all__ = ('OutputChannelTask', 'ThreadTaskBase', 'InputIteratorTaskBase', - 'InputIteratorThreadTask', 'InputChannelTask') +__all__ = ('Task', 'ThreadTaskBase', 'IteratorTaskBase', + 'IteratorThreadTask', 'ChannelThreadTask') -class OutputChannelTask(Node): +class Task(Node): """Abstracts a named task, which contains additional information on how the task should be queued and processed. - Results of the item processing are sent to a write channel, which is to be + Results of the item processing are sent to a writer, which is to be set by the creator using the ``set_writer`` method. + Items are read using the internal ``_read`` callable, subclasses are meant to + set this to a callable that supports the Reader interface's read function. + * **min_count** assures that not less than min_count items will be processed per call. * **max_chunksize** assures that multi-threading is happening in smaller chunks. If someone wants all items to be processed, using read(0), the whole task would go to one worker, as well as dependent tasks. If you want finer granularity , you can - specify this here, causing chunks to be no larger than max_chunksize""" + specify this here, causing chunks to be no larger than max_chunksize + * **apply_single** if True, default True, individual items will be given to the + worker function. If False, a list of possibly multiple items will be passed + instead.""" __slots__ = ( '_read', # method to yield items to process '_out_writer', # output write channel '_exc', # exception caught @@ -178,32 +184,32 @@ class ThreadTaskBase(object): pass -class InputIteratorTaskBase(OutputChannelTask): +class IteratorTaskBase(Task): """Implements a task which processes items from an iterable in a multi-processing safe manner""" __slots__ = tuple() def __init__(self, iterator, *args, **kwargs): - OutputChannelTask.__init__(self, *args, **kwargs) + Task.__init__(self, *args, **kwargs) self._read = IteratorReader(iterator).read # defaults to returning our items unchanged self.fun = lambda item: item -class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase): +class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase): """An input iterator for threaded pools""" lock_type = threading.Lock -class InputChannelTask(OutputChannelTask, ThreadTaskBase): +class ChannelThreadTask(Task, ThreadTaskBase): """Uses an input channel as source for reading items For instantiation, it takes all arguments of its base, the first one needs to be the input channel to read from though.""" __slots__ = "_pool_ref" def __init__(self, in_reader, *args, **kwargs): - OutputChannelTask.__init__(self, *args, **kwargs) + Task.__init__(self, *args, **kwargs) self._read = in_reader.read self._pool_ref = None diff --git a/test/git/async/task.py b/test/git/async/task.py index f3599efe..583cb1f8 100644 --- a/test/git/async/task.py +++ b/test/git/async/task.py @@ -51,18 +51,18 @@ class _TestTaskBase(object): return self -class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask): +class TestThreadTask(_TestTaskBase, IteratorThreadTask): pass -class TestThreadFailureNode(TestThreadTaskNode): +class TestFailureThreadTask(TestThreadTask): """Fails after X items""" def __init__(self, *args, **kwargs): self.fail_after = kwargs.pop('fail_after') - super(TestThreadFailureNode, self).__init__(*args, **kwargs) + super(TestFailureThreadTask, self).__init__(*args, **kwargs) def do_fun(self, item): - item = TestThreadTaskNode.do_fun(self, item) + item = TestThreadTask.do_fun(self, item) self.lock.acquire() try: @@ -74,15 +74,15 @@ class TestThreadFailureNode(TestThreadTaskNode): return item -class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): +class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask): """Apply a transformation on items read from an input channel""" def __init__(self, *args, **kwargs): self.fail_after = kwargs.pop('fail_after', 0) - super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs) + super(TestChannelThreadTask, self).__init__(*args, **kwargs) def do_fun(self, item): """return tuple(i, i*2)""" - item = super(TestThreadInputChannelTaskNode, self).do_fun(item) + item = super(TestChannelThreadTask, self).do_fun(item) # fail after support if self.fail_after: @@ -102,7 +102,7 @@ class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): # END handle tuple -class TestThreadPerformanceTaskNode(InputChannelTask): +class TestPerformanceThreadTask(ChannelThreadTask): """Applies no operation to the item, and does not lock, measuring the actual throughput of the system""" @@ -110,14 +110,14 @@ class TestThreadPerformanceTaskNode(InputChannelTask): return item -class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): +class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask): """An input channel task, which verifies the result of its input channels, should be last in the chain. Id must be int""" def do_fun(self, item): """return tuple(i, i*2)""" - item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item) + item = super(TestVerifyChannelThreadTask, self).do_fun(item) # make sure the computation order matches assert isinstance(item, tuple), "input was no tuple: %s" % item @@ -137,7 +137,7 @@ def make_proxy_method(t): return lambda item: wt.do_fun(item) def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0, - feedercls=TestThreadTaskNode, transformercls=TestThreadInputChannelTaskNode, + feedercls=TestThreadTask, transformercls=TestChannelThreadTask, include_verifier=True): """Create a task chain of feeder, count transformers and order verifcator to the pool p, like t1 -> t2 -> t3 @@ -179,7 +179,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of # END setup failure if include_verifier: - verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) + verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None) #verifier.fun = verifier.do_fun verifier.fun = make_proxy_method(verifier) vrc = p.add_task(verifier) @@ -190,7 +190,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of # END handle include verifier return tasks, rcs -def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs): +def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs): """:return: task which yields ni items :param taskcls: the actual iterator type to use :param **kwargs: additional kwargs to be passed to the task""" diff --git a/test/git/async/test_performance.py b/test/git/async/test_performance.py index 896d230e..703c8593 100644 --- a/test/git/async/test_performance.py +++ b/test/git/async/test_performance.py @@ -26,8 +26,8 @@ class TestThreadPoolPerformance(TestBase): for num_transformers in (1, 5, 10): for read_mode in range(2): ts, rcs = add_task_chain(pool, ni, count=num_transformers, - feedercls=InputIteratorThreadTask, - transformercls=TestThreadPerformanceTaskNode, + feedercls=IteratorThreadTask, + transformercls=TestPerformanceThreadTask, include_verifier=False) mode_info = "read(0)" diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 0042c4a8..aab618aa 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -198,7 +198,7 @@ class TestThreadPool(TestBase): # test failure after ni / 2 items # This makes sure it correctly closes the channel on failure to prevent blocking nri = ni/2 - task = make_task(TestThreadFailureNode, fail_after=ni/2) + task = make_task(TestFailureThreadTask, fail_after=ni/2) rc = p.add_task(task) assert len(rc.read()) == nri assert task.is_done() @@ -374,7 +374,14 @@ class TestThreadPool(TestBase): @terminate_threads def test_base(self): - assert len(threading.enumerate()) == 1 + max_wait_attempts = 3 + sleep_time = 0.1 + for mc in range(max_wait_attempts): + # wait for threads to die + if len(threading.enumerate()) != 1: + time.sleep(sleep_time) + # END for each attempt + assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time) p = ThreadPool() @@ -401,7 +408,7 @@ class TestThreadPool(TestBase): # SINGLE TASK SERIAL SYNC MODE ############################## # put a few unrelated tasks that we forget about - check ref counts and cleanup - t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None) + t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None) urc1 = p.add_task(t1) urc2 = p.add_task(t2) assert p.num_tasks() == 2 @@ -416,7 +423,7 @@ class TestThreadPool(TestBase): assert p.num_tasks() == 0 assert sys.getrefcount(t2) == 2 - t3 = TestThreadInputChannelTaskNode(urc2, "channel", None) + t3 = TestChannelThreadTask(urc2, "channel", None) urc3 = p.add_task(t3) assert p.num_tasks() == 1 del(urc3) |