diff options
Diffstat (limited to 'test/git/async')
-rw-r--r-- | test/git/async/__init__.py | 0 | ||||
-rw-r--r-- | test/git/async/task.py | 202 | ||||
-rw-r--r-- | test/git/async/test_channel.py | 87 | ||||
-rw-r--r-- | test/git/async/test_graph.py | 80 | ||||
-rw-r--r-- | test/git/async/test_performance.py | 51 | ||||
-rw-r--r-- | test/git/async/test_pool.py | 476 | ||||
-rw-r--r-- | test/git/async/test_task.py | 15 | ||||
-rw-r--r-- | test/git/async/test_thread.py | 44 |
8 files changed, 0 insertions, 955 deletions
diff --git a/test/git/async/__init__.py b/test/git/async/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/test/git/async/__init__.py +++ /dev/null diff --git a/test/git/async/task.py b/test/git/async/task.py deleted file mode 100644 index 583cb1f8..00000000 --- a/test/git/async/task.py +++ /dev/null @@ -1,202 +0,0 @@ -"""Module containing task implementations useful for testing them""" -from git.async.task import * - -import threading -import weakref - -class _TestTaskBase(object): - """Note: causes great slowdown due to the required locking of task variables""" - def __init__(self, *args, **kwargs): - super(_TestTaskBase, self).__init__(*args, **kwargs) - self.should_fail = False - self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) - self.plock = threading.Lock() - self.item_count = 0 - self.process_count = 0 - - def do_fun(self, item): - self.lock.acquire() - self.item_count += 1 - self.lock.release() - if self.should_fail: - raise AssertionError("I am failing just for the fun of it") - return item - - def process(self, count=1): - # must do it first, otherwise we might read and check results before - # the thread gets here :). Its a lesson ! - self.plock.acquire() - self.process_count += 1 - self.plock.release() - super(_TestTaskBase, self).process(count) - - def _assert(self, pc, fc, check_scheduled=False): - """Assert for num process counts (pc) and num function counts (fc) - :return: self""" - self.lock.acquire() - if self.item_count != fc: - print self.item_count, fc - assert self.item_count == fc - self.lock.release() - - # NOTE: asserting num-writers fails every now and then, implying a thread is - # still processing (an empty chunk) when we are checking it. This can - # only be prevented by checking the scheduled items, which requires locking - # and causes slowdows, so we don't do that. If the num_writers - # counter wouldn't be maintained properly, more tests would fail, so - # we can safely refrain from checking this here - # self._wlock.acquire() - # assert self._num_writers == 0 - # self._wlock.release() - return self - - -class TestThreadTask(_TestTaskBase, IteratorThreadTask): - pass - - -class TestFailureThreadTask(TestThreadTask): - """Fails after X items""" - def __init__(self, *args, **kwargs): - self.fail_after = kwargs.pop('fail_after') - super(TestFailureThreadTask, self).__init__(*args, **kwargs) - - def do_fun(self, item): - item = TestThreadTask.do_fun(self, item) - - self.lock.acquire() - try: - if self.item_count > self.fail_after: - raise AssertionError("Simulated failure after processing %i items" % self.fail_after) - finally: - self.lock.release() - # END handle fail after - return item - - -class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask): - """Apply a transformation on items read from an input channel""" - def __init__(self, *args, **kwargs): - self.fail_after = kwargs.pop('fail_after', 0) - super(TestChannelThreadTask, self).__init__(*args, **kwargs) - - def do_fun(self, item): - """return tuple(i, i*2)""" - item = super(TestChannelThreadTask, self).do_fun(item) - - # fail after support - if self.fail_after: - self.lock.acquire() - try: - if self.item_count > self.fail_after: - raise AssertionError("Simulated failure after processing %i items" % self.fail_after) - finally: - self.lock.release() - # END handle fail-after - - if isinstance(item, tuple): - i = item[0] - return item + (i * self.id, ) - else: - return (item, item * self.id) - # END handle tuple - - -class TestPerformanceThreadTask(ChannelThreadTask): - """Applies no operation to the item, and does not lock, measuring - the actual throughput of the system""" - - def do_fun(self, item): - return item - - -class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask): - """An input channel task, which verifies the result of its input channels, - should be last in the chain. - Id must be int""" - - def do_fun(self, item): - """return tuple(i, i*2)""" - item = super(TestVerifyChannelThreadTask, self).do_fun(item) - - # make sure the computation order matches - assert isinstance(item, tuple), "input was no tuple: %s" % item - - base = item[0] - for id, num in enumerate(item[1:]): - assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item)) - # END verify order - - return item - -#{ Utilities - -def make_proxy_method(t): - """required to prevent binding self into the method we call""" - wt = weakref.proxy(t) - return lambda item: wt.do_fun(item) - -def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0, - feedercls=TestThreadTask, transformercls=TestChannelThreadTask, - include_verifier=True): - """Create a task chain of feeder, count transformers and order verifcator - to the pool p, like t1 -> t2 -> t3 - :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would - make the third transformer fail after 20 items - :param feeder_channel: if set to a channel, it will be used as input of the - first transformation task. The respective first task in the return value - will be None. - :param id_offset: defines the id of the first transformation task, all subsequent - ones will add one - :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" - nt = p.num_tasks() - - feeder = None - frc = feeder_channel - if feeder_channel is None: - feeder = make_iterator_task(ni, taskcls=feedercls) - frc = p.add_task(feeder) - # END handle specific feeder - - rcs = [frc] - tasks = [feeder] - - inrc = frc - for tc in xrange(count): - t = transformercls(inrc, tc+id_offset, None) - - t.fun = make_proxy_method(t) - #t.fun = t.do_fun - inrc = p.add_task(t) - - tasks.append(t) - rcs.append(inrc) - # END create count transformers - - # setup failure - for id, fail_after in fail_setup: - tasks[1+id].fail_after = fail_after - # END setup failure - - if include_verifier: - verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None) - #verifier.fun = verifier.do_fun - verifier.fun = make_proxy_method(verifier) - vrc = p.add_task(verifier) - - - tasks.append(verifier) - rcs.append(vrc) - # END handle include verifier - return tasks, rcs - -def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs): - """:return: task which yields ni items - :param taskcls: the actual iterator type to use - :param **kwargs: additional kwargs to be passed to the task""" - t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) - if isinstance(t, _TestTaskBase): - t.fun = make_proxy_method(t) - return t - -#} END utilities diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py deleted file mode 100644 index e9e1b64c..00000000 --- a/test/git/async/test_channel.py +++ /dev/null @@ -1,87 +0,0 @@ -"""Channel testing""" -from test.testlib import * -from git.async.channel import * - -import time - -class TestChannels(TestBase): - - def test_base(self): - # creating channel yields a write and a read channal - wc, rc = mkchannel() - assert isinstance(wc, ChannelWriter) # default args - assert isinstance(rc, ChannelReader) - - - # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO - item = 1 - item2 = 2 - wc.write(item) - wc.write(item2) - - # read all - it blocks as its still open for writing - to = 0.2 - st = time.time() - assert rc.read(timeout=to) == [item, item2] - assert time.time() - st >= to - - # next read blocks. it waits a second - st = time.time() - assert len(rc.read(1, True, to)) == 0 - assert time.time() - st >= to - - # writing to a closed channel raises - assert not wc.closed() - wc.close() - assert wc.closed() - wc.close() # fine - assert wc.closed() - - self.failUnlessRaises(ReadOnly, wc.write, 1) - - # reading from a closed channel never blocks - assert len(rc.read()) == 0 - assert len(rc.read(5)) == 0 - assert len(rc.read(1)) == 0 - - - # test callback channels - wc, rc = mkchannel(wtype = CallbackChannelWriter, rtype = CallbackChannelReader) - - cb = [0, 0] # set slots to one if called - def pre_write(item): - cb[0] = 1 - return item + 1 - def pre_read(count): - cb[1] = 1 - - # set, verify it returns previous one - assert wc.set_pre_cb(pre_write) is None - assert rc.set_pre_cb(pre_read) is None - assert wc.set_pre_cb(pre_write) is pre_write - assert rc.set_pre_cb(pre_read) is pre_read - - # writer transforms input - val = 5 - wc.write(val) - assert cb[0] == 1 and cb[1] == 0 - - rval = rc.read(1)[0] # read one item, must not block - assert cb[0] == 1 and cb[1] == 1 - assert rval == val + 1 - - - - # ITERATOR READER - reader = IteratorReader(iter(range(10))) - assert len(reader.read(2)) == 2 - assert len(reader.read(0)) == 8 - # its empty now - assert len(reader.read(0)) == 0 - assert len(reader.read(5)) == 0 - - # doesn't work if item is not an iterator - self.failUnlessRaises(ValueError, IteratorReader, list()) - - # NOTE: its thread-safety is tested by the pool - diff --git a/test/git/async/test_graph.py b/test/git/async/test_graph.py deleted file mode 100644 index 7630226b..00000000 --- a/test/git/async/test_graph.py +++ /dev/null @@ -1,80 +0,0 @@ -"""Channel testing""" -from test.testlib import * -from git.async.graph import * - -import time -import sys - -class TestGraph(TestBase): - - def test_base(self): - g = Graph() - nn = 10 - assert nn > 2, "need at least 3 nodes" - - # add unconnected nodes - for i in range(nn): - assert isinstance(g.add_node(Node()), Node) - # END add nodes - assert len(g.nodes) == nn - - # delete unconnected nodes - for n in g.nodes[:]: - g.remove_node(n) - # END del nodes - - # add a chain of connected nodes - last = None - for i in range(nn): - n = g.add_node(Node(i)) - if last: - assert not last.out_nodes - assert not n.in_nodes - assert g.add_edge(last, n) is g - assert last.out_nodes[0] is n - assert n.in_nodes[0] is last - last = n - # END for each node to connect - - # try to connect a node with itself - self.failUnlessRaises(ValueError, g.add_edge, last, last) - - # try to create a cycle - self.failUnlessRaises(ValueError, g.add_edge, g.nodes[0], g.nodes[-1]) - self.failUnlessRaises(ValueError, g.add_edge, g.nodes[-1], g.nodes[0]) - - # we have undirected edges, readding the same edge, but the other way - # around does not change anything - n1, n2, n3 = g.nodes[0], g.nodes[1], g.nodes[2] - g.add_edge(n1, n2) # already connected - g.add_edge(n2, n1) # same thing - assert len(n1.out_nodes) == 1 - assert len(n1.in_nodes) == 0 - assert len(n2.in_nodes) == 1 - assert len(n2.out_nodes) == 1 - - # deleting a connected node clears its neighbour connections - assert n3.in_nodes[0] is n2 - assert g.remove_node(n2) is g - assert g.remove_node(n2) is g # multi-deletion okay - assert len(g.nodes) == nn - 1 - assert len(n3.in_nodes) == 0 - assert len(n1.out_nodes) == 0 - - # check the history from the last node - end = g.nodes[-1] - dfirst_nodes = g.input_inclusive_dfirst_reversed(end) - num_nodes_seen = nn - 2 # deleted second, which leaves first one disconnected - assert len(dfirst_nodes) == num_nodes_seen - assert dfirst_nodes[-1] == end and dfirst_nodes[-2].id == end.id-1 - - - # test cleanup - # its at least kept by its graph - assert sys.getrefcount(end) > 3 - del(g) - del(n1); del(n2); del(n3) - del(dfirst_nodes) - del(last) - del(n) - assert sys.getrefcount(end) == 2 diff --git a/test/git/async/test_performance.py b/test/git/async/test_performance.py deleted file mode 100644 index 703c8593..00000000 --- a/test/git/async/test_performance.py +++ /dev/null @@ -1,51 +0,0 @@ -"""Channel testing""" -from test.testlib import * -from task import * - -from git.async.pool import * -from git.async.thread import terminate_threads -from git.async.util import cpu_count - -import time -import sys - - - -class TestThreadPoolPerformance(TestBase): - - max_threads = cpu_count() - - def test_base(self): - # create a dependency network, and see how the performance changes - # when adjusting the amount of threads - pool = ThreadPool(0) - ni = 1000 # number of items to process - print self.max_threads - for num_threads in range(self.max_threads*2 + 1): - pool.set_size(num_threads) - for num_transformers in (1, 5, 10): - for read_mode in range(2): - ts, rcs = add_task_chain(pool, ni, count=num_transformers, - feedercls=IteratorThreadTask, - transformercls=TestPerformanceThreadTask, - include_verifier=False) - - mode_info = "read(0)" - if read_mode == 1: - mode_info = "read(1) * %i" % ni - # END mode info - fmt = "Threadcount=%%i: Produced %%i items using %s in %%i transformations in %%f s (%%f items / s)" % mode_info - reader = rcs[-1] - st = time.time() - if read_mode == 1: - for i in xrange(ni): - assert len(reader.read(1)) == 1 - # END for each item to read - else: - assert len(reader.read(0)) == ni - # END handle read mode - elapsed = time.time() - st - print >> sys.stderr, fmt % (num_threads, ni, num_transformers, elapsed, ni / elapsed) - # END for each read-mode - # END for each amount of processors - # END for each thread count diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py deleted file mode 100644 index aab618aa..00000000 --- a/test/git/async/test_pool.py +++ /dev/null @@ -1,476 +0,0 @@ -"""Channel testing""" -from test.testlib import * -from task import * - -from git.async.pool import * -from git.async.thread import terminate_threads -from git.async.util import cpu_count - -import threading -import weakref -import time -import sys - - - -class TestThreadPool(TestBase): - - max_threads = cpu_count() - - def _assert_single_task(self, p, async=False): - """Performs testing in a synchronized environment""" - print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size()) - null_tasks = p.num_tasks() # in case we had some before - - # add a simple task - # it iterates n items - ni = 1000 - assert ni % 2 == 0, "ni needs to be dividable by 2" - assert ni % 4 == 0, "ni needs to be dividable by 4" - - make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs) - - task = make_task() - - assert p.num_tasks() == null_tasks - rc = p.add_task(task) - assert p.num_tasks() == 1 + null_tasks - assert isinstance(rc, PoolReader) - assert task._out_writer is not None - - # pull the result completely - we should get one task, which calls its - # function once. In sync mode, the order matches - print "read(0)" - items = rc.read() - assert len(items) == ni - task._assert(1, ni) - if not async: - assert items[0] == 0 and items[-1] == ni-1 - - # as the task is done, it should have been removed - we have read everything - assert task.is_done() - del(rc) - assert p.num_tasks() == null_tasks - task = make_task() - - # pull individual items - rc = p.add_task(task) - assert p.num_tasks() == 1 + null_tasks - st = time.time() - print "read(1) * %i" % ni - for i in range(ni): - items = rc.read(1) - assert len(items) == 1 - - # can't assert order in async mode - if not async: - assert i == items[0] - # END for each item - elapsed = time.time() - st - print >> sys.stderr, "Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, p.size(), elapsed, ni / elapsed) - - # it couldn't yet notice that the input is depleted as we pulled exaclty - # ni items - the next one would remove it. Instead, we delete our channel - # which triggers orphan handling - assert not task.is_done() - assert p.num_tasks() == 1 + null_tasks - del(rc) - assert p.num_tasks() == null_tasks - - # test min count - # if we query 1 item, it will prepare ni / 2 - task = make_task() - task.min_count = ni / 2 - rc = p.add_task(task) - print "read(1)" - items = rc.read(1) - assert len(items) == 1 and items[0] == 0 # processes ni / 2 - print "read(1)" - items = rc.read(1) - assert len(items) == 1 and items[0] == 1 # processes nothing - # rest - it has ni/2 - 2 on the queue, and pulls ni-2 - # It wants too much, so the task realizes its done. The task - # doesn't care about the items in its output channel - nri = ni-2 - print "read(%i)" % nri - items = rc.read(nri) - assert len(items) == nri - p.remove_task(task) - assert p.num_tasks() == null_tasks - task._assert(2, ni) # two chunks, ni calls - - # its already done, gives us no more, its still okay to use it though - # as a task doesn't have to be in the graph to allow reading its produced - # items - print "read(0) on closed" - # it can happen that a thread closes the channel just a tiny fraction of time - # after we check this, so the test fails, although it is nearly closed. - # When we start reading, we should wake up once it sends its signal - # assert task.is_closed() - assert len(rc.read()) == 0 - - # test chunking - # we always want 4 chunks, these could go to individual nodes - task = make_task() - task.min_count = ni / 2 # restore previous value - task.max_chunksize = ni / 4 # 4 chunks - rc = p.add_task(task) - - # must read a specific item count - # count is still at ni / 2 - here we want more than that - # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2 - nri = ni / 2 + 2 - print "read(%i) chunksize set" % nri - items = rc.read(nri) - assert len(items) == nri - # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing - # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing - nri = ni / 2 - 2 - print "read(%i) chunksize set" % nri - items = rc.read(nri) - assert len(items) == nri - - task._assert( 5, ni) - - # delete the handle first, causing the task to be removed and to be set - # done. We check for the set-done state later. Depending on the timing, - # The task is not yet set done when we are checking it because we were - # scheduled in before the flag could be set. - del(rc) - assert task.is_done() - assert p.num_tasks() == null_tasks # depleted - - # but this only hits if we want too many items, if we want less, it could - # still do too much - hence we set the min_count to the same number to enforce - # at least ni / 4 items to be preocessed, no matter what we request - task = make_task() - task.min_count = None - task.max_chunksize = ni / 4 # match previous setup - rc = p.add_task(task) - st = time.time() - print "read(1) * %i, chunksize set" % ni - for i in range(ni): - if async: - assert len(rc.read(1)) == 1 - else: - assert rc.read(1)[0] == i - # END handle async mode - # END pull individual items - # too many processing counts ;) - elapsed = time.time() - st - print >> sys.stderr, "Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, ni/4, p.size(), elapsed, ni / elapsed) - - task._assert(ni, ni) - assert p.num_tasks() == 1 + null_tasks - assert p.remove_task(task) is p # del manually this time - assert p.num_tasks() == null_tasks - - # now with we set the minimum count to reduce the number of processing counts - task = make_task() - task.min_count = ni / 4 - task.max_chunksize = ni / 4 # match previous setup - rc = p.add_task(task) - print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count) - for i in range(ni): - items = rc.read(1) - assert len(items) == 1 - if not async: - assert items[0] == i - # END for each item - task._assert(ni / task.min_count, ni) - del(rc) - assert p.num_tasks() == null_tasks - - # test failure - # on failure, the processing stops and the task is finished, keeping - # his error for later - task = make_task() - task.should_fail = True - rc = p.add_task(task) - print "read(0) with failure" - assert len(rc.read()) == 0 # failure on first item - - assert isinstance(task.error(), AssertionError) - assert task.is_done() # on error, its marked done as well - del(rc) - assert p.num_tasks() == null_tasks - - # test failure after ni / 2 items - # This makes sure it correctly closes the channel on failure to prevent blocking - nri = ni/2 - task = make_task(TestFailureThreadTask, fail_after=ni/2) - rc = p.add_task(task) - assert len(rc.read()) == nri - assert task.is_done() - assert isinstance(task.error(), AssertionError) - - print >> sys.stderr, "done with everything" - - - - def _assert_async_dependent_tasks(self, pool): - # includes failure in center task, 'recursive' orphan cleanup - # This will also verify that the channel-close mechanism works - # t1 -> t2 -> t3 - - print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size() - null_tasks = pool.num_tasks() - ni = 1000 - count = 3 - aic = count + 2 - make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs) - - ts, rcs = make_task() - assert len(ts) == aic - assert len(rcs) == aic - assert pool.num_tasks() == null_tasks + len(ts) - - # read(0) - ######### - st = time.time() - items = rcs[-1].read() - elapsed = time.time() - st - print len(items), ni - assert len(items) == ni - del(rcs) - assert pool.num_tasks() == 0 # tasks depleted, all done, no handles - # wait a tiny moment - there could still be something unprocessed on the - # queue, increasing the refcount - time.sleep(0.15) - assert sys.getrefcount(ts[-1]) == 2 # ts + call - assert sys.getrefcount(ts[0]) == 2 # ts + call - print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) - - - # read(1) - ######### - ts, rcs = make_task() - st = time.time() - for i in xrange(ni): - items = rcs[-1].read(1) - assert len(items) == 1 - # END for each item to pull - elapsed_single = time.time() - st - # another read yields nothing, its empty - assert len(rcs[-1].read()) == 0 - print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single) - - - # read with min-count size - ########################### - # must be faster, as it will read ni / 4 chunks - # Its enough to set one task, as it will force all others in the chain - # to min_size as well. - ts, rcs = make_task() - assert pool.num_tasks() == len(ts) - nri = ni / 4 - ts[-1].min_count = nri - st = time.time() - for i in xrange(ni): - items = rcs[-1].read(1) - assert len(items) == 1 - # END for each item to read - elapsed_minsize = time.time() - st - # its empty - assert len(rcs[-1].read()) == 0 - print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize) - - # it should have been a bit faster at least, and most of the time it is - # Sometimes, its not, mainly because: - # * The test tasks lock a lot, hence they slow down the system - # * Each read will still trigger the pool to evaluate, causing some overhead - # even though there are enough items on the queue in that case. Keeping - # track of the scheduled items helped there, but it caused further inacceptable - # slowdown - # assert elapsed_minsize < elapsed_single - - - # read with failure - ################### - # it should recover and give at least fail_after items - # t1 -> x -> t3 - fail_after = ni/2 - ts, rcs = make_task(fail_setup=[(0, fail_after)]) - items = rcs[-1].read() - assert len(items) == fail_after - - - # MULTI-POOL - # If two pools are connected, this shold work as well. - # The second one has just one more thread - ts, rcs = make_task() - - # connect verifier channel as feeder of the second pool - p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes - assert p2.size() == 0 - p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) - assert p2ts[0] is None # we have no feeder task - assert rcs[-1].pool_ref()() is pool # it didnt change the pool - assert rcs[-1] is p2ts[1].reader() - assert p2.num_tasks() == len(p2ts)-1 # first is None - - # reading from the last one will evaluate all pools correctly - print "read(0) multi-pool" - st = time.time() - items = p2rcs[-1].read() - elapsed = time.time() - st - assert len(items) == ni - - print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed) - - - # loose the handles of the second pool to allow others to go as well - del(p2rcs); del(p2ts) - assert p2.num_tasks() == 0 - - # now we lost our old handles as well, and the tasks go away - ts, rcs = make_task() - assert pool.num_tasks() == len(ts) - - p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) - assert p2.num_tasks() == len(p2ts) - 1 - - # Test multi-read(1) - print "read(1) * %i" % ni - reader = rcs[-1] - st = time.time() - for i in xrange(ni): - items = reader.read(1) - assert len(items) == 1 - # END for each item to get - elapsed = time.time() - st - del(reader) # decrement refcount - - print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed) - - # another read is empty - assert len(rcs[-1].read()) == 0 - - # now that both are connected, I can drop my handle to the reader - # without affecting the task-count, but whats more important: - # They remove their tasks correctly once we drop our references in the - # right order - del(p2ts) - assert p2rcs[0] is rcs[-1] - del(p2rcs) - assert p2.num_tasks() == 0 - del(p2) - - assert pool.num_tasks() == null_tasks + len(ts) - - - del(ts) - del(rcs) - - assert pool.num_tasks() == null_tasks - - - # ASSERTION: We already tested that one pool behaves correctly when an error - # occours - if two pools handle their ref-counts correctly, which they - # do if we are here, then they should handle errors happening during - # the task processing as expected as well. Hence we can safe this here - - - - @terminate_threads - def test_base(self): - max_wait_attempts = 3 - sleep_time = 0.1 - for mc in range(max_wait_attempts): - # wait for threads to die - if len(threading.enumerate()) != 1: - time.sleep(sleep_time) - # END for each attempt - assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time) - - p = ThreadPool() - - # default pools have no workers - assert p.size() == 0 - - # increase and decrease the size - num_threads = len(threading.enumerate()) - for i in range(self.max_threads): - p.set_size(i) - assert p.size() == i - assert len(threading.enumerate()) == num_threads + i - - for i in range(self.max_threads, -1, -1): - p.set_size(i) - assert p.size() == i - - assert p.size() == 0 - # threads should be killed already, but we let them a tiny amount of time - # just to be sure - time.sleep(0.05) - assert len(threading.enumerate()) == num_threads - - # SINGLE TASK SERIAL SYNC MODE - ############################## - # put a few unrelated tasks that we forget about - check ref counts and cleanup - t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None) - urc1 = p.add_task(t1) - urc2 = p.add_task(t2) - assert p.num_tasks() == 2 - - ## SINGLE TASK ################# - self._assert_single_task(p, False) - assert p.num_tasks() == 2 - del(urc1) - assert p.num_tasks() == 1 - - p.remove_task(t2) - assert p.num_tasks() == 0 - assert sys.getrefcount(t2) == 2 - - t3 = TestChannelThreadTask(urc2, "channel", None) - urc3 = p.add_task(t3) - assert p.num_tasks() == 1 - del(urc3) - assert p.num_tasks() == 0 - assert sys.getrefcount(t3) == 2 - - - # DEPENDENT TASKS SYNC MODE - ########################### - self._assert_async_dependent_tasks(p) - - - # SINGLE TASK THREADED ASYNC MODE ( 1 thread ) - ############################################## - # step one gear up - just one thread for now. - p.set_size(1) - assert p.size() == 1 - assert len(threading.enumerate()) == num_threads + 1 - # deleting the pool stops its threads - just to be sure ;) - # Its not synchronized, hence we wait a moment - del(p) - time.sleep(0.05) - assert len(threading.enumerate()) == num_threads - - p = ThreadPool(1) - assert len(threading.enumerate()) == num_threads + 1 - - # here we go - self._assert_single_task(p, True) - - - - # SINGLE TASK ASYNC MODE ( 2 threads ) - ###################################### - # two threads to compete for a single task - p.set_size(2) - self._assert_single_task(p, True) - - # real stress test- should be native on every dual-core cpu with 2 hardware - # threads per core - p.set_size(4) - self._assert_single_task(p, True) - - - # DEPENDENT TASK ASYNC MODE - ########################### - self._assert_async_dependent_tasks(p) - - print >> sys.stderr, "Done with everything" - diff --git a/test/git/async/test_task.py b/test/git/async/test_task.py deleted file mode 100644 index c6a796e9..00000000 --- a/test/git/async/test_task.py +++ /dev/null @@ -1,15 +0,0 @@ -"""Channel testing""" -from test.testlib import * -from git.async.util import * -from git.async.task import * - -import time - -class TestTask(TestBase): - - max_threads = cpu_count() - - def test_iterator_task(self): - # tested via test_pool - pass - diff --git a/test/git/async/test_thread.py b/test/git/async/test_thread.py deleted file mode 100644 index a08c1dc7..00000000 --- a/test/git/async/test_thread.py +++ /dev/null @@ -1,44 +0,0 @@ -# -*- coding: utf-8 -*- -""" Test thead classes and functions""" -from test.testlib import * -from git.async.thread import * -from Queue import Queue -import time - -class TestWorker(WorkerThread): - def __init__(self, *args, **kwargs): - super(TestWorker, self).__init__(*args, **kwargs) - self.reset() - - def fun(self, arg): - self.called = True - self.arg = arg - return True - - def make_assertion(self): - assert self.called - assert self.arg - self.reset() - - def reset(self): - self.called = False - self.arg = None - - -class TestThreads( TestCase ): - - @terminate_threads - def test_worker_thread(self): - worker = TestWorker() - assert isinstance(worker.start(), WorkerThread) - - # test different method types - standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs) - for function in (TestWorker.fun, worker.fun, standalone_func): - worker.inq.put((function, 1)) - time.sleep(0.01) - worker.make_assertion() - # END for each function type - - worker.stop_and_join() - |