summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-09-02 15:35:42 +0000
committerRafael H. Schloming <rhs@apache.org>2009-09-02 15:35:42 +0000
commit9c0a76ef0159743bb58d288c89a7e2faf5be11be (patch)
tree5ac5d688dfbc6ee67deb5296f3e3272eef07e14a /python
parente6b63285996dc65cb214ab94f55676f48f80188b (diff)
downloadqpid-python-9c0a76ef0159743bb58d288c89a7e2faf5be11be.tar.gz
changed Lockable -> Waiter and switched its usage from has-a to is-a; also fixed some more imports
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@810573 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/concurrency.py (renamed from python/qpid/lockable.py)21
-rw-r--r--python/qpid/driver.py13
-rw-r--r--python/qpid/messaging.py36
3 files changed, 35 insertions, 35 deletions
diff --git a/python/qpid/lockable.py b/python/qpid/concurrency.py
index 0415d53e27..00cdb6b953 100644
--- a/python/qpid/lockable.py
+++ b/python/qpid/concurrency.py
@@ -26,11 +26,11 @@ def synchronized(meth):
exec """
def %s%s:
%s
- %s.lock()
+ %s._lock.acquire()
try:
return meth%s
finally:
- %s.unlock()
+ %s._lock.release()
""" % (meth.__name__, inspect.formatargspec(args, vargs, kwargs, defs),
repr(inspect.getdoc(meth)), args[0],
inspect.formatargspec(args, vargs, kwargs, defs,
@@ -38,13 +38,10 @@ def %s%s:
args[0]) in scope
return scope[meth.__name__]
-class Lockable(object):
+class Waiter(object):
- def lock(self):
- self._lock.acquire()
-
- def unlock(self):
- self._lock.release()
+ def __init__(self, condition):
+ self.condition = condition
def wait(self, predicate, timeout=None):
passed = 0
@@ -53,16 +50,16 @@ class Lockable(object):
if timeout is None:
# using the timed wait prevents keyboard interrupts from being
# blocked while waiting
- self._condition.wait(3)
+ self.condition.wait(3)
elif passed < timeout:
- self._condition.wait(timeout - passed)
+ self.condition.wait(timeout - passed)
else:
return False
passed = time.time() - start
return True
def notify(self):
- self._condition.notify()
+ self.condition.notify()
def notifyAll(self):
- self._condition.notifyAll()
+ self.condition.notifyAll()
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index 2f56a6008e..a759588572 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -18,11 +18,11 @@
#
import compat, connection, socket, sys, time
+from concurrency import synchronized
from datatypes import RangedSet, Message as Message010
from exceptions import Timeout
-from lockable import synchronized, Lockable
from logging import getLogger
-from messaging import get_codec, Message, Pattern, UNLIMITED
+from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED
from ops import delivery_mode
from session import Client, INCOMPLETE, SessionDetached
from threading import Condition, Thread
@@ -63,12 +63,11 @@ def delegate(handler, session):
return handler._message_transfer(session, cmd)
return Delegate
-class Driver(Lockable):
+class Driver:
def __init__(self, connection):
self.connection = connection
self._lock = self.connection._lock
- self._condition = self.connection._condition
self._wakeup_cond = Condition()
self._socket = None
self._conn = None
@@ -134,7 +133,7 @@ class Driver(Lockable):
self.connection.error = (msg,)
self._modcount = modcount
- self.notifyAll()
+ self.connection._waiter.notifyAll()
def connect(self):
if self._conn is not None:
@@ -177,7 +176,7 @@ class Driver(Lockable):
_ssn.auto_sync = False
_ssn.invoke_lock = self._lock
_ssn.lock = self._lock
- _ssn.condition = self._condition
+ _ssn.condition = self.connection._condition
if ssn.transactional:
# XXX: adding an attribute to qpid.session.Session
_ssn.acked = []
@@ -422,7 +421,7 @@ class Driver(Lockable):
rcv.received += 1
log.debug("RECV [%s] %s", ssn, msg)
ssn.incoming.append(msg)
- self.notifyAll()
+ self.connection._waiter.notifyAll()
return INCOMPLETE
def _decode(self, message):
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 3c12641bca..a3f2d43ed2 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -31,8 +31,8 @@ Areas that still need work:
"""
from codec010 import StringCodec
+from concurrency import synchronized, Waiter
from datatypes import timestamp, uuid4, Serial
-from lockable import synchronized, Lockable
from logging import getLogger
from ops import PRIMITIVE
from threading import Thread, RLock, Condition
@@ -69,7 +69,7 @@ class ConnectError(ConnectionError):
"""
pass
-class Connection(Lockable):
+class Connection:
"""
A Connection manages a group of L{Sessions<Session>} and connects
@@ -114,19 +114,23 @@ class Connection(Lockable):
self._connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
+ self._waiter = Waiter(self._condition)
self._modcount = Serial(0)
self.error = None
from driver import Driver
self._driver = Driver(self)
self._driver.start()
+ def _wait(self, predicate, timeout=None):
+ return self._waiter.wait(predicate, timeout=timeout)
+
def _wakeup(self):
self._modcount += 1
self._driver.wakeup()
def _catchup(self, exc=ConnectionError):
mc = self._modcount
- self.wait(lambda: not self._driver._modcount < mc)
+ self._wait(lambda: not self._driver._modcount < mc)
self._check_error(exc)
def _check_error(self, exc=ConnectionError):
@@ -134,7 +138,7 @@ class Connection(Lockable):
raise exc(*self.error)
def _ewait(self, predicate, timeout=None, exc=ConnectionError):
- result = self.wait(lambda: self.error or predicate(), timeout)
+ result = self._wait(lambda: self.error or predicate(), timeout)
self._check_error(exc)
return result
@@ -255,7 +259,7 @@ class NontransactionalSession(SessionError):
class TransactionAborted(SessionError):
pass
-class Session(Lockable):
+class Session:
"""
Sessions provide a linear context for sending and receiving
@@ -287,7 +291,6 @@ class Session(Lockable):
self.closed = False
self._lock = connection._lock
- self._condition = connection._condition
self.thread = Thread(target = self.run)
self.thread.setDaemon(True)
self.thread.start()
@@ -295,6 +298,9 @@ class Session(Lockable):
def __repr__(self):
return "<Session %s>" % self.name
+ def _wait(self, predicate, timeout=None):
+ return self.connection._wait(predicate, timeout=timeout)
+
def _wakeup(self):
self.connection._wakeup()
@@ -369,8 +375,8 @@ class Session(Lockable):
@synchronized
def _get(self, predicate, timeout=None):
- if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing),
- timeout):
+ if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing),
+ timeout):
msg = self._pop(predicate)
if msg is not None:
msg._receiver.returned += 1
@@ -454,7 +460,7 @@ class Session(Lockable):
for rcv in self.receivers:
rcv.stop()
# TODO: think about stopping individual receivers in listen mode
- self.wait(lambda: self._peek(self._pred) is None)
+ self._wait(lambda: self._peek(self._pred) is None)
self.started = False
def _pred(self, m):
@@ -470,10 +476,10 @@ class Session(Lockable):
else:
msg._receiver.listener(msg)
if self._peek(self._pred) is None:
- self.notifyAll()
+ self.connection._waiter.notifyAll()
finally:
self.closed = True
- self.notifyAll()
+ self.connection._waiter.notifyAll()
@synchronized
def close(self):
@@ -486,7 +492,7 @@ class Session(Lockable):
self.closing = True
self._wakeup()
self._catchup()
- self.wait(lambda: self.closed)
+ self._wait(lambda: self.closed)
while self.thread.isAlive():
self.thread.join(3)
self.thread = None
@@ -500,7 +506,7 @@ class SendError(SessionError):
class InsufficientCapacity(SendError):
pass
-class Sender(Lockable):
+class Sender:
"""
Sends outgoing messages.
@@ -515,7 +521,6 @@ class Sender(Lockable):
self.acked = Serial(0)
self.closed = False
self._lock = self.session._lock
- self._condition = self.session._condition
def _wakeup(self):
self.session._wakeup()
@@ -598,7 +603,7 @@ class Empty(ReceiveError):
"""
pass
-class Receiver(Lockable):
+class Receiver:
"""
Receives incoming messages from a remote source. Messages may be
@@ -625,7 +630,6 @@ class Receiver(Lockable):
self.closed = False
self.listener = None
self._lock = self.session._lock
- self._condition = self.session._condition
def _wakeup(self):
self.session._wakeup()