summaryrefslogtreecommitdiff
path: root/python/qpid/messaging.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-09-02 15:35:42 +0000
committerRafael H. Schloming <rhs@apache.org>2009-09-02 15:35:42 +0000
commit9c0a76ef0159743bb58d288c89a7e2faf5be11be (patch)
tree5ac5d688dfbc6ee67deb5296f3e3272eef07e14a /python/qpid/messaging.py
parente6b63285996dc65cb214ab94f55676f48f80188b (diff)
downloadqpid-python-9c0a76ef0159743bb58d288c89a7e2faf5be11be.tar.gz
changed Lockable -> Waiter and switched its usage from has-a to is-a; also fixed some more imports
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@810573 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r--python/qpid/messaging.py36
1 files changed, 20 insertions, 16 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 3c12641bca..a3f2d43ed2 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -31,8 +31,8 @@ Areas that still need work:
"""
from codec010 import StringCodec
+from concurrency import synchronized, Waiter
from datatypes import timestamp, uuid4, Serial
-from lockable import synchronized, Lockable
from logging import getLogger
from ops import PRIMITIVE
from threading import Thread, RLock, Condition
@@ -69,7 +69,7 @@ class ConnectError(ConnectionError):
"""
pass
-class Connection(Lockable):
+class Connection:
"""
A Connection manages a group of L{Sessions<Session>} and connects
@@ -114,19 +114,23 @@ class Connection(Lockable):
self._connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
+ self._waiter = Waiter(self._condition)
self._modcount = Serial(0)
self.error = None
from driver import Driver
self._driver = Driver(self)
self._driver.start()
+ def _wait(self, predicate, timeout=None):
+ return self._waiter.wait(predicate, timeout=timeout)
+
def _wakeup(self):
self._modcount += 1
self._driver.wakeup()
def _catchup(self, exc=ConnectionError):
mc = self._modcount
- self.wait(lambda: not self._driver._modcount < mc)
+ self._wait(lambda: not self._driver._modcount < mc)
self._check_error(exc)
def _check_error(self, exc=ConnectionError):
@@ -134,7 +138,7 @@ class Connection(Lockable):
raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=ConnectionError):
- result = self.wait(lambda: self.error or predicate(), timeout)
+ result = self._wait(lambda: self.error or predicate(), timeout)
self._check_error(exc)
return result
@@ -255,7 +259,7 @@ class NontransactionalSession(SessionError):
class TransactionAborted(SessionError):
pass
-class Session(Lockable):
+class Session:
"""
Sessions provide a linear context for sending and receiving
@@ -287,7 +291,6 @@ class Session(Lockable):
self.closed = False
self._lock = connection._lock
- self._condition = connection._condition
self.thread = Thread(target = self.run)
self.thread.setDaemon(True)
self.thread.start()
@@ -295,6 +298,9 @@ class Session(Lockable):
def __repr__(self):
return "<Session %s>" % self.name
+ def _wait(self, predicate, timeout=None):
+ return self.connection._wait(predicate, timeout=timeout)
+
def _wakeup(self):
self.connection._wakeup()
@@ -369,8 +375,8 @@ class Session(Lockable):
@synchronized
def _get(self, predicate, timeout=None):
- if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing),
- timeout):
+ if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing),
+ timeout):
msg = self._pop(predicate)
if msg is not None:
msg._receiver.returned += 1
@@ -454,7 +460,7 @@ class Session(Lockable):
for rcv in self.receivers:
rcv.stop()
# TODO: think about stopping individual receivers in listen mode
- self.wait(lambda: self._peek(self._pred) is None)
+ self._wait(lambda: self._peek(self._pred) is None)
self.started = False
def _pred(self, m):
@@ -470,10 +476,10 @@ class Session(Lockable):
else:
msg._receiver.listener(msg)
if self._peek(self._pred) is None:
- self.notifyAll()
+ self.connection._waiter.notifyAll()
finally:
self.closed = True
- self.notifyAll()
+ self.connection._waiter.notifyAll()
@synchronized
def close(self):
@@ -486,7 +492,7 @@ class Session(Lockable):
self.closing = True
self._wakeup()
self._catchup()
- self.wait(lambda: self.closed)
+ self._wait(lambda: self.closed)
while self.thread.isAlive():
self.thread.join(3)
self.thread = None
@@ -500,7 +506,7 @@ class SendError(SessionError):
class InsufficientCapacity(SendError):
pass
-class Sender(Lockable):
+class Sender:
"""
Sends outgoing messages.
@@ -515,7 +521,6 @@ class Sender(Lockable):
self.acked = Serial(0)
self.closed = False
self._lock = self.session._lock
- self._condition = self.session._condition
def _wakeup(self):
self.session._wakeup()
@@ -598,7 +603,7 @@ class Empty(ReceiveError):
"""
pass
-class Receiver(Lockable):
+class Receiver:
"""
Receives incoming messages from a remote source. Messages may be
@@ -625,7 +630,6 @@ class Receiver(Lockable):
self.closed = False
self.listener = None
self._lock = self.session._lock
- self._condition = self.session._condition
def _wakeup(self):
self.session._wakeup()