summaryrefslogtreecommitdiff
path: root/test/git
diff options
context:
space:
mode:
Diffstat (limited to 'test/git')
-rw-r--r--test/git/async/task.py38
-rw-r--r--test/git/async/test_performance.py51
2 files changed, 76 insertions, 13 deletions
diff --git a/test/git/async/task.py b/test/git/async/task.py
index 9cc3cb9d..f3599efe 100644
--- a/test/git/async/task.py
+++ b/test/git/async/task.py
@@ -102,6 +102,14 @@ class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
# END handle tuple
+class TestThreadPerformanceTaskNode(InputChannelTask):
+ """Applies no operation to the item, and does not lock, measuring
+ the actual throughput of the system"""
+
+ def do_fun(self, item):
+ return item
+
+
class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
"""An input channel task, which verifies the result of its input channels,
should be last in the chain.
@@ -121,7 +129,6 @@ class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
return item
-
#{ Utilities
def make_proxy_method(t):
@@ -129,7 +136,9 @@ def make_proxy_method(t):
wt = weakref.proxy(t)
return lambda item: wt.do_fun(item)
-def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0):
+def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0,
+ feedercls=TestThreadTaskNode, transformercls=TestThreadInputChannelTaskNode,
+ include_verifier=True):
"""Create a task chain of feeder, count transformers and order verifcator
to the pool p, like t1 -> t2 -> t3
:param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
@@ -145,7 +154,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of
feeder = None
frc = feeder_channel
if feeder_channel is None:
- feeder = make_iterator_task(ni)
+ feeder = make_iterator_task(ni, taskcls=feedercls)
frc = p.add_task(feeder)
# END handle specific feeder
@@ -154,7 +163,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of
inrc = frc
for tc in xrange(count):
- t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None)
+ t = transformercls(inrc, tc+id_offset, None)
t.fun = make_proxy_method(t)
#t.fun = t.do_fun
@@ -169,14 +178,16 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of
tasks[1+id].fail_after = fail_after
# END setup failure
- verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
- #verifier.fun = verifier.do_fun
- verifier.fun = make_proxy_method(verifier)
- vrc = p.add_task(verifier)
-
-
- tasks.append(verifier)
- rcs.append(vrc)
+ if include_verifier:
+ verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
+ #verifier.fun = verifier.do_fun
+ verifier.fun = make_proxy_method(verifier)
+ vrc = p.add_task(verifier)
+
+
+ tasks.append(verifier)
+ rcs.append(vrc)
+ # END handle include verifier
return tasks, rcs
def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs):
@@ -184,7 +195,8 @@ def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs):
:param taskcls: the actual iterator type to use
:param **kwargs: additional kwargs to be passed to the task"""
t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
- t.fun = make_proxy_method(t)
+ if isinstance(t, _TestTaskBase):
+ t.fun = make_proxy_method(t)
return t
#} END utilities
diff --git a/test/git/async/test_performance.py b/test/git/async/test_performance.py
new file mode 100644
index 00000000..896d230e
--- /dev/null
+++ b/test/git/async/test_performance.py
@@ -0,0 +1,51 @@
+"""Channel testing"""
+from test.testlib import *
+from task import *
+
+from git.async.pool import *
+from git.async.thread import terminate_threads
+from git.async.util import cpu_count
+
+import time
+import sys
+
+
+
+class TestThreadPoolPerformance(TestBase):
+
+ max_threads = cpu_count()
+
+ def test_base(self):
+ # create a dependency network, and see how the performance changes
+ # when adjusting the amount of threads
+ pool = ThreadPool(0)
+ ni = 1000 # number of items to process
+ print self.max_threads
+ for num_threads in range(self.max_threads*2 + 1):
+ pool.set_size(num_threads)
+ for num_transformers in (1, 5, 10):
+ for read_mode in range(2):
+ ts, rcs = add_task_chain(pool, ni, count=num_transformers,
+ feedercls=InputIteratorThreadTask,
+ transformercls=TestThreadPerformanceTaskNode,
+ include_verifier=False)
+
+ mode_info = "read(0)"
+ if read_mode == 1:
+ mode_info = "read(1) * %i" % ni
+ # END mode info
+ fmt = "Threadcount=%%i: Produced %%i items using %s in %%i transformations in %%f s (%%f items / s)" % mode_info
+ reader = rcs[-1]
+ st = time.time()
+ if read_mode == 1:
+ for i in xrange(ni):
+ assert len(reader.read(1)) == 1
+ # END for each item to read
+ else:
+ assert len(reader.read(0)) == ni
+ # END handle read mode
+ elapsed = time.time() - st
+ print >> sys.stderr, fmt % (num_threads, ni, num_transformers, elapsed, ni / elapsed)
+ # END for each read-mode
+ # END for each amount of processors
+ # END for each thread count