diff options
Diffstat (limited to 'test/git/async')
-rw-r--r-- | test/git/async/task.py | 26 | ||||
-rw-r--r-- | test/git/async/test_performance.py | 4 | ||||
-rw-r--r-- | test/git/async/test_pool.py | 15 |
3 files changed, 26 insertions, 19 deletions
diff --git a/test/git/async/task.py b/test/git/async/task.py index f3599efe..583cb1f8 100644 --- a/test/git/async/task.py +++ b/test/git/async/task.py @@ -51,18 +51,18 @@ class _TestTaskBase(object): return self -class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask): +class TestThreadTask(_TestTaskBase, IteratorThreadTask): pass -class TestThreadFailureNode(TestThreadTaskNode): +class TestFailureThreadTask(TestThreadTask): """Fails after X items""" def __init__(self, *args, **kwargs): self.fail_after = kwargs.pop('fail_after') - super(TestThreadFailureNode, self).__init__(*args, **kwargs) + super(TestFailureThreadTask, self).__init__(*args, **kwargs) def do_fun(self, item): - item = TestThreadTaskNode.do_fun(self, item) + item = TestThreadTask.do_fun(self, item) self.lock.acquire() try: @@ -74,15 +74,15 @@ class TestThreadFailureNode(TestThreadTaskNode): return item -class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): +class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask): """Apply a transformation on items read from an input channel""" def __init__(self, *args, **kwargs): self.fail_after = kwargs.pop('fail_after', 0) - super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs) + super(TestChannelThreadTask, self).__init__(*args, **kwargs) def do_fun(self, item): """return tuple(i, i*2)""" - item = super(TestThreadInputChannelTaskNode, self).do_fun(item) + item = super(TestChannelThreadTask, self).do_fun(item) # fail after support if self.fail_after: @@ -102,7 +102,7 @@ class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): # END handle tuple -class TestThreadPerformanceTaskNode(InputChannelTask): +class TestPerformanceThreadTask(ChannelThreadTask): """Applies no operation to the item, and does not lock, measuring the actual throughput of the system""" @@ -110,14 +110,14 @@ class TestThreadPerformanceTaskNode(InputChannelTask): return item -class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): +class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask): """An input channel task, which verifies the result of its input channels, should be last in the chain. Id must be int""" def do_fun(self, item): """return tuple(i, i*2)""" - item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item) + item = super(TestVerifyChannelThreadTask, self).do_fun(item) # make sure the computation order matches assert isinstance(item, tuple), "input was no tuple: %s" % item @@ -137,7 +137,7 @@ def make_proxy_method(t): return lambda item: wt.do_fun(item) def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0, - feedercls=TestThreadTaskNode, transformercls=TestThreadInputChannelTaskNode, + feedercls=TestThreadTask, transformercls=TestChannelThreadTask, include_verifier=True): """Create a task chain of feeder, count transformers and order verifcator to the pool p, like t1 -> t2 -> t3 @@ -179,7 +179,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of # END setup failure if include_verifier: - verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) + verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None) #verifier.fun = verifier.do_fun verifier.fun = make_proxy_method(verifier) vrc = p.add_task(verifier) @@ -190,7 +190,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of # END handle include verifier return tasks, rcs -def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs): +def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs): """:return: task which yields ni items :param taskcls: the actual iterator type to use :param **kwargs: additional kwargs to be passed to the task""" diff --git a/test/git/async/test_performance.py b/test/git/async/test_performance.py index 896d230e..703c8593 100644 --- a/test/git/async/test_performance.py +++ b/test/git/async/test_performance.py @@ -26,8 +26,8 @@ class TestThreadPoolPerformance(TestBase): for num_transformers in (1, 5, 10): for read_mode in range(2): ts, rcs = add_task_chain(pool, ni, count=num_transformers, - feedercls=InputIteratorThreadTask, - transformercls=TestThreadPerformanceTaskNode, + feedercls=IteratorThreadTask, + transformercls=TestPerformanceThreadTask, include_verifier=False) mode_info = "read(0)" diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 0042c4a8..aab618aa 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -198,7 +198,7 @@ class TestThreadPool(TestBase): # test failure after ni / 2 items # This makes sure it correctly closes the channel on failure to prevent blocking nri = ni/2 - task = make_task(TestThreadFailureNode, fail_after=ni/2) + task = make_task(TestFailureThreadTask, fail_after=ni/2) rc = p.add_task(task) assert len(rc.read()) == nri assert task.is_done() @@ -374,7 +374,14 @@ class TestThreadPool(TestBase): @terminate_threads def test_base(self): - assert len(threading.enumerate()) == 1 + max_wait_attempts = 3 + sleep_time = 0.1 + for mc in range(max_wait_attempts): + # wait for threads to die + if len(threading.enumerate()) != 1: + time.sleep(sleep_time) + # END for each attempt + assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time) p = ThreadPool() @@ -401,7 +408,7 @@ class TestThreadPool(TestBase): # SINGLE TASK SERIAL SYNC MODE ############################## # put a few unrelated tasks that we forget about - check ref counts and cleanup - t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None) + t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None) urc1 = p.add_task(t1) urc2 = p.add_task(t2) assert p.num_tasks() == 2 @@ -416,7 +423,7 @@ class TestThreadPool(TestBase): assert p.num_tasks() == 0 assert sys.getrefcount(t2) == 2 - t3 = TestThreadInputChannelTaskNode(urc2, "channel", None) + t3 = TestChannelThreadTask(urc2, "channel", None) urc3 = p.add_task(t3) assert p.num_tasks() == 1 del(urc3) |