 lib/git/async/pool.py       | 18
 lib/git/async/task.py       | 11
 lib/git/async/util.py       |  6
 test/git/async/test_pool.py | 29
 4 files changed, 54 insertions(+), 10 deletions(-)
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 3de98777..19fc9f6e 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -67,10 +67,20 @@ class RPoolChannel(RChannel):
 		# if we have count items, don't do any queue preparation - if someone
 		# depletes the queue in the meanwhile, the channel will close and
 		# we will unblock naturally
+		# PROBLEM: If there are multiple consumers of this channel, we might
+		# run out of items without being replenished == block forever in the
+		# worst case. task.min_count could have triggered to produce more ...
+		# usually per read with n items, we put n items on to the queue,
+		# so we wouldn't check this
+		# Even if we have just one consumer ( we could determine that with
+		# the reference count ), it could be that in one moment we don't yet
+		# have an item, but it's currently being produced by some worker.
+		# This is why we:
+		# * make no assumptions if there are multiple consumers
+		# *
 		have_enough = False
 		if count > 0:
-			# explicitly > count, as we want a certain safe range
-			have_enough = self._wc._queue.qsize() > count
+			have_enough = self._wc._queue.qsize() >= count
 		# END risky game
 
 		########## prepare ##############################
@@ -78,9 +88,11 @@ class RPoolChannel(RChannel):
 			self._pool._prepare_channel_read(self._task, count)
 
-		######### read data ######
+		####### read data ########
+		##########################
 		# read actual items, tasks were set up to put their output into our channel ( as well )
 		items = RChannel.read(self, count, block, timeout)
+		##########################
 
 		if self._post_cb:
 			items = self._post_cb(items)
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index b282e371..4e8aef54 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -178,6 +178,17 @@ class InputChannelTask(OutputChannelTask):
 		# make sure we don't trigger the pool if we read from a pool channel which
 		# belongs to our own pool. Channels from different pools are fine though,
 		# there we want to trigger its computation
+		# PROBLEM: if the user keeps an end, but decides to put the same end into
+		# a task of this pool, then all items might deplete without new ones being
+		# produced, causing a deadlock. Just triggering the pool would be better,
+		# but costs more, unnecessarily if there is just one consumer, which is
+		# the user.
+		# * could encode usage in the channel type, and fail if the refcount on
+		#   the read-pool channel is too high
+		# * maybe keep track of the elements that are requested or in-production
+		#   for each task, which would allow to precisely determine whether
+		#   the pool has to be triggered, and bail out early. Problem would
+		#   be the
 		if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref():
 			self._read = self._in_rc._read
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index 432d1736..85d44694 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -59,7 +59,7 @@ class SyncQueue(deque):
 class HSCondition(_Condition):
 	"""An attempt to make conditions less blocking, which gains performance
 	in return by sleeping less"""
-	delay = 0.00002		# reduces wait times, but increases overhead
+	delay = 0.00005		# reduces wait times, but increases overhead
 
 	def wait(self, timeout=None):
 		waiter = Lock()
@@ -85,7 +85,9 @@ class HSCondition(_Condition):
 				remaining = endtime - _time()
 				if remaining <= 0:
 					break
-				delay = min(delay * 2, remaining, .05)
+				# this makes 4 threads work as well as two, but of course
+				# it causes more frequent micro-sleeping
+				#delay = min(delay * 2, remaining, .05)
 				_sleep(delay)
 			# END endless loop
 			if not gotit:
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 19e86a9a..2b45727c 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -12,9 +12,13 @@ class TestThreadTaskNode(InputIteratorThreadTask):
 		super(TestThreadTaskNode, self).__init__(*args, **kwargs)
 		self.reset(self._iterator)
 		self.should_fail = False
+		self.lock = threading.Lock()		# yes, can't safely do x = x + 1 :)
+		self.plock = threading.Lock()
 
 	def do_fun(self, item):
+		self.lock.acquire()
 		self.item_count += 1
+		self.lock.release()
 		if self.should_fail:
 			raise AssertionError("I am failing just for the fun of it")
 		return item
@@ -25,14 +29,26 @@ class TestThreadTaskNode(InputIteratorThreadTask):
 		self._iterator = iterator
 
 	def process(self, count=1):
-		super(TestThreadTaskNode, self).process(count)
+		# must do it first, otherwise we might read and check results before
+		# the thread gets here :). It's a lesson!
+		self.plock.acquire()
 		self.process_count += 1
+		self.plock.release()
+		super(TestThreadTaskNode, self).process(count)
 
 	def _assert(self, pc, fc):
 		"""Assert for num process counts (pc) and num function counts (fc)
 		:return: self"""
+		self.plock.acquire()
+		if self.process_count != pc:
+			print self.process_count, pc
 		assert self.process_count == pc
+		self.plock.release()
+		self.lock.acquire()
+		if self.item_count != fc:
+			print self.item_count, fc
 		assert self.item_count == fc
+		self.lock.release()
 		assert not self.error()
 		return self
@@ -103,15 +119,17 @@ class TestThreadPool(TestBase):
 		# if we query 1 item, it will prepare ni / 2
 		task.min_count = ni / 2
 		rc = p.add_task(task)
-		assert len(rc.read(1)) == 1			# processes ni / 2
-		assert len(rc.read(1)) == 1			# processes nothing
+		items = rc.read(1)
+		assert len(items) == 1 and items[0] == 0		# processes ni / 2
+		items = rc.read(1)
+		assert len(items) == 1 and items[0] == 1		# processes nothing
 		# rest - it has ni/2 - 2 on the queue, and pulls ni-2
 		# It wants too much, so the task realizes it's done. The task
 		# doesn't care about the items in its output channel
 		items = rc.read(ni-2)
 		assert len(items) == ni - 2
 		assert p.num_tasks() == null_tasks
-		task._assert(2, ni)					# two chunks, 20 calls ( all items )
+		task._assert(2, ni)					# two chunks, ni calls
 
 		# it's already done, gives us no more
 		assert len(rc.read()) == 0
@@ -246,7 +264,8 @@ class TestThreadPool(TestBase):
 		p.set_size(2)
 		self._assert_single_task(p, True)
 
-		# kill it
+		# real stress test - should be native on every dual-core cpu with 2 hardware
+		# threads per core
 		p.set_size(4)
 		self._assert_single_task(p, True)
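The PROBLEM comment in pool.py warns that a qsize()-based check is only a hint once several consumers share one channel. Below is a rough, self-contained Python 3 sketch of that race; the names q and consume are illustrative only and not part of the pool code.

import queue
import threading

q = queue.Queue()
q.put("only item")

def consume(name):
    # advisory check: it can be stale by the time get() runs, which is the
    # check-then-read race the patch comment describes
    if q.qsize() >= 1:
        try:
            print(name, "got", q.get(timeout=0.1))
        except queue.Empty:
            print(name, "saw qsize() >= 1 but the item was already taken")

threads = [threading.Thread(target=consume, args=(n,)) for n in ("a", "b")]
for t in threads:
    t.start()
for t in threads:
    t.join()

Both workers may pass the qsize() check while only one item exists, so the loser either skips the read or times out; with a blocking get() and no producer left, it would hang, which is the deadlock scenario the comment is about.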
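The commented-out line in HSCondition.wait doubles the sleep interval on every failed poll. A minimal sketch of that backoff strategy, assuming a plain polling loop (wait_with_backoff is an invented helper, not the actual HSCondition code):

import time

def wait_with_backoff(predicate, timeout, delay=0.00005, max_delay=0.05):
    # Poll until predicate() is true or the timeout expires; the sleep interval
    # doubles each round, capped by the remaining time and max_delay.
    endtime = time.time() + timeout
    while not predicate():
        remaining = endtime - time.time()
        if remaining <= 0:
            return False
        delay = min(delay * 2, remaining, max_delay)
        time.sleep(delay)
    return True

Keeping the delay fixed instead, as the patch does by disabling the doubling, trades more frequent wake-ups for lower latency when many threads wait on the same condition.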
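The test changes guard item_count and process_count with locks because x += 1 on a shared attribute is a read-modify-write and can lose updates under threads. A small demonstration of the pattern in Python 3 (the Counter class is illustrative only):

import threading

class Counter(object):
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def bump(self):
        # acquire/release around the increment, as the test node now does
        with self._lock:
            self.value += 1

c = Counter()
threads = [threading.Thread(target=lambda: [c.bump() for _ in range(100000)])
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert c.value == 400000  # without the lock, some increments could be lost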