summaryrefslogtreecommitdiff
path: root/python/qpid/concurrency.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/concurrency.py')
-rw-r--r--python/qpid/concurrency.py39
1 files changed, 37 insertions, 2 deletions
diff --git a/python/qpid/concurrency.py b/python/qpid/concurrency.py
index 00cdb6b953..9837a3f0df 100644
--- a/python/qpid/concurrency.py
+++ b/python/qpid/concurrency.py
@@ -17,7 +17,7 @@
# under the License.
#
-import inspect, time
+import compat, inspect, time
def synchronized(meth):
args, vargs, kwargs, defs = inspect.getargspec(meth)
@@ -48,13 +48,17 @@ class Waiter(object):
start = time.time()
while not predicate():
if timeout is None:
+ # XXX: this timed wait thing is not necessary for the fast
+ # condition from this module, only for the condition impl from
+ # the threading module
+
# using the timed wait prevents keyboard interrupts from being
# blocked while waiting
self.condition.wait(3)
elif passed < timeout:
self.condition.wait(timeout - passed)
else:
- return False
+ return bool(predicate())
passed = time.time() - start
return True
@@ -63,3 +67,34 @@ class Waiter(object):
def notifyAll(self):
self.condition.notifyAll()
+
+class Condition:
+
+ def __init__(self, lock):
+ self.lock = lock
+ self.waiters = []
+ self.waiting = []
+
+ def notify(self):
+ assert self.lock._is_owned()
+ if self.waiting:
+ self.waiting[0].wakeup()
+
+ def notifyAll(self):
+ assert self.lock._is_owned()
+ for w in self.waiting:
+ w.wakeup()
+
+ def wait(self, timeout=None):
+ assert self.lock._is_owned()
+ if not self.waiters:
+ self.waiters.append(compat.selectable_waiter())
+ sw = self.waiters.pop(0)
+ self.waiting.append(sw)
+ try:
+ st = self.lock._release_save()
+ sw.wait(timeout)
+ finally:
+ self.lock._acquire_restore(st)
+ self.waiting.remove(sw)
+ self.waiters.append(sw)