diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/qpid/compat.py | 49 | ||||
-rw-r--r-- | python/qpid/concurrency.py | 39 | ||||
-rw-r--r-- | python/qpid/driver.py | 2 | ||||
-rw-r--r-- | python/qpid/messaging.py | 4 |
4 files changed, 73 insertions, 21 deletions
diff --git a/python/qpid/compat.py b/python/qpid/compat.py index e38f0dbe8c..c2b668a5e9 100644 --- a/python/qpid/compat.py +++ b/python/qpid/compat.py @@ -43,27 +43,47 @@ if tuple(sys.version_info[0:2]) < (2, 4): else: from select import select +class BaseWaiter: + + def wakeup(self): + self._do_write() + + def wait(self, timeout=None): + if timeout is not None: + ready, _, _ = select([self], [], [], timeout) + else: + ready = True + + if ready: + self._do_read() + return True + else: + return False + + def reading(self): + return True + + def readable(self): + self._do_read() + if sys.platform in ('win32', 'cygwin'): import socket - class SockWaiter: + class SockWaiter(BaseWaiter): def __init__(self, read_sock, write_sock): self.read_sock = read_sock self.write_sock = write_sock - def wakeup(self): + def _do_write(self): self.write_sock.send("\0") + def _do_read(self): + self.read_sock.recv(65536) + def fileno(self): return self.read_sock.fileno() - def reading(self): - return True - - def readable(self): - self.read_sock.recv(65536) - def __repr__(self): return "SockWaiter(%r, %r)" % (self.read_sock, self.write_sock) @@ -80,24 +100,21 @@ if sys.platform in ('win32', 'cygwin'): else: import os - class PipeWaiter: + class PipeWaiter(BaseWaiter): def __init__(self, read_fd, write_fd): self.read_fd = read_fd self.write_fd = write_fd - def wakeup(self): + def _do_write(self): os.write(self.write_fd, "\0") + def _do_read(self): + os.read(self.read_fd, 65536) + def fileno(self): return self.read_fd - def reading(self): - return True - - def readable(self): - os.read(self.read_fd, 65536) - def __repr__(self): return "PipeWaiter(%r, %r)" % (self.read_fd, self.write_fd) diff --git a/python/qpid/concurrency.py b/python/qpid/concurrency.py index 00cdb6b953..9837a3f0df 100644 --- a/python/qpid/concurrency.py +++ b/python/qpid/concurrency.py @@ -17,7 +17,7 @@ # under the License. # -import inspect, time +import compat, inspect, time def synchronized(meth): args, vargs, kwargs, defs = inspect.getargspec(meth) @@ -48,13 +48,17 @@ class Waiter(object): start = time.time() while not predicate(): if timeout is None: + # XXX: this timed wait thing is not necessary for the fast + # condition from this module, only for the condition impl from + # the threading module + # using the timed wait prevents keyboard interrupts from being # blocked while waiting self.condition.wait(3) elif passed < timeout: self.condition.wait(timeout - passed) else: - return False + return bool(predicate()) passed = time.time() - start return True @@ -63,3 +67,34 @@ class Waiter(object): def notifyAll(self): self.condition.notifyAll() + +class Condition: + + def __init__(self, lock): + self.lock = lock + self.waiters = [] + self.waiting = [] + + def notify(self): + assert self.lock._is_owned() + if self.waiting: + self.waiting[0].wakeup() + + def notifyAll(self): + assert self.lock._is_owned() + for w in self.waiting: + w.wakeup() + + def wait(self, timeout=None): + assert self.lock._is_owned() + if not self.waiters: + self.waiters.append(compat.selectable_waiter()) + sw = self.waiters.pop(0) + self.waiting.append(sw) + try: + st = self.lock._release_save() + sw.wait(timeout) + finally: + self.lock._acquire_restore(st) + self.waiting.remove(sw) + self.waiters.append(sw) diff --git a/python/qpid/driver.py b/python/qpid/driver.py index 4f001974d4..93961e5ed0 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -755,7 +755,7 @@ class Driver: sst.outgoing_idx -= 1 assert msg == m sst.write_cmd(MessageTransfer(destination=_snd._exchange, headers=(dp, mp), - payload=body, sync=True), msg_acked) + payload=body, sync=True), msg_acked) def do_message_transfer(self, xfr): sst = self.get_sst(xfr) diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 3e3c8f36cb..2fe7d33ca9 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -31,11 +31,11 @@ Areas that still need work: """ from codec010 import StringCodec -from concurrency import synchronized, Waiter +from concurrency import synchronized, Waiter, Condition from datatypes import timestamp, uuid4, Serial from logging import getLogger from ops import PRIMITIVE -from threading import Thread, RLock, Condition +from threading import Thread, RLock from util import default log = getLogger("qpid.messaging") |