-rw-r--r--   lib/git/async/pool.py         18
-rw-r--r--   lib/git/async/task.py         11
-rw-r--r--   lib/git/async/util.py          6
-rw-r--r--   test/git/async/test_pool.py   29
4 files changed, 54 insertions, 10 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 3de98777..19fc9f6e 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -67,10 +67,20 @@ class RPoolChannel(RChannel):
# if we have count items, don't do any queue preparation - if someone
# depletes the queue in the meanwhile, the channel will close and
# we will unblock naturally
+ # PROBLEM: If there are multiple consumers of this channel, we might
+ # run out of items without being replenished == block forever in the
+ # worst case. task.min_count could have triggered the production of more ...
+ # usually, per read of n items, we put n items onto the queue,
+ # so we wouldn't normally hit this.
+ # Even if we have just one consumer (we could determine that with
+ # the reference count), it could be that at one moment we don't yet
+ # have an item, but it is currently being produced by some worker.
+ # This is why we:
+ # * make no assumptions if there are multiple consumers
+ # *
have_enough = False
if count > 0:
- # explicitly > count, as we want a certain safe range
- have_enough = self._wc._queue.qsize() > count
+ have_enough = self._wc._queue.qsize() >= count
# END risky game
########## prepare ##############################
@@ -78,9 +88,11 @@ class RPoolChannel(RChannel):
self._pool._prepare_channel_read(self._task, count)
- ######### read data ######
+ ####### read data ########
+ ##########################
# read actual items, tasks were set up to put their output into our channel (as well)
items = RChannel.read(self, count, block, timeout)
+ ##########################
if self._post_cb:
items = self._post_cb(items)
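
The switch to `>= count` above lets a read be satisfied as soon as the queue already holds the requested number of items, instead of demanding a surplus. A standalone sketch of that pre-check, using only the standard library (class and attribute names are illustrative, not the real RPoolChannel API):

    from Queue import Queue   # 'queue' on Python 3

    class PoolReaderSketch(object):
        """Read results from a worker queue, asking the pool for more only when needed."""
        def __init__(self, queue, prepare_fn):
            self._queue = queue          # workers put finished items here
            self._prepare = prepare_fn   # schedules production of more items

        def read(self, count):
            # qsize() >= count: the queue already holds everything requested,
            # so the comparatively expensive pool preparation can be skipped.
            # With several consumers this check is racy - another reader may
            # drain the queue first, which is the PROBLEM noted in the comment.
            have_enough = count > 0 and self._queue.qsize() >= count
            if not have_enough:
                self._prepare(count)
            return [self._queue.get() for _ in range(count)]

    # usage sketch: workers fill `results`; prepare_fn would ask the pool for more
    results = Queue()
    reader = PoolReaderSketch(results, prepare_fn=lambda n: None)
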
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index b282e371..4e8aef54 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -178,6 +178,17 @@ class InputChannelTask(OutputChannelTask):
# make sure we don't trigger the pool if we read from a pool channel which
# belongs to our own pool. Channels from different pools are fine though,
# there we want to trigger its computation
+ # PROBLEM: if the user keeps an end, but decides to put the same end into
+ # a task of this pool, then all items might deplete without new ones being
+ # produced, causing a deadlock. Just triggering the pool would be better,
+ # but costs more, unnecessarily so if there is just one consumer, which is
+ # the user.
+ # * could encode usage in the channel type, and fail if the refcount on
+ # the read-pool channel is too high
+ # * maybe keep track of the elements that are requested or in-production
+ # for each task, which would allow us to precisely determine whether
+ # the pool has to be triggered, and bail out early. Problem would
+ # be the
if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref():
self._read = self._in_rc._read
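
A compressed sketch of the decision in the last line above, assuming (as the call `self._pool_ref()` suggests) that the task keeps only a weak reference to its owning pool; the surrounding names are illustrative:

    import weakref

    class InputTaskSketch(object):
        def __init__(self, pool, in_channel):
            self._pool_ref = weakref.ref(pool)   # the task must not keep its pool alive
            self._in_rc = in_channel
            # default: the channel's public read(), which triggers its own pool
            self._read = self._in_rc.read
            if getattr(self._in_rc, '_pool', None) is self._pool_ref():
                # the input channel was handed out by our own pool: use the raw
                # read that skips pool preparation, so a worker reading its input
                # does not try to trigger the very pool it is running in
                self._read = self._in_rc._read
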
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index 432d1736..85d44694 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -59,7 +59,7 @@ class SyncQueue(deque):
class HSCondition(_Condition):
"""An attempt to make conditions less blocking, which gains performance
in return by sleeping less"""
- delay = 0.00002 # reduces wait times, but increases overhead
+ delay = 0.00005 # reduces wait times, but increases overhead
def wait(self, timeout=None):
waiter = Lock()
@@ -85,7 +85,9 @@ class HSCondition(_Condition):
remaining = endtime - _time()
if remaining <= 0:
break
- delay = min(delay * 2, remaining, .05)
+ # this makes 4 threads work as well as two, but of course
+ # it causes more frequent micro-sleeping
+ #delay = min(delay * 2, remaining, .05)
_sleep(delay)
# END endless loop
if not gotit:
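
Both the larger `delay` and the disabled backoff sit in the polling loop reduced below: a self-contained sketch of the waiting strategy (not the full HSCondition), showing where the trade-off between reaction time and micro-sleep overhead lives:

    from threading import Lock
    from time import time as _time, sleep as _sleep

    def spin_wait(waiter, timeout, delay=0.00005):
        """Return True once `waiter` (a Lock held by the waiting thread and
        released by a notifier) becomes available, False on timeout."""
        endtime = _time() + timeout
        while True:
            if waiter.acquire(False):      # non-blocking attempt
                return True
            remaining = endtime - _time()
            if remaining <= 0:
                return False
            # the backoff commented out above would grow the sleep here:
            # delay = min(delay * 2, remaining, .05)
            _sleep(delay)

A notifying thread simply calls waiter.release(); a smaller fixed delay reacts faster but spends more CPU in this loop, which is the overhead the comment warns about.
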
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 19e86a9a..2b45727c 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -12,9 +12,13 @@ class TestThreadTaskNode(InputIteratorThreadTask):
super(TestThreadTaskNode, self).__init__(*args, **kwargs)
self.reset(self._iterator)
self.should_fail = False
+ self.lock = threading.Lock() # yes, can't safely do x = x + 1 :)
+ self.plock = threading.Lock()
def do_fun(self, item):
+ self.lock.acquire()
self.item_count += 1
+ self.lock.release()
if self.should_fail:
raise AssertionError("I am failing just for the fun of it")
return item
@@ -25,14 +29,26 @@ class TestThreadTaskNode(InputIteratorThreadTask):
self._iterator = iterator
def process(self, count=1):
- super(TestThreadTaskNode, self).process(count)
+ # must do it first, otherwise we might read and check results before
+ # the thread gets here :). It's a lesson!
+ self.plock.acquire()
self.process_count += 1
+ self.plock.release()
+ super(TestThreadTaskNode, self).process(count)
def _assert(self, pc, fc):
"""Assert for num process counts (pc) and num function counts (fc)
:return: self"""
+ self.plock.acquire()
+ if self.process_count != pc:
+ print self.process_count, pc
assert self.process_count == pc
+ self.plock.release()
+ self.lock.acquire()
+ if self.item_count != fc:
+ print self.item_count, fc
assert self.item_count == fc
+ self.lock.release()
assert not self.error()
return self
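
The new `lock`/`plock` members exist because `self.item_count += 1` is a read-modify-write, so two pool workers calling do_fun() concurrently can lose updates. A minimal, self-contained demonstration of the guarded-counter pattern the test now uses:

    import threading

    class Counter(object):
        def __init__(self):
            self.value = 0
            self._lock = threading.Lock()

        def increment(self):
            self._lock.acquire()
            try:
                self.value += 1
            finally:
                self._lock.release()

    def bump(counter, n):
        for _ in range(n):
            counter.increment()

    counter = Counter()
    threads = [threading.Thread(target=bump, args=(counter, 100000)) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert counter.value == 400000   # no lost updates while the lock is held
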
@@ -103,15 +119,17 @@ class TestThreadPool(TestBase):
# if we query 1 item, it will prepare ni / 2
task.min_count = ni / 2
rc = p.add_task(task)
- assert len(rc.read(1)) == 1 # processes ni / 2
- assert len(rc.read(1)) == 1 # processes nothing
+ items = rc.read(1)
+ assert len(items) == 1 and items[0] == 0 # processes ni / 2
+ items = rc.read(1)
+ assert len(items) == 1 and items[0] == 1 # processes nothing
# rest - it has ni/2 - 2 on the queue, and pulls ni-2
# It wants too much, so the task realizes it's done. The task
# doesn't care about the items in its output channel
items = rc.read(ni-2)
assert len(items) == ni - 2
assert p.num_tasks() == null_tasks
- task._assert(2, ni) # two chunks, 20 calls ( all items )
+ task._assert(2, ni) # two chunks, ni calls
# its already done, gives us no more
assert len(rc.read()) == 0
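
A toy, single-threaded illustration of the min_count batching these assertions exercise (a hypothetical class, not the real Pool or RPoolChannel): asking for one item prepares a whole batch of min_count items, so the second read is served from the queue without another process() call, and the large final read drains the rest in one more chunk:

    class BatchingReaderSketch(object):
        def __init__(self, iterator, min_count):
            self._iter = iter(iterator)
            self._min_count = min_count
            self._ready = []            # stands in for the output queue
            self.process_calls = 0      # mirrors task.process_count

        def read(self, count):
            if len(self._ready) < count:
                self.process_calls += 1
                for _ in range(max(count, self._min_count)):
                    try:
                        self._ready.append(next(self._iter))
                    except StopIteration:
                        break
            out, self._ready = self._ready[:count], self._ready[count:]
            return out

    ni = 20
    reader = BatchingReaderSketch(range(ni), min_count=ni // 2)
    assert reader.read(1) == [0] and reader.process_calls == 1   # prepared ni/2 items
    assert reader.read(1) == [1] and reader.process_calls == 1   # served from the queue
    assert len(reader.read(ni - 2)) == ni - 2                    # one more chunk drains it
    assert reader.process_calls == 2                             # matches task._assert(2, ni)
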
@@ -246,7 +264,8 @@ class TestThreadPool(TestBase):
p.set_size(2)
self._assert_single_task(p, True)
- # kill it
+ # real stress test - should be native on every dual-core cpu with 2 hardware
+ # threads per core
p.set_size(4)
self._assert_single_task(p, True)