summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-12 12:38:02 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-12 12:40:07 +0200
commit7a0b79ee574999ecbc76696506352e4a5a0d7159 (patch)
treee0e2aa63b7dc649083858366eaedb6ac4cc5739b
parent1d8a577ffc6ad7ce1465001ddebdc157aecc1617 (diff)
downloadgitpython-7a0b79ee574999ecbc76696506352e4a5a0d7159.tar.gz
task: improved naming of task types, improved pool test to be less dependent on starting with just the main thread
-rw-r--r--lib/git/async/task.py26
-rw-r--r--test/git/async/task.py26
-rw-r--r--test/git/async/test_performance.py4
-rw-r--r--test/git/async/test_pool.py15
4 files changed, 42 insertions, 29 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index b7b5e699..ac948dc0 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -8,21 +8,27 @@ import weakref
import sys
import new
-__all__ = ('OutputChannelTask', 'ThreadTaskBase', 'InputIteratorTaskBase',
- 'InputIteratorThreadTask', 'InputChannelTask')
+__all__ = ('Task', 'ThreadTaskBase', 'IteratorTaskBase',
+ 'IteratorThreadTask', 'ChannelThreadTask')
-class OutputChannelTask(Node):
+class Task(Node):
"""Abstracts a named task, which contains
additional information on how the task should be queued and processed.
- Results of the item processing are sent to a write channel, which is to be
+ Results of the item processing are sent to a writer, which is to be
set by the creator using the ``set_writer`` method.
+ Items are read using the internal ``_read`` callable, subclasses are meant to
+ set this to a callable that supports the Reader interface's read function.
+
* **min_count** assures that not less than min_count items will be processed per call.
* **max_chunksize** assures that multi-threading is happening in smaller chunks. If
someone wants all items to be processed, using read(0), the whole task would go to
one worker, as well as dependent tasks. If you want finer granularity , you can
- specify this here, causing chunks to be no larger than max_chunksize"""
+ specify this here, causing chunks to be no larger than max_chunksize
+ * **apply_single** if True, default True, individual items will be given to the
+ worker function. If False, a list of possibly multiple items will be passed
+ instead."""
__slots__ = ( '_read', # method to yield items to process
'_out_writer', # output write channel
'_exc', # exception caught
@@ -178,32 +184,32 @@ class ThreadTaskBase(object):
pass
-class InputIteratorTaskBase(OutputChannelTask):
+class IteratorTaskBase(Task):
"""Implements a task which processes items from an iterable in a multi-processing
safe manner"""
__slots__ = tuple()
def __init__(self, iterator, *args, **kwargs):
- OutputChannelTask.__init__(self, *args, **kwargs)
+ Task.__init__(self, *args, **kwargs)
self._read = IteratorReader(iterator).read
# defaults to returning our items unchanged
self.fun = lambda item: item
-class InputIteratorThreadTask(InputIteratorTaskBase, ThreadTaskBase):
+class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase):
"""An input iterator for threaded pools"""
lock_type = threading.Lock
-class InputChannelTask(OutputChannelTask, ThreadTaskBase):
+class ChannelThreadTask(Task, ThreadTaskBase):
"""Uses an input channel as source for reading items
For instantiation, it takes all arguments of its base, the first one needs
to be the input channel to read from though."""
__slots__ = "_pool_ref"
def __init__(self, in_reader, *args, **kwargs):
- OutputChannelTask.__init__(self, *args, **kwargs)
+ Task.__init__(self, *args, **kwargs)
self._read = in_reader.read
self._pool_ref = None
diff --git a/test/git/async/task.py b/test/git/async/task.py
index f3599efe..583cb1f8 100644
--- a/test/git/async/task.py
+++ b/test/git/async/task.py
@@ -51,18 +51,18 @@ class _TestTaskBase(object):
return self
-class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask):
+class TestThreadTask(_TestTaskBase, IteratorThreadTask):
pass
-class TestThreadFailureNode(TestThreadTaskNode):
+class TestFailureThreadTask(TestThreadTask):
"""Fails after X items"""
def __init__(self, *args, **kwargs):
self.fail_after = kwargs.pop('fail_after')
- super(TestThreadFailureNode, self).__init__(*args, **kwargs)
+ super(TestFailureThreadTask, self).__init__(*args, **kwargs)
def do_fun(self, item):
- item = TestThreadTaskNode.do_fun(self, item)
+ item = TestThreadTask.do_fun(self, item)
self.lock.acquire()
try:
@@ -74,15 +74,15 @@ class TestThreadFailureNode(TestThreadTaskNode):
return item
-class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
+class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask):
"""Apply a transformation on items read from an input channel"""
def __init__(self, *args, **kwargs):
self.fail_after = kwargs.pop('fail_after', 0)
- super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs)
+ super(TestChannelThreadTask, self).__init__(*args, **kwargs)
def do_fun(self, item):
"""return tuple(i, i*2)"""
- item = super(TestThreadInputChannelTaskNode, self).do_fun(item)
+ item = super(TestChannelThreadTask, self).do_fun(item)
# fail after support
if self.fail_after:
@@ -102,7 +102,7 @@ class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
# END handle tuple
-class TestThreadPerformanceTaskNode(InputChannelTask):
+class TestPerformanceThreadTask(ChannelThreadTask):
"""Applies no operation to the item, and does not lock, measuring
the actual throughput of the system"""
@@ -110,14 +110,14 @@ class TestThreadPerformanceTaskNode(InputChannelTask):
return item
-class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
+class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask):
"""An input channel task, which verifies the result of its input channels,
should be last in the chain.
Id must be int"""
def do_fun(self, item):
"""return tuple(i, i*2)"""
- item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item)
+ item = super(TestVerifyChannelThreadTask, self).do_fun(item)
# make sure the computation order matches
assert isinstance(item, tuple), "input was no tuple: %s" % item
@@ -137,7 +137,7 @@ def make_proxy_method(t):
return lambda item: wt.do_fun(item)
def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0,
- feedercls=TestThreadTaskNode, transformercls=TestThreadInputChannelTaskNode,
+ feedercls=TestThreadTask, transformercls=TestChannelThreadTask,
include_verifier=True):
"""Create a task chain of feeder, count transformers and order verifcator
to the pool p, like t1 -> t2 -> t3
@@ -179,7 +179,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of
# END setup failure
if include_verifier:
- verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
+ verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None)
#verifier.fun = verifier.do_fun
verifier.fun = make_proxy_method(verifier)
vrc = p.add_task(verifier)
@@ -190,7 +190,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of
# END handle include verifier
return tasks, rcs
-def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs):
+def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs):
""":return: task which yields ni items
:param taskcls: the actual iterator type to use
:param **kwargs: additional kwargs to be passed to the task"""
diff --git a/test/git/async/test_performance.py b/test/git/async/test_performance.py
index 896d230e..703c8593 100644
--- a/test/git/async/test_performance.py
+++ b/test/git/async/test_performance.py
@@ -26,8 +26,8 @@ class TestThreadPoolPerformance(TestBase):
for num_transformers in (1, 5, 10):
for read_mode in range(2):
ts, rcs = add_task_chain(pool, ni, count=num_transformers,
- feedercls=InputIteratorThreadTask,
- transformercls=TestThreadPerformanceTaskNode,
+ feedercls=IteratorThreadTask,
+ transformercls=TestPerformanceThreadTask,
include_verifier=False)
mode_info = "read(0)"
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 0042c4a8..aab618aa 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -198,7 +198,7 @@ class TestThreadPool(TestBase):
# test failure after ni / 2 items
# This makes sure it correctly closes the channel on failure to prevent blocking
nri = ni/2
- task = make_task(TestThreadFailureNode, fail_after=ni/2)
+ task = make_task(TestFailureThreadTask, fail_after=ni/2)
rc = p.add_task(task)
assert len(rc.read()) == nri
assert task.is_done()
@@ -374,7 +374,14 @@ class TestThreadPool(TestBase):
@terminate_threads
def test_base(self):
- assert len(threading.enumerate()) == 1
+ max_wait_attempts = 3
+ sleep_time = 0.1
+ for mc in range(max_wait_attempts):
+ # wait for threads to die
+ if len(threading.enumerate()) != 1:
+ time.sleep(sleep_time)
+ # END for each attempt
+ assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time)
p = ThreadPool()
@@ -401,7 +408,7 @@ class TestThreadPool(TestBase):
# SINGLE TASK SERIAL SYNC MODE
##############################
# put a few unrelated tasks that we forget about - check ref counts and cleanup
- t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None)
+ t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None)
urc1 = p.add_task(t1)
urc2 = p.add_task(t2)
assert p.num_tasks() == 2
@@ -416,7 +423,7 @@ class TestThreadPool(TestBase):
assert p.num_tasks() == 0
assert sys.getrefcount(t2) == 2
- t3 = TestThreadInputChannelTaskNode(urc2, "channel", None)
+ t3 = TestChannelThreadTask(urc2, "channel", None)
urc3 = p.add_task(t3)
assert p.num_tasks() == 1
del(urc3)