-rw-r--r--	lib/git/async/pool.py		| 26
-rw-r--r--	lib/git/async/thread.py		|  3
-rw-r--r--	test/git/async/test_pool.py	| 40
3 files changed, 54 insertions, 15 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 009096f2..26a6a182 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -107,6 +107,7 @@ class Pool(object):
 		'_consumed_tasks',		# a queue with tasks that are done or had an error
 		'_workers',			# list of worker threads
 		'_queue', 			# master queue for tasks
+		'_taskorder_cache', 	# map task id -> ordered dependent tasks
 		'_taskgraph_lock',		# lock for accessing the task graph
 		)
 
@@ -130,6 +131,7 @@
 		self._workers = list()
 		self._queue = self.TaskQueueCls()
 		self._taskgraph_lock = self.LockCls()
+		self._taskorder_cache = dict()
 		self.set_size(size)
 
 	def __del__(self):
@@ -149,10 +151,21 @@
 		Tasks which are not done will be put onto the queue for processing, which
 		is fine as we walked them depth-first."""
-		dfirst_tasks = list()
-		# for the walk, we must make sure the ordering does not change
-		# Note: the result of this could be cached
-		self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n))
+		# for the walk, we must make sure the ordering does not change. Even
+		# when accessing the cache, as it is related to graph changes
+		self._taskgraph_lock.acquire()
+		try:
+			try:
+				dfirst_tasks = self._taskorder_cache[id(task)]
+			except KeyError:
+				# have to retrieve the list from the graph
+				dfirst_tasks = list()
+				self._tasks.visit_input_inclusive_depth_first(task, lambda n: dfirst_tasks.append(n))
+				self._taskorder_cache[id(task)] = dfirst_tasks
+			# END handle cached order retrieval
+		finally:
+			self._taskgraph_lock.release()
+		# END handle locking
 		
 		# check the min count on all involved tasks, and be sure that we don't
 		# have any task which produces less than the maximum min-count of all tasks
 		
@@ -208,7 +221,8 @@
 			# the following loops are kind of unrolled - code duplication
 			# should make things execute faster. Putting the if statements
 			# into the loop would be less code, but ... slower
-			print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
+			# DEBUG
+			# print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
 			if self._workers:
 				# respect the chunk size, and split the task up if we want
 				# to process too much. This can be defined per task
@@ -332,6 +346,7 @@
 			task.set_done()
 			self._taskgraph_lock.acquire()
 			try:
+				self._taskorder_cache.clear()
 				self._tasks.del_node(task)
 			finally:
 				self._taskgraph_lock.release()
@@ -360,6 +375,7 @@
 		
 		self._taskgraph_lock.acquire()
 		try:
+			self._taskorder_cache.clear()
 			self._tasks.add_node(task)
 		finally:
 			self._taskgraph_lock.release()
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index 2ed002e9..f875f094 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -141,7 +141,8 @@ class WorkerThread(TerminatableThread):
 			# needing exactly one function, and one arg
 			assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need"
 			routine, arg = tasktuple
-			
+			# DEBUG
+			# print "%s: picked up: %s(%s)" % (self.name, routine, arg)
 			try:
 				rval = None
 				if inspect.ismethod(routine):
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 628e2a93..df3eaf11 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -40,14 +40,15 @@ class TestThreadPool(TestBase):
 	max_threads = cpu_count()
 	
-	def _assert_sync_single_task(self, p):
+	def _assert_single_task(self, p, async=False):
 		"""Performs testing in a synchronized environment"""
 		null_tasks = p.num_tasks()		# in case we had some before
 		
 		# add a simple task
 		# it iterates n items
-		ni = 20
+		ni = 1000
 		assert ni % 2 == 0, "ni needs to be dividable by 2"
+		assert ni % 4 == 0, "ni needs to be dividable by 4"
 		
 		def make_iter():
 			return iter(range(ni))
@@ -76,11 +77,18 @@
 		# pull individual items
 		rc = p.add_task(task)
 		assert p.num_tasks() == 1 + null_tasks
+		st = time.time()
 		for i in range(ni):
 			items = rc.read(1)
 			assert len(items) == 1
-			assert i == items[0]
+			
+			# can't assert order in async mode
+			if not async:
+				assert i == items[0]
 		# END for each item
+		elapsed = time.time() - st
+		print >> sys.stderr, "Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, p.size(), elapsed, ni / elapsed)
+		
 		# it couldn't yet notice that the input is depleted as we pulled exaclty
 		# ni items - the next one would remove it. Instead, we delete our channel
 		# which triggers orphan handling
@@ -113,11 +121,13 @@
 		rc = p.add_task(task)
 		# must read a specific item count
 		# count is still at ni / 2 - here we want more than that
-		assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2	# make sure its uneven ;)
+		# 2 steps with n / 4 items, + 1 step with n/4 items to get + 2
+		assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2
+		# have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing
+		# ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing
 		assert len(rc.read(ni / 2 - 2)) == ni / 2 - 2
-		# END read chunks
-		task._assert(ni / 4, ni)	# read two times, got 4 processing steps
+		task._assert( 5, ni)
 		assert p.num_tasks() == null_tasks	# depleted
 		
 		# but this only hits if we want too many items, if we want less, it could
@@ -126,10 +136,18 @@
 		task.reset(make_iter())
 		task.min_count = None
 		rc = p.add_task(task)
+		st = time.time()
 		for i in range(ni):
-			assert rc.read(1)[0] == i
+			if async:
+				assert len(rc.read(1)) == 1
+			else:
+				assert rc.read(1)[0] == i
+			# END handle async mode
 		# END pull individual items
 		# too many processing counts ;)
+		elapsed = time.time() - st
+		print >> sys.stderr, "Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, ni/4, p.size(), elapsed, ni / elapsed)
+		
 		task._assert(ni, ni)
 		assert p.num_tasks() == 1 + null_tasks
 		assert p.del_task(task) is p	# del manually this time
@@ -183,7 +201,9 @@
 		urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
 		urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
 		assert p.num_tasks() == 2
-		self._assert_sync_single_task(p)
+		
+		## SINGLE TASK #################
+		self._assert_single_task(p, False)
 		assert p.num_tasks() == 2
 		del(urc1)
 		del(urc2)
@@ -209,13 +229,15 @@
 		assert len(threading.enumerate()) == num_threads + 1
 		
 		# here we go
-		self._assert_sync_single_task(p)
+		self._assert_single_task(p, False)
 		
 		
 		# SINGLE TASK ASYNC MODE
 		########################
 		# two threads to compete for a single task
+		p.set_size(2)
+		self._assert_single_task(p, True)
 		
 		# DEPENDENT TASK ASYNC MODE
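
The pool.py hunks above all serve one pattern: the depth-first task ordering is computed once per task and memoized in _taskorder_cache, every cache access happens under _taskgraph_lock (an ordering is only valid for the graph state it was computed against), and any structural change to the graph invalidates the cache wholesale. Below is a minimal standalone sketch of that pattern, assuming only a graph object offering the visit_input_inclusive_depth_first, add_node and del_node methods seen in the diff; the TaskOrderCache name is hypothetical, not GitPython API, since in GitPython the cache lives directly on Pool:

	import threading

	class TaskOrderCache(object):
		"""Sketch of the memoized depth-first ordering from the commit above.
		Illustrates the locking discipline only; the graph API is assumed."""

		def __init__(self, graph):
			self._graph = graph             # assumed graph, see lead-in
			self._cache = dict()            # id(task) -> depth-first task list
			self._lock = threading.Lock()

		def dfirst_tasks(self, task):
			# the lock covers lookup and walk alike: a cached ordering must
			# never be handed out while another thread mutates the graph
			self._lock.acquire()
			try:
				try:
					return self._cache[id(task)]
				except KeyError:
					order = list()
					self._graph.visit_input_inclusive_depth_first(task, order.append)
					self._cache[id(task)] = order
					return order
			finally:
				self._lock.release()

		def add_node(self, task):
			# structural changes invalidate every cached ordering - a coarse
			# clear() is cheap compared to re-walking the graph on each call
			self._lock.acquire()
			try:
				self._cache.clear()
				self._graph.add_node(task)
			finally:
				self._lock.release()

		def del_node(self, task):
			self._lock.acquire()
			try:
				self._cache.clear()
				self._graph.del_node(task)
			finally:
				self._lock.release()

Keying on id(task) is only safe because an entry cannot outlive its task: add_node and del_node clear the cache, so a recycled id never resolves to a stale ordering computed for a dead object.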