summaryrefslogtreecommitdiff
path: root/test/git/async
diff options
context:
space:
mode:
Diffstat (limited to 'test/git/async')
-rw-r--r--test/git/async/__init__.py0
-rw-r--r--test/git/async/task.py202
-rw-r--r--test/git/async/test_channel.py87
-rw-r--r--test/git/async/test_graph.py80
-rw-r--r--test/git/async/test_performance.py51
-rw-r--r--test/git/async/test_pool.py476
-rw-r--r--test/git/async/test_task.py15
-rw-r--r--test/git/async/test_thread.py44
8 files changed, 0 insertions, 955 deletions
diff --git a/test/git/async/__init__.py b/test/git/async/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/test/git/async/__init__.py
+++ /dev/null
diff --git a/test/git/async/task.py b/test/git/async/task.py
deleted file mode 100644
index 583cb1f8..00000000
--- a/test/git/async/task.py
+++ /dev/null
@@ -1,202 +0,0 @@
-"""Module containing task implementations useful for testing them"""
-from git.async.task import *
-
-import threading
-import weakref
-
-class _TestTaskBase(object):
- """Note: causes great slowdown due to the required locking of task variables"""
- def __init__(self, *args, **kwargs):
- super(_TestTaskBase, self).__init__(*args, **kwargs)
- self.should_fail = False
- self.lock = threading.Lock() # yes, can't safely do x = x + 1 :)
- self.plock = threading.Lock()
- self.item_count = 0
- self.process_count = 0
-
- def do_fun(self, item):
- self.lock.acquire()
- self.item_count += 1
- self.lock.release()
- if self.should_fail:
- raise AssertionError("I am failing just for the fun of it")
- return item
-
- def process(self, count=1):
- # must do it first, otherwise we might read and check results before
- # the thread gets here :). Its a lesson !
- self.plock.acquire()
- self.process_count += 1
- self.plock.release()
- super(_TestTaskBase, self).process(count)
-
- def _assert(self, pc, fc, check_scheduled=False):
- """Assert for num process counts (pc) and num function counts (fc)
- :return: self"""
- self.lock.acquire()
- if self.item_count != fc:
- print self.item_count, fc
- assert self.item_count == fc
- self.lock.release()
-
- # NOTE: asserting num-writers fails every now and then, implying a thread is
- # still processing (an empty chunk) when we are checking it. This can
- # only be prevented by checking the scheduled items, which requires locking
- # and causes slowdows, so we don't do that. If the num_writers
- # counter wouldn't be maintained properly, more tests would fail, so
- # we can safely refrain from checking this here
- # self._wlock.acquire()
- # assert self._num_writers == 0
- # self._wlock.release()
- return self
-
-
-class TestThreadTask(_TestTaskBase, IteratorThreadTask):
- pass
-
-
-class TestFailureThreadTask(TestThreadTask):
- """Fails after X items"""
- def __init__(self, *args, **kwargs):
- self.fail_after = kwargs.pop('fail_after')
- super(TestFailureThreadTask, self).__init__(*args, **kwargs)
-
- def do_fun(self, item):
- item = TestThreadTask.do_fun(self, item)
-
- self.lock.acquire()
- try:
- if self.item_count > self.fail_after:
- raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
- finally:
- self.lock.release()
- # END handle fail after
- return item
-
-
-class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask):
- """Apply a transformation on items read from an input channel"""
- def __init__(self, *args, **kwargs):
- self.fail_after = kwargs.pop('fail_after', 0)
- super(TestChannelThreadTask, self).__init__(*args, **kwargs)
-
- def do_fun(self, item):
- """return tuple(i, i*2)"""
- item = super(TestChannelThreadTask, self).do_fun(item)
-
- # fail after support
- if self.fail_after:
- self.lock.acquire()
- try:
- if self.item_count > self.fail_after:
- raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
- finally:
- self.lock.release()
- # END handle fail-after
-
- if isinstance(item, tuple):
- i = item[0]
- return item + (i * self.id, )
- else:
- return (item, item * self.id)
- # END handle tuple
-
-
-class TestPerformanceThreadTask(ChannelThreadTask):
- """Applies no operation to the item, and does not lock, measuring
- the actual throughput of the system"""
-
- def do_fun(self, item):
- return item
-
-
-class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask):
- """An input channel task, which verifies the result of its input channels,
- should be last in the chain.
- Id must be int"""
-
- def do_fun(self, item):
- """return tuple(i, i*2)"""
- item = super(TestVerifyChannelThreadTask, self).do_fun(item)
-
- # make sure the computation order matches
- assert isinstance(item, tuple), "input was no tuple: %s" % item
-
- base = item[0]
- for id, num in enumerate(item[1:]):
- assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item))
- # END verify order
-
- return item
-
-#{ Utilities
-
-def make_proxy_method(t):
- """required to prevent binding self into the method we call"""
- wt = weakref.proxy(t)
- return lambda item: wt.do_fun(item)
-
-def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0,
- feedercls=TestThreadTask, transformercls=TestChannelThreadTask,
- include_verifier=True):
- """Create a task chain of feeder, count transformers and order verifcator
- to the pool p, like t1 -> t2 -> t3
- :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
- make the third transformer fail after 20 items
- :param feeder_channel: if set to a channel, it will be used as input of the
- first transformation task. The respective first task in the return value
- will be None.
- :param id_offset: defines the id of the first transformation task, all subsequent
- ones will add one
- :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
- nt = p.num_tasks()
-
- feeder = None
- frc = feeder_channel
- if feeder_channel is None:
- feeder = make_iterator_task(ni, taskcls=feedercls)
- frc = p.add_task(feeder)
- # END handle specific feeder
-
- rcs = [frc]
- tasks = [feeder]
-
- inrc = frc
- for tc in xrange(count):
- t = transformercls(inrc, tc+id_offset, None)
-
- t.fun = make_proxy_method(t)
- #t.fun = t.do_fun
- inrc = p.add_task(t)
-
- tasks.append(t)
- rcs.append(inrc)
- # END create count transformers
-
- # setup failure
- for id, fail_after in fail_setup:
- tasks[1+id].fail_after = fail_after
- # END setup failure
-
- if include_verifier:
- verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None)
- #verifier.fun = verifier.do_fun
- verifier.fun = make_proxy_method(verifier)
- vrc = p.add_task(verifier)
-
-
- tasks.append(verifier)
- rcs.append(vrc)
- # END handle include verifier
- return tasks, rcs
-
-def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs):
- """:return: task which yields ni items
- :param taskcls: the actual iterator type to use
- :param **kwargs: additional kwargs to be passed to the task"""
- t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
- if isinstance(t, _TestTaskBase):
- t.fun = make_proxy_method(t)
- return t
-
-#} END utilities
diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py
deleted file mode 100644
index e9e1b64c..00000000
--- a/test/git/async/test_channel.py
+++ /dev/null
@@ -1,87 +0,0 @@
-"""Channel testing"""
-from test.testlib import *
-from git.async.channel import *
-
-import time
-
-class TestChannels(TestBase):
-
- def test_base(self):
- # creating channel yields a write and a read channal
- wc, rc = mkchannel()
- assert isinstance(wc, ChannelWriter) # default args
- assert isinstance(rc, ChannelReader)
-
-
- # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO
- item = 1
- item2 = 2
- wc.write(item)
- wc.write(item2)
-
- # read all - it blocks as its still open for writing
- to = 0.2
- st = time.time()
- assert rc.read(timeout=to) == [item, item2]
- assert time.time() - st >= to
-
- # next read blocks. it waits a second
- st = time.time()
- assert len(rc.read(1, True, to)) == 0
- assert time.time() - st >= to
-
- # writing to a closed channel raises
- assert not wc.closed()
- wc.close()
- assert wc.closed()
- wc.close() # fine
- assert wc.closed()
-
- self.failUnlessRaises(ReadOnly, wc.write, 1)
-
- # reading from a closed channel never blocks
- assert len(rc.read()) == 0
- assert len(rc.read(5)) == 0
- assert len(rc.read(1)) == 0
-
-
- # test callback channels
- wc, rc = mkchannel(wtype = CallbackChannelWriter, rtype = CallbackChannelReader)
-
- cb = [0, 0] # set slots to one if called
- def pre_write(item):
- cb[0] = 1
- return item + 1
- def pre_read(count):
- cb[1] = 1
-
- # set, verify it returns previous one
- assert wc.set_pre_cb(pre_write) is None
- assert rc.set_pre_cb(pre_read) is None
- assert wc.set_pre_cb(pre_write) is pre_write
- assert rc.set_pre_cb(pre_read) is pre_read
-
- # writer transforms input
- val = 5
- wc.write(val)
- assert cb[0] == 1 and cb[1] == 0
-
- rval = rc.read(1)[0] # read one item, must not block
- assert cb[0] == 1 and cb[1] == 1
- assert rval == val + 1
-
-
-
- # ITERATOR READER
- reader = IteratorReader(iter(range(10)))
- assert len(reader.read(2)) == 2
- assert len(reader.read(0)) == 8
- # its empty now
- assert len(reader.read(0)) == 0
- assert len(reader.read(5)) == 0
-
- # doesn't work if item is not an iterator
- self.failUnlessRaises(ValueError, IteratorReader, list())
-
- # NOTE: its thread-safety is tested by the pool
-
diff --git a/test/git/async/test_graph.py b/test/git/async/test_graph.py
deleted file mode 100644
index 7630226b..00000000
--- a/test/git/async/test_graph.py
+++ /dev/null
@@ -1,80 +0,0 @@
-"""Channel testing"""
-from test.testlib import *
-from git.async.graph import *
-
-import time
-import sys
-
-class TestGraph(TestBase):
-
- def test_base(self):
- g = Graph()
- nn = 10
- assert nn > 2, "need at least 3 nodes"
-
- # add unconnected nodes
- for i in range(nn):
- assert isinstance(g.add_node(Node()), Node)
- # END add nodes
- assert len(g.nodes) == nn
-
- # delete unconnected nodes
- for n in g.nodes[:]:
- g.remove_node(n)
- # END del nodes
-
- # add a chain of connected nodes
- last = None
- for i in range(nn):
- n = g.add_node(Node(i))
- if last:
- assert not last.out_nodes
- assert not n.in_nodes
- assert g.add_edge(last, n) is g
- assert last.out_nodes[0] is n
- assert n.in_nodes[0] is last
- last = n
- # END for each node to connect
-
- # try to connect a node with itself
- self.failUnlessRaises(ValueError, g.add_edge, last, last)
-
- # try to create a cycle
- self.failUnlessRaises(ValueError, g.add_edge, g.nodes[0], g.nodes[-1])
- self.failUnlessRaises(ValueError, g.add_edge, g.nodes[-1], g.nodes[0])
-
- # we have undirected edges, readding the same edge, but the other way
- # around does not change anything
- n1, n2, n3 = g.nodes[0], g.nodes[1], g.nodes[2]
- g.add_edge(n1, n2) # already connected
- g.add_edge(n2, n1) # same thing
- assert len(n1.out_nodes) == 1
- assert len(n1.in_nodes) == 0
- assert len(n2.in_nodes) == 1
- assert len(n2.out_nodes) == 1
-
- # deleting a connected node clears its neighbour connections
- assert n3.in_nodes[0] is n2
- assert g.remove_node(n2) is g
- assert g.remove_node(n2) is g # multi-deletion okay
- assert len(g.nodes) == nn - 1
- assert len(n3.in_nodes) == 0
- assert len(n1.out_nodes) == 0
-
- # check the history from the last node
- end = g.nodes[-1]
- dfirst_nodes = g.input_inclusive_dfirst_reversed(end)
- num_nodes_seen = nn - 2 # deleted second, which leaves first one disconnected
- assert len(dfirst_nodes) == num_nodes_seen
- assert dfirst_nodes[-1] == end and dfirst_nodes[-2].id == end.id-1
-
-
- # test cleanup
- # its at least kept by its graph
- assert sys.getrefcount(end) > 3
- del(g)
- del(n1); del(n2); del(n3)
- del(dfirst_nodes)
- del(last)
- del(n)
- assert sys.getrefcount(end) == 2
diff --git a/test/git/async/test_performance.py b/test/git/async/test_performance.py
deleted file mode 100644
index 703c8593..00000000
--- a/test/git/async/test_performance.py
+++ /dev/null
@@ -1,51 +0,0 @@
-"""Channel testing"""
-from test.testlib import *
-from task import *
-
-from git.async.pool import *
-from git.async.thread import terminate_threads
-from git.async.util import cpu_count
-
-import time
-import sys
-
-
-
-class TestThreadPoolPerformance(TestBase):
-
- max_threads = cpu_count()
-
- def test_base(self):
- # create a dependency network, and see how the performance changes
- # when adjusting the amount of threads
- pool = ThreadPool(0)
- ni = 1000 # number of items to process
- print self.max_threads
- for num_threads in range(self.max_threads*2 + 1):
- pool.set_size(num_threads)
- for num_transformers in (1, 5, 10):
- for read_mode in range(2):
- ts, rcs = add_task_chain(pool, ni, count=num_transformers,
- feedercls=IteratorThreadTask,
- transformercls=TestPerformanceThreadTask,
- include_verifier=False)
-
- mode_info = "read(0)"
- if read_mode == 1:
- mode_info = "read(1) * %i" % ni
- # END mode info
- fmt = "Threadcount=%%i: Produced %%i items using %s in %%i transformations in %%f s (%%f items / s)" % mode_info
- reader = rcs[-1]
- st = time.time()
- if read_mode == 1:
- for i in xrange(ni):
- assert len(reader.read(1)) == 1
- # END for each item to read
- else:
- assert len(reader.read(0)) == ni
- # END handle read mode
- elapsed = time.time() - st
- print >> sys.stderr, fmt % (num_threads, ni, num_transformers, elapsed, ni / elapsed)
- # END for each read-mode
- # END for each amount of processors
- # END for each thread count
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
deleted file mode 100644
index aab618aa..00000000
--- a/test/git/async/test_pool.py
+++ /dev/null
@@ -1,476 +0,0 @@
-"""Channel testing"""
-from test.testlib import *
-from task import *
-
-from git.async.pool import *
-from git.async.thread import terminate_threads
-from git.async.util import cpu_count
-
-import threading
-import weakref
-import time
-import sys
-
-
-
-class TestThreadPool(TestBase):
-
- max_threads = cpu_count()
-
- def _assert_single_task(self, p, async=False):
- """Performs testing in a synchronized environment"""
- print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size())
- null_tasks = p.num_tasks() # in case we had some before
-
- # add a simple task
- # it iterates n items
- ni = 1000
- assert ni % 2 == 0, "ni needs to be dividable by 2"
- assert ni % 4 == 0, "ni needs to be dividable by 4"
-
- make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs)
-
- task = make_task()
-
- assert p.num_tasks() == null_tasks
- rc = p.add_task(task)
- assert p.num_tasks() == 1 + null_tasks
- assert isinstance(rc, PoolReader)
- assert task._out_writer is not None
-
- # pull the result completely - we should get one task, which calls its
- # function once. In sync mode, the order matches
- print "read(0)"
- items = rc.read()
- assert len(items) == ni
- task._assert(1, ni)
- if not async:
- assert items[0] == 0 and items[-1] == ni-1
-
- # as the task is done, it should have been removed - we have read everything
- assert task.is_done()
- del(rc)
- assert p.num_tasks() == null_tasks
- task = make_task()
-
- # pull individual items
- rc = p.add_task(task)
- assert p.num_tasks() == 1 + null_tasks
- st = time.time()
- print "read(1) * %i" % ni
- for i in range(ni):
- items = rc.read(1)
- assert len(items) == 1
-
- # can't assert order in async mode
- if not async:
- assert i == items[0]
- # END for each item
- elapsed = time.time() - st
- print >> sys.stderr, "Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, p.size(), elapsed, ni / elapsed)
-
- # it couldn't yet notice that the input is depleted as we pulled exaclty
- # ni items - the next one would remove it. Instead, we delete our channel
- # which triggers orphan handling
- assert not task.is_done()
- assert p.num_tasks() == 1 + null_tasks
- del(rc)
- assert p.num_tasks() == null_tasks
-
- # test min count
- # if we query 1 item, it will prepare ni / 2
- task = make_task()
- task.min_count = ni / 2
- rc = p.add_task(task)
- print "read(1)"
- items = rc.read(1)
- assert len(items) == 1 and items[0] == 0 # processes ni / 2
- print "read(1)"
- items = rc.read(1)
- assert len(items) == 1 and items[0] == 1 # processes nothing
- # rest - it has ni/2 - 2 on the queue, and pulls ni-2
- # It wants too much, so the task realizes its done. The task
- # doesn't care about the items in its output channel
- nri = ni-2
- print "read(%i)" % nri
- items = rc.read(nri)
- assert len(items) == nri
- p.remove_task(task)
- assert p.num_tasks() == null_tasks
- task._assert(2, ni) # two chunks, ni calls
-
- # its already done, gives us no more, its still okay to use it though
- # as a task doesn't have to be in the graph to allow reading its produced
- # items
- print "read(0) on closed"
- # it can happen that a thread closes the channel just a tiny fraction of time
- # after we check this, so the test fails, although it is nearly closed.
- # When we start reading, we should wake up once it sends its signal
- # assert task.is_closed()
- assert len(rc.read()) == 0
-
- # test chunking
- # we always want 4 chunks, these could go to individual nodes
- task = make_task()
- task.min_count = ni / 2 # restore previous value
- task.max_chunksize = ni / 4 # 4 chunks
- rc = p.add_task(task)
-
- # must read a specific item count
- # count is still at ni / 2 - here we want more than that
- # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2
- nri = ni / 2 + 2
- print "read(%i) chunksize set" % nri
- items = rc.read(nri)
- assert len(items) == nri
- # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing
- # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing
- nri = ni / 2 - 2
- print "read(%i) chunksize set" % nri
- items = rc.read(nri)
- assert len(items) == nri
-
- task._assert( 5, ni)
-
- # delete the handle first, causing the task to be removed and to be set
- # done. We check for the set-done state later. Depending on the timing,
- # The task is not yet set done when we are checking it because we were
- # scheduled in before the flag could be set.
- del(rc)
- assert task.is_done()
- assert p.num_tasks() == null_tasks # depleted
-
- # but this only hits if we want too many items, if we want less, it could
- # still do too much - hence we set the min_count to the same number to enforce
- # at least ni / 4 items to be preocessed, no matter what we request
- task = make_task()
- task.min_count = None
- task.max_chunksize = ni / 4 # match previous setup
- rc = p.add_task(task)
- st = time.time()
- print "read(1) * %i, chunksize set" % ni
- for i in range(ni):
- if async:
- assert len(rc.read(1)) == 1
- else:
- assert rc.read(1)[0] == i
- # END handle async mode
- # END pull individual items
- # too many processing counts ;)
- elapsed = time.time() - st
- print >> sys.stderr, "Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, ni/4, p.size(), elapsed, ni / elapsed)
-
- task._assert(ni, ni)
- assert p.num_tasks() == 1 + null_tasks
- assert p.remove_task(task) is p # del manually this time
- assert p.num_tasks() == null_tasks
-
- # now with we set the minimum count to reduce the number of processing counts
- task = make_task()
- task.min_count = ni / 4
- task.max_chunksize = ni / 4 # match previous setup
- rc = p.add_task(task)
- print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count)
- for i in range(ni):
- items = rc.read(1)
- assert len(items) == 1
- if not async:
- assert items[0] == i
- # END for each item
- task._assert(ni / task.min_count, ni)
- del(rc)
- assert p.num_tasks() == null_tasks
-
- # test failure
- # on failure, the processing stops and the task is finished, keeping
- # his error for later
- task = make_task()
- task.should_fail = True
- rc = p.add_task(task)
- print "read(0) with failure"
- assert len(rc.read()) == 0 # failure on first item
-
- assert isinstance(task.error(), AssertionError)
- assert task.is_done() # on error, its marked done as well
- del(rc)
- assert p.num_tasks() == null_tasks
-
- # test failure after ni / 2 items
- # This makes sure it correctly closes the channel on failure to prevent blocking
- nri = ni/2
- task = make_task(TestFailureThreadTask, fail_after=ni/2)
- rc = p.add_task(task)
- assert len(rc.read()) == nri
- assert task.is_done()
- assert isinstance(task.error(), AssertionError)
-
- print >> sys.stderr, "done with everything"
-
-
-
- def _assert_async_dependent_tasks(self, pool):
- # includes failure in center task, 'recursive' orphan cleanup
- # This will also verify that the channel-close mechanism works
- # t1 -> t2 -> t3
-
- print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size()
- null_tasks = pool.num_tasks()
- ni = 1000
- count = 3
- aic = count + 2
- make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs)
-
- ts, rcs = make_task()
- assert len(ts) == aic
- assert len(rcs) == aic
- assert pool.num_tasks() == null_tasks + len(ts)
-
- # read(0)
- #########
- st = time.time()
- items = rcs[-1].read()
- elapsed = time.time() - st
- print len(items), ni
- assert len(items) == ni
- del(rcs)
- assert pool.num_tasks() == 0 # tasks depleted, all done, no handles
- # wait a tiny moment - there could still be something unprocessed on the
- # queue, increasing the refcount
- time.sleep(0.15)
- assert sys.getrefcount(ts[-1]) == 2 # ts + call
- assert sys.getrefcount(ts[0]) == 2 # ts + call
- print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
-
-
- # read(1)
- #########
- ts, rcs = make_task()
- st = time.time()
- for i in xrange(ni):
- items = rcs[-1].read(1)
- assert len(items) == 1
- # END for each item to pull
- elapsed_single = time.time() - st
- # another read yields nothing, its empty
- assert len(rcs[-1].read()) == 0
- print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single)
-
-
- # read with min-count size
- ###########################
- # must be faster, as it will read ni / 4 chunks
- # Its enough to set one task, as it will force all others in the chain
- # to min_size as well.
- ts, rcs = make_task()
- assert pool.num_tasks() == len(ts)
- nri = ni / 4
- ts[-1].min_count = nri
- st = time.time()
- for i in xrange(ni):
- items = rcs[-1].read(1)
- assert len(items) == 1
- # END for each item to read
- elapsed_minsize = time.time() - st
- # its empty
- assert len(rcs[-1].read()) == 0
- print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize)
-
- # it should have been a bit faster at least, and most of the time it is
- # Sometimes, its not, mainly because:
- # * The test tasks lock a lot, hence they slow down the system
- # * Each read will still trigger the pool to evaluate, causing some overhead
- # even though there are enough items on the queue in that case. Keeping
- # track of the scheduled items helped there, but it caused further inacceptable
- # slowdown
- # assert elapsed_minsize < elapsed_single
-
-
- # read with failure
- ###################
- # it should recover and give at least fail_after items
- # t1 -> x -> t3
- fail_after = ni/2
- ts, rcs = make_task(fail_setup=[(0, fail_after)])
- items = rcs[-1].read()
- assert len(items) == fail_after
-
-
- # MULTI-POOL
- # If two pools are connected, this shold work as well.
- # The second one has just one more thread
- ts, rcs = make_task()
-
- # connect verifier channel as feeder of the second pool
- p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes
- assert p2.size() == 0
- p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
- assert p2ts[0] is None # we have no feeder task
- assert rcs[-1].pool_ref()() is pool # it didnt change the pool
- assert rcs[-1] is p2ts[1].reader()
- assert p2.num_tasks() == len(p2ts)-1 # first is None
-
- # reading from the last one will evaluate all pools correctly
- print "read(0) multi-pool"
- st = time.time()
- items = p2rcs[-1].read()
- elapsed = time.time() - st
- assert len(items) == ni
-
- print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed)
-
-
- # loose the handles of the second pool to allow others to go as well
- del(p2rcs); del(p2ts)
- assert p2.num_tasks() == 0
-
- # now we lost our old handles as well, and the tasks go away
- ts, rcs = make_task()
- assert pool.num_tasks() == len(ts)
-
- p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
- assert p2.num_tasks() == len(p2ts) - 1
-
- # Test multi-read(1)
- print "read(1) * %i" % ni
- reader = rcs[-1]
- st = time.time()
- for i in xrange(ni):
- items = reader.read(1)
- assert len(items) == 1
- # END for each item to get
- elapsed = time.time() - st
- del(reader) # decrement refcount
-
- print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed)
-
- # another read is empty
- assert len(rcs[-1].read()) == 0
-
- # now that both are connected, I can drop my handle to the reader
- # without affecting the task-count, but whats more important:
- # They remove their tasks correctly once we drop our references in the
- # right order
- del(p2ts)
- assert p2rcs[0] is rcs[-1]
- del(p2rcs)
- assert p2.num_tasks() == 0
- del(p2)
-
- assert pool.num_tasks() == null_tasks + len(ts)
-
-
- del(ts)
- del(rcs)
-
- assert pool.num_tasks() == null_tasks
-
-
- # ASSERTION: We already tested that one pool behaves correctly when an error
- # occours - if two pools handle their ref-counts correctly, which they
- # do if we are here, then they should handle errors happening during
- # the task processing as expected as well. Hence we can safe this here
-
-
-
- @terminate_threads
- def test_base(self):
- max_wait_attempts = 3
- sleep_time = 0.1
- for mc in range(max_wait_attempts):
- # wait for threads to die
- if len(threading.enumerate()) != 1:
- time.sleep(sleep_time)
- # END for each attempt
- assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time)
-
- p = ThreadPool()
-
- # default pools have no workers
- assert p.size() == 0
-
- # increase and decrease the size
- num_threads = len(threading.enumerate())
- for i in range(self.max_threads):
- p.set_size(i)
- assert p.size() == i
- assert len(threading.enumerate()) == num_threads + i
-
- for i in range(self.max_threads, -1, -1):
- p.set_size(i)
- assert p.size() == i
-
- assert p.size() == 0
- # threads should be killed already, but we let them a tiny amount of time
- # just to be sure
- time.sleep(0.05)
- assert len(threading.enumerate()) == num_threads
-
- # SINGLE TASK SERIAL SYNC MODE
- ##############################
- # put a few unrelated tasks that we forget about - check ref counts and cleanup
- t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None)
- urc1 = p.add_task(t1)
- urc2 = p.add_task(t2)
- assert p.num_tasks() == 2
-
- ## SINGLE TASK #################
- self._assert_single_task(p, False)
- assert p.num_tasks() == 2
- del(urc1)
- assert p.num_tasks() == 1
-
- p.remove_task(t2)
- assert p.num_tasks() == 0
- assert sys.getrefcount(t2) == 2
-
- t3 = TestChannelThreadTask(urc2, "channel", None)
- urc3 = p.add_task(t3)
- assert p.num_tasks() == 1
- del(urc3)
- assert p.num_tasks() == 0
- assert sys.getrefcount(t3) == 2
-
-
- # DEPENDENT TASKS SYNC MODE
- ###########################
- self._assert_async_dependent_tasks(p)
-
-
- # SINGLE TASK THREADED ASYNC MODE ( 1 thread )
- ##############################################
- # step one gear up - just one thread for now.
- p.set_size(1)
- assert p.size() == 1
- assert len(threading.enumerate()) == num_threads + 1
- # deleting the pool stops its threads - just to be sure ;)
- # Its not synchronized, hence we wait a moment
- del(p)
- time.sleep(0.05)
- assert len(threading.enumerate()) == num_threads
-
- p = ThreadPool(1)
- assert len(threading.enumerate()) == num_threads + 1
-
- # here we go
- self._assert_single_task(p, True)
-
-
-
- # SINGLE TASK ASYNC MODE ( 2 threads )
- ######################################
- # two threads to compete for a single task
- p.set_size(2)
- self._assert_single_task(p, True)
-
- # real stress test- should be native on every dual-core cpu with 2 hardware
- # threads per core
- p.set_size(4)
- self._assert_single_task(p, True)
-
-
- # DEPENDENT TASK ASYNC MODE
- ###########################
- self._assert_async_dependent_tasks(p)
-
- print >> sys.stderr, "Done with everything"
-
diff --git a/test/git/async/test_task.py b/test/git/async/test_task.py
deleted file mode 100644
index c6a796e9..00000000
--- a/test/git/async/test_task.py
+++ /dev/null
@@ -1,15 +0,0 @@
-"""Channel testing"""
-from test.testlib import *
-from git.async.util import *
-from git.async.task import *
-
-import time
-
-class TestTask(TestBase):
-
- max_threads = cpu_count()
-
- def test_iterator_task(self):
- # tested via test_pool
- pass
-
diff --git a/test/git/async/test_thread.py b/test/git/async/test_thread.py
deleted file mode 100644
index a08c1dc7..00000000
--- a/test/git/async/test_thread.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# -*- coding: utf-8 -*-
-""" Test thead classes and functions"""
-from test.testlib import *
-from git.async.thread import *
-from Queue import Queue
-import time
-
-class TestWorker(WorkerThread):
- def __init__(self, *args, **kwargs):
- super(TestWorker, self).__init__(*args, **kwargs)
- self.reset()
-
- def fun(self, arg):
- self.called = True
- self.arg = arg
- return True
-
- def make_assertion(self):
- assert self.called
- assert self.arg
- self.reset()
-
- def reset(self):
- self.called = False
- self.arg = None
-
-
-class TestThreads( TestCase ):
-
- @terminate_threads
- def test_worker_thread(self):
- worker = TestWorker()
- assert isinstance(worker.start(), WorkerThread)
-
- # test different method types
- standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs)
- for function in (TestWorker.fun, worker.fun, standalone_func):
- worker.inq.put((function, 1))
- time.sleep(0.01)
- worker.make_assertion()
- # END for each function type
-
- worker.stop_and_join()
-