summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/qpid/compat.py49
-rw-r--r--python/qpid/concurrency.py39
-rw-r--r--python/qpid/driver.py2
-rw-r--r--python/qpid/messaging.py4
4 files changed, 73 insertions, 21 deletions
diff --git a/python/qpid/compat.py b/python/qpid/compat.py
index e38f0dbe8c..c2b668a5e9 100644
--- a/python/qpid/compat.py
+++ b/python/qpid/compat.py
@@ -43,27 +43,47 @@ if tuple(sys.version_info[0:2]) < (2, 4):
else:
from select import select
+class BaseWaiter:
+
+ def wakeup(self):
+ self._do_write()
+
+ def wait(self, timeout=None):
+ if timeout is not None:
+ ready, _, _ = select([self], [], [], timeout)
+ else:
+ ready = True
+
+ if ready:
+ self._do_read()
+ return True
+ else:
+ return False
+
+ def reading(self):
+ return True
+
+ def readable(self):
+ self._do_read()
+
if sys.platform in ('win32', 'cygwin'):
import socket
- class SockWaiter:
+ class SockWaiter(BaseWaiter):
def __init__(self, read_sock, write_sock):
self.read_sock = read_sock
self.write_sock = write_sock
- def wakeup(self):
+ def _do_write(self):
self.write_sock.send("\0")
+ def _do_read(self):
+ self.read_sock.recv(65536)
+
def fileno(self):
return self.read_sock.fileno()
- def reading(self):
- return True
-
- def readable(self):
- self.read_sock.recv(65536)
-
def __repr__(self):
return "SockWaiter(%r, %r)" % (self.read_sock, self.write_sock)
@@ -80,24 +100,21 @@ if sys.platform in ('win32', 'cygwin'):
else:
import os
- class PipeWaiter:
+ class PipeWaiter(BaseWaiter):
def __init__(self, read_fd, write_fd):
self.read_fd = read_fd
self.write_fd = write_fd
- def wakeup(self):
+ def _do_write(self):
os.write(self.write_fd, "\0")
+ def _do_read(self):
+ os.read(self.read_fd, 65536)
+
def fileno(self):
return self.read_fd
- def reading(self):
- return True
-
- def readable(self):
- os.read(self.read_fd, 65536)
-
def __repr__(self):
return "PipeWaiter(%r, %r)" % (self.read_fd, self.write_fd)
diff --git a/python/qpid/concurrency.py b/python/qpid/concurrency.py
index 00cdb6b953..9837a3f0df 100644
--- a/python/qpid/concurrency.py
+++ b/python/qpid/concurrency.py
@@ -17,7 +17,7 @@
# under the License.
#
-import inspect, time
+import compat, inspect, time
def synchronized(meth):
args, vargs, kwargs, defs = inspect.getargspec(meth)
@@ -48,13 +48,17 @@ class Waiter(object):
start = time.time()
while not predicate():
if timeout is None:
+ # XXX: this timed wait thing is not necessary for the fast
+ # condition from this module, only for the condition impl from
+ # the threading module
+
# using the timed wait prevents keyboard interrupts from being
# blocked while waiting
self.condition.wait(3)
elif passed < timeout:
self.condition.wait(timeout - passed)
else:
- return False
+ return bool(predicate())
passed = time.time() - start
return True
@@ -63,3 +67,34 @@ class Waiter(object):
def notifyAll(self):
self.condition.notifyAll()
+
+class Condition:
+
+ def __init__(self, lock):
+ self.lock = lock
+ self.waiters = []
+ self.waiting = []
+
+ def notify(self):
+ assert self.lock._is_owned()
+ if self.waiting:
+ self.waiting[0].wakeup()
+
+ def notifyAll(self):
+ assert self.lock._is_owned()
+ for w in self.waiting:
+ w.wakeup()
+
+ def wait(self, timeout=None):
+ assert self.lock._is_owned()
+ if not self.waiters:
+ self.waiters.append(compat.selectable_waiter())
+ sw = self.waiters.pop(0)
+ self.waiting.append(sw)
+ try:
+ st = self.lock._release_save()
+ sw.wait(timeout)
+ finally:
+ self.lock._acquire_restore(st)
+ self.waiting.remove(sw)
+ self.waiters.append(sw)
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index 4f001974d4..93961e5ed0 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -755,7 +755,7 @@ class Driver:
sst.outgoing_idx -= 1
assert msg == m
sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
- payload=body, sync=True), msg_acked)
+ payload=body, sync=True), msg_acked)
def do_message_transfer(self, xfr):
sst = self.get_sst(xfr)
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 3e3c8f36cb..2fe7d33ca9 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -31,11 +31,11 @@ Areas that still need work:
"""
from codec010 import StringCodec
-from concurrency import synchronized, Waiter
+from concurrency import synchronized, Waiter, Condition
from datatypes import timestamp, uuid4, Serial
from logging import getLogger
from ops import PRIMITIVE
-from threading import Thread, RLock, Condition
+from threading import Thread, RLock
from util import default
log = getLogger("qpid.messaging")