summaryrefslogtreecommitdiff
path: root/python/qpid/messaging
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-06-30 12:44:58 +0000
committerRafael H. Schloming <rhs@apache.org>2010-06-30 12:44:58 +0000
commita374dcc0c34f51a2086122d834009716eb86cd54 (patch)
treea16d9dbdc5103b0144bdb66944430d1636165537 /python/qpid/messaging
parentbd67d8d00a4efde84e19bfbedd794b8e59d6e554 (diff)
downloadqpid-python-a374dcc0c34f51a2086122d834009716eb86cd54.tar.gz
fixed concurrent close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@959289 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging')
-rw-r--r--python/qpid/messaging/endpoints.py123
-rw-r--r--python/qpid/messaging/exceptions.py9
2 files changed, 91 insertions, 41 deletions
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index 62423ca7d5..f7afc66d46 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -44,7 +44,14 @@ log = getLogger("qpid.messaging")
static = staticmethod
-class Connection:
+class Endpoint:
+
+ def _ecwait(self, predicate, timeout=None):
+ result = self._ewait(lambda: self.closed or predicate(), timeout)
+ self.check_closed()
+ return result
+
+class Connection(Endpoint):
"""
A Connection manages a group of L{Sessions<Session>} and connects
@@ -186,6 +193,11 @@ class Connection:
self.check_error()
return result
+ def check_closed(self):
+ if self.closed:
+ self._condition.gc()
+ raise ConnectionClosed()
+
@synchronized
def session(self, name=None, transactional=False):
"""
@@ -215,7 +227,7 @@ class Connection:
@synchronized
def _remove_session(self, ssn):
- del self.sessions[ssn.name]
+ self.sessions.pop(ssn.name, 0)
@synchronized
def open(self):
@@ -239,9 +251,10 @@ class Connection:
"""
Attach to the remote endpoint.
"""
- self._connected = True
- self._driver.start()
- self._wakeup()
+ if not self._connected:
+ self._connected = True
+ self._driver.start()
+ self._wakeup()
self._ewait(lambda: self._transport_connected and not self._unlinked())
def _unlinked(self):
@@ -255,13 +268,18 @@ class Connection:
"""
Detach from the remote endpoint.
"""
- self._connected = False
- self._wakeup()
+ if self._connected:
+ self._connected = False
+ self._wakeup()
+ cleanup = True
+ else:
+ cleanup = False
try:
if not self._wait(lambda: not self._transport_connected, timeout=timeout):
raise Timeout("detach timed out")
finally:
- self._driver.stop()
+ if cleanup:
+ self._driver.stop()
self._condition.gc()
@synchronized
@@ -283,7 +301,7 @@ class Connection:
self.detach(timeout=timeout)
self._open = False
-class Session:
+class Session(Endpoint):
"""
Sessions provide a linear context for sending and receiving
@@ -532,6 +550,10 @@ class Session:
self.check_error()
return result
+ def check_closed(self):
+ if self.closed:
+ raise SessionClosed()
+
@synchronized
def sender(self, target, **options):
"""
@@ -588,26 +610,27 @@ class Session:
result += 1
return result
- def _peek(self, predicate):
+ def _peek(self, receiver):
for msg in self.incoming:
- if predicate(msg):
+ if msg._receiver == receiver:
return msg
- def _pop(self, predicate):
+ def _pop(self, receiver):
i = 0
while i < len(self.incoming):
msg = self.incoming[i]
- if predicate(msg):
+ if msg._receiver == receiver:
del self.incoming[i]
return msg
else:
i += 1
@synchronized
- def _get(self, predicate, timeout=None):
- if self._ewait(lambda: ((self._peek(predicate) is not None) or self.closing),
+ def _get(self, receiver, timeout=None):
+ if self._ewait(lambda: ((self._peek(receiver) is not None) or
+ self.closing or receiver.closed),
timeout):
- msg = self._pop(predicate)
+ msg = self._pop(receiver)
if msg is not None:
msg._receiver.returned += 1
self.unacked.append(msg)
@@ -617,7 +640,7 @@ class Session:
@synchronized
def next_receiver(self, timeout=None):
- if self._ewait(lambda: self.incoming, timeout):
+ if self._ecwait(lambda: self.incoming, timeout):
return self.incoming[0]._receiver
else:
raise Empty
@@ -644,14 +667,14 @@ class Session:
# XXX: this is currently a SendError, maybe it should be a SessionError?
raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
self._wakeup()
- self._ewait(lambda: len(self.acked) < self.ack_capacity)
+ self._ecwait(lambda: len(self.acked) < self.ack_capacity)
m._disposition = disposition
self.unacked.remove(m)
self.acked.append(m)
self._wakeup()
if sync:
- self._ewait(lambda: not [m for m in messages if m in self.acked])
+ self._ecwait(lambda: not [m for m in messages if m in self.acked])
@synchronized
def commit(self):
@@ -663,7 +686,7 @@ class Session:
raise NontransactionalSession()
self.committing = True
self._wakeup()
- self._ewait(lambda: not self.committing)
+ self._ecwait(lambda: not self.committing)
if self.aborted:
raise TransactionAborted()
assert self.committed
@@ -678,7 +701,7 @@ class Session:
raise NontransactionalSession()
self.aborting = True
self._wakeup()
- self._ewait(lambda: not self.aborting)
+ self._ecwait(lambda: not self.aborting)
assert self.aborted
@synchronized
@@ -701,8 +724,10 @@ class Session:
for link in self.receivers + self.senders:
link.close(timeout=timeout)
- self.closing = True
- self._wakeup()
+ if not self.closing:
+ self.closing = True
+ self._wakeup()
+
try:
if not self._ewait(lambda: self.closed, timeout=timeout):
raise Timeout("session close timed out")
@@ -715,7 +740,7 @@ def _mangle(addr):
else:
return addr
-class Sender:
+class Sender(Endpoint):
"""
Sends outgoing messages.
@@ -758,6 +783,10 @@ class Sender:
self.check_error()
return result
+ def check_closed(self):
+ if self.closed:
+ raise LinkClosed()
+
@synchronized
def unsettled(self):
"""
@@ -799,7 +828,7 @@ class Sender:
if not self.session.connection._connected or self.session.closing:
raise Detached()
- self._ewait(lambda: self.linked)
+ self._ecwait(lambda: self.linked)
if isinstance(object, Message):
message = object
@@ -812,7 +841,7 @@ class Sender:
if self.capacity is not UNLIMITED:
if self.capacity <= 0:
raise InsufficientCapacity("capacity = %s" % self.capacity)
- if not self._ewait(self.available, timeout=timeout):
+ if not self._ecwait(self.available, timeout=timeout):
raise InsufficientCapacity("capacity = %s" % self.capacity)
# XXX: what if we send the same message to multiple senders?
@@ -849,15 +878,20 @@ class Sender:
if self.acked < self.queued:
self.sync(timeout=timeout)
- self.closing = True
- self._wakeup()
+ if not self.closing:
+ self.closing = True
+ self._wakeup()
+
try:
if not self.session._ewait(lambda: self.closed, timeout=timeout):
raise Timeout("sender close timed out")
finally:
- self.session.senders.remove(self)
+ try:
+ self.session.senders.remove(self)
+ except ValueError:
+ pass
-class Receiver(object):
+class Receiver(Endpoint, object):
"""
Receives incoming messages from a remote source. Messages may be
@@ -923,6 +957,10 @@ class Receiver(object):
self.check_error()
return result
+ def check_closed(self):
+ if self.closed:
+ raise LinkClosed()
+
@synchronized
def unsettled(self):
"""
@@ -941,9 +979,6 @@ class Receiver(object):
"""
return self.received - self.returned
- def _pred(self, msg):
- return msg._receiver == self
-
@synchronized
def fetch(self, timeout=None):
"""
@@ -955,20 +990,21 @@ class Receiver(object):
@param timeout: the time to wait for a message to be available
"""
- self._ewait(lambda: self.linked)
+ self._ecwait(lambda: self.linked)
if self._capacity == 0:
self.granted = self.returned + 1
self._wakeup()
- self._ewait(lambda: self.impending >= self.granted)
- msg = self.session._get(self._pred, timeout=timeout)
+ self._ecwait(lambda: self.impending >= self.granted)
+ msg = self.session._get(self, timeout=timeout)
if msg is None:
+ self.check_closed()
self.draining = True
self._wakeup()
- self._ewait(lambda: not self.draining)
+ self._ecwait(lambda: not self.draining)
self._grant()
self._wakeup()
- msg = self.session._get(self._pred, timeout=0)
+ msg = self.session._get(self, timeout=0)
if msg is None:
raise Empty()
elif self._capacity not in (0, UNLIMITED.value):
@@ -988,12 +1024,17 @@ class Receiver(object):
"""
Close the receiver.
"""
- self.closing = True
- self._wakeup()
+ if not self.closing:
+ self.closing = True
+ self._wakeup()
+
try:
if not self.session._ewait(lambda: self.closed, timeout=timeout):
raise Timeout("receiver close timed out")
finally:
- self.session.receivers.remove(self)
+ try:
+ self.session.receivers.remove(self)
+ except ValueError:
+ pass
__all__ = ["Connection", "Session", "Sender", "Receiver"]
diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py
index f640b6bc1a..27bc5af035 100644
--- a/python/qpid/messaging/exceptions.py
+++ b/python/qpid/messaging/exceptions.py
@@ -60,6 +60,9 @@ class VersionError(ConnectError):
class AuthenticationFailure(ConnectError):
pass
+class ConnectionClosed(ConnectionError):
+ pass
+
## Session Errors
class SessionError(MessagingError):
@@ -91,6 +94,9 @@ class UnauthorizedAccess(SessionError):
class ServerError(SessionError):
pass
+class SessionClosed(SessionError):
+ pass
+
## Link Errors
class LinkError(MessagingError):
@@ -117,6 +123,9 @@ class AssertionFailed(ResolutionError):
class NotFound(ResolutionError):
pass
+class LinkClosed(LinkError):
+ pass
+
## Sender Errors
class SenderError(LinkError):