summaryrefslogtreecommitdiff
path: root/test/git/async
diff options
context:
space:
mode:
Diffstat (limited to 'test/git/async')
-rw-r--r--test/git/async/task.py26
-rw-r--r--test/git/async/test_performance.py4
-rw-r--r--test/git/async/test_pool.py15
3 files changed, 26 insertions, 19 deletions
diff --git a/test/git/async/task.py b/test/git/async/task.py
index f3599efe..583cb1f8 100644
--- a/test/git/async/task.py
+++ b/test/git/async/task.py
@@ -51,18 +51,18 @@ class _TestTaskBase(object):
return self
-class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask):
+class TestThreadTask(_TestTaskBase, IteratorThreadTask):
pass
-class TestThreadFailureNode(TestThreadTaskNode):
+class TestFailureThreadTask(TestThreadTask):
"""Fails after X items"""
def __init__(self, *args, **kwargs):
self.fail_after = kwargs.pop('fail_after')
- super(TestThreadFailureNode, self).__init__(*args, **kwargs)
+ super(TestFailureThreadTask, self).__init__(*args, **kwargs)
def do_fun(self, item):
- item = TestThreadTaskNode.do_fun(self, item)
+ item = TestThreadTask.do_fun(self, item)
self.lock.acquire()
try:
@@ -74,15 +74,15 @@ class TestThreadFailureNode(TestThreadTaskNode):
return item
-class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
+class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask):
"""Apply a transformation on items read from an input channel"""
def __init__(self, *args, **kwargs):
self.fail_after = kwargs.pop('fail_after', 0)
- super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs)
+ super(TestChannelThreadTask, self).__init__(*args, **kwargs)
def do_fun(self, item):
"""return tuple(i, i*2)"""
- item = super(TestThreadInputChannelTaskNode, self).do_fun(item)
+ item = super(TestChannelThreadTask, self).do_fun(item)
# fail after support
if self.fail_after:
@@ -102,7 +102,7 @@ class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
# END handle tuple
-class TestThreadPerformanceTaskNode(InputChannelTask):
+class TestPerformanceThreadTask(ChannelThreadTask):
"""Applies no operation to the item, and does not lock, measuring
the actual throughput of the system"""
@@ -110,14 +110,14 @@ class TestThreadPerformanceTaskNode(InputChannelTask):
return item
-class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
+class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask):
"""An input channel task, which verifies the result of its input channels,
should be last in the chain.
Id must be int"""
def do_fun(self, item):
"""return tuple(i, i*2)"""
- item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item)
+ item = super(TestVerifyChannelThreadTask, self).do_fun(item)
# make sure the computation order matches
assert isinstance(item, tuple), "input was no tuple: %s" % item
@@ -137,7 +137,7 @@ def make_proxy_method(t):
return lambda item: wt.do_fun(item)
def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0,
- feedercls=TestThreadTaskNode, transformercls=TestThreadInputChannelTaskNode,
+ feedercls=TestThreadTask, transformercls=TestChannelThreadTask,
include_verifier=True):
"""Create a task chain of feeder, count transformers and order verifcator
to the pool p, like t1 -> t2 -> t3
@@ -179,7 +179,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of
# END setup failure
if include_verifier:
- verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
+ verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None)
#verifier.fun = verifier.do_fun
verifier.fun = make_proxy_method(verifier)
vrc = p.add_task(verifier)
@@ -190,7 +190,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of
# END handle include verifier
return tasks, rcs
-def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs):
+def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs):
""":return: task which yields ni items
:param taskcls: the actual iterator type to use
:param **kwargs: additional kwargs to be passed to the task"""
diff --git a/test/git/async/test_performance.py b/test/git/async/test_performance.py
index 896d230e..703c8593 100644
--- a/test/git/async/test_performance.py
+++ b/test/git/async/test_performance.py
@@ -26,8 +26,8 @@ class TestThreadPoolPerformance(TestBase):
for num_transformers in (1, 5, 10):
for read_mode in range(2):
ts, rcs = add_task_chain(pool, ni, count=num_transformers,
- feedercls=InputIteratorThreadTask,
- transformercls=TestThreadPerformanceTaskNode,
+ feedercls=IteratorThreadTask,
+ transformercls=TestPerformanceThreadTask,
include_verifier=False)
mode_info = "read(0)"
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 0042c4a8..aab618aa 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -198,7 +198,7 @@ class TestThreadPool(TestBase):
# test failure after ni / 2 items
# This makes sure it correctly closes the channel on failure to prevent blocking
nri = ni/2
- task = make_task(TestThreadFailureNode, fail_after=ni/2)
+ task = make_task(TestFailureThreadTask, fail_after=ni/2)
rc = p.add_task(task)
assert len(rc.read()) == nri
assert task.is_done()
@@ -374,7 +374,14 @@ class TestThreadPool(TestBase):
@terminate_threads
def test_base(self):
- assert len(threading.enumerate()) == 1
+ max_wait_attempts = 3
+ sleep_time = 0.1
+ for mc in range(max_wait_attempts):
+ # wait for threads to die
+ if len(threading.enumerate()) != 1:
+ time.sleep(sleep_time)
+ # END for each attempt
+ assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time)
p = ThreadPool()
@@ -401,7 +408,7 @@ class TestThreadPool(TestBase):
# SINGLE TASK SERIAL SYNC MODE
##############################
# put a few unrelated tasks that we forget about - check ref counts and cleanup
- t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None)
+ t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None)
urc1 = p.add_task(t1)
urc2 = p.add_task(t2)
assert p.num_tasks() == 2
@@ -416,7 +423,7 @@ class TestThreadPool(TestBase):
assert p.num_tasks() == 0
assert sys.getrefcount(t2) == 2
- t3 = TestThreadInputChannelTaskNode(urc2, "channel", None)
+ t3 = TestChannelThreadTask(urc2, "channel", None)
urc3 = p.add_task(t3)
assert p.num_tasks() == 1
del(urc3)