diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-09-02 15:35:42 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-09-02 15:35:42 +0000 |
commit | 9c0a76ef0159743bb58d288c89a7e2faf5be11be (patch) | |
tree | 5ac5d688dfbc6ee67deb5296f3e3272eef07e14a /python/qpid/messaging.py | |
parent | e6b63285996dc65cb214ab94f55676f48f80188b (diff) | |
download | qpid-python-9c0a76ef0159743bb58d288c89a7e2faf5be11be.tar.gz |
changed Lockable -> Waiter and switched its usage from has-a to is-a; also fixed some more imports
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@810573 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r-- | python/qpid/messaging.py | 36 |
1 files changed, 20 insertions, 16 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 3c12641bca..a3f2d43ed2 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -31,8 +31,8 @@ Areas that still need work: """ from codec010 import StringCodec +from concurrency import synchronized, Waiter from datatypes import timestamp, uuid4, Serial -from lockable import synchronized, Lockable from logging import getLogger from ops import PRIMITIVE from threading import Thread, RLock, Condition @@ -69,7 +69,7 @@ class ConnectError(ConnectionError): """ pass -class Connection(Lockable): +class Connection: """ A Connection manages a group of L{Sessions<Session>} and connects @@ -114,19 +114,23 @@ class Connection(Lockable): self._connected = False self._lock = RLock() self._condition = Condition(self._lock) + self._waiter = Waiter(self._condition) self._modcount = Serial(0) self.error = None from driver import Driver self._driver = Driver(self) self._driver.start() + def _wait(self, predicate, timeout=None): + return self._waiter.wait(predicate, timeout=timeout) + def _wakeup(self): self._modcount += 1 self._driver.wakeup() def _catchup(self, exc=ConnectionError): mc = self._modcount - self.wait(lambda: not self._driver._modcount < mc) + self._wait(lambda: not self._driver._modcount < mc) self._check_error(exc) def _check_error(self, exc=ConnectionError): @@ -134,7 +138,7 @@ class Connection(Lockable): raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=ConnectionError): - result = self.wait(lambda: self.error or predicate(), timeout) + result = self._wait(lambda: self.error or predicate(), timeout) self._check_error(exc) return result @@ -255,7 +259,7 @@ class NontransactionalSession(SessionError): class TransactionAborted(SessionError): pass -class Session(Lockable): +class Session: """ Sessions provide a linear context for sending and receiving @@ -287,7 +291,6 @@ class Session(Lockable): self.closed = False self._lock = connection._lock - self._condition = connection._condition self.thread = Thread(target = self.run) self.thread.setDaemon(True) self.thread.start() @@ -295,6 +298,9 @@ class Session(Lockable): def __repr__(self): return "<Session %s>" % self.name + def _wait(self, predicate, timeout=None): + return self.connection._wait(predicate, timeout=timeout) + def _wakeup(self): self.connection._wakeup() @@ -369,8 +375,8 @@ class Session(Lockable): @synchronized def _get(self, predicate, timeout=None): - if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing), - timeout): + if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing), + timeout): msg = self._pop(predicate) if msg is not None: msg._receiver.returned += 1 @@ -454,7 +460,7 @@ class Session(Lockable): for rcv in self.receivers: rcv.stop() # TODO: think about stopping individual receivers in listen mode - self.wait(lambda: self._peek(self._pred) is None) + self._wait(lambda: self._peek(self._pred) is None) self.started = False def _pred(self, m): @@ -470,10 +476,10 @@ class Session(Lockable): else: msg._receiver.listener(msg) if self._peek(self._pred) is None: - self.notifyAll() + self.connection._waiter.notifyAll() finally: self.closed = True - self.notifyAll() + self.connection._waiter.notifyAll() @synchronized def close(self): @@ -486,7 +492,7 @@ class Session(Lockable): self.closing = True self._wakeup() self._catchup() - self.wait(lambda: self.closed) + self._wait(lambda: self.closed) while self.thread.isAlive(): self.thread.join(3) self.thread = None @@ -500,7 +506,7 @@ class SendError(SessionError): class InsufficientCapacity(SendError): pass -class Sender(Lockable): +class Sender: """ Sends outgoing messages. @@ -515,7 +521,6 @@ class Sender(Lockable): self.acked = Serial(0) self.closed = False self._lock = self.session._lock - self._condition = self.session._condition def _wakeup(self): self.session._wakeup() @@ -598,7 +603,7 @@ class Empty(ReceiveError): """ pass -class Receiver(Lockable): +class Receiver: """ Receives incoming messages from a remote source. Messages may be @@ -625,7 +630,6 @@ class Receiver(Lockable): self.closed = False self.listener = None self._lock = self.session._lock - self._condition = self.session._condition def _wakeup(self): self.session._wakeup() |