diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-06-30 12:44:58 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-06-30 12:44:58 +0000 |
commit | a374dcc0c34f51a2086122d834009716eb86cd54 (patch) | |
tree | a16d9dbdc5103b0144bdb66944430d1636165537 /python/qpid/messaging/endpoints.py | |
parent | bd67d8d00a4efde84e19bfbedd794b8e59d6e554 (diff) | |
download | qpid-python-a374dcc0c34f51a2086122d834009716eb86cd54.tar.gz |
fixed concurrent close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@959289 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging/endpoints.py')
-rw-r--r-- | python/qpid/messaging/endpoints.py | 123 |
1 files changed, 82 insertions, 41 deletions
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index 62423ca7d5..f7afc66d46 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -44,7 +44,14 @@ log = getLogger("qpid.messaging") static = staticmethod -class Connection: +class Endpoint: + + def _ecwait(self, predicate, timeout=None): + result = self._ewait(lambda: self.closed or predicate(), timeout) + self.check_closed() + return result + +class Connection(Endpoint): """ A Connection manages a group of L{Sessions<Session>} and connects @@ -186,6 +193,11 @@ class Connection: self.check_error() return result + def check_closed(self): + if self.closed: + self._condition.gc() + raise ConnectionClosed() + @synchronized def session(self, name=None, transactional=False): """ @@ -215,7 +227,7 @@ class Connection: @synchronized def _remove_session(self, ssn): - del self.sessions[ssn.name] + self.sessions.pop(ssn.name, 0) @synchronized def open(self): @@ -239,9 +251,10 @@ class Connection: """ Attach to the remote endpoint. """ - self._connected = True - self._driver.start() - self._wakeup() + if not self._connected: + self._connected = True + self._driver.start() + self._wakeup() self._ewait(lambda: self._transport_connected and not self._unlinked()) def _unlinked(self): @@ -255,13 +268,18 @@ class Connection: """ Detach from the remote endpoint. """ - self._connected = False - self._wakeup() + if self._connected: + self._connected = False + self._wakeup() + cleanup = True + else: + cleanup = False try: if not self._wait(lambda: not self._transport_connected, timeout=timeout): raise Timeout("detach timed out") finally: - self._driver.stop() + if cleanup: + self._driver.stop() self._condition.gc() @synchronized @@ -283,7 +301,7 @@ class Connection: self.detach(timeout=timeout) self._open = False -class Session: +class Session(Endpoint): """ Sessions provide a linear context for sending and receiving @@ -532,6 +550,10 @@ class Session: self.check_error() return result + def check_closed(self): + if self.closed: + raise SessionClosed() + @synchronized def sender(self, target, **options): """ @@ -588,26 +610,27 @@ class Session: result += 1 return result - def _peek(self, predicate): + def _peek(self, receiver): for msg in self.incoming: - if predicate(msg): + if msg._receiver == receiver: return msg - def _pop(self, predicate): + def _pop(self, receiver): i = 0 while i < len(self.incoming): msg = self.incoming[i] - if predicate(msg): + if msg._receiver == receiver: del self.incoming[i] return msg else: i += 1 @synchronized - def _get(self, predicate, timeout=None): - if self._ewait(lambda: ((self._peek(predicate) is not None) or self.closing), + def _get(self, receiver, timeout=None): + if self._ewait(lambda: ((self._peek(receiver) is not None) or + self.closing or receiver.closed), timeout): - msg = self._pop(predicate) + msg = self._pop(receiver) if msg is not None: msg._receiver.returned += 1 self.unacked.append(msg) @@ -617,7 +640,7 @@ class Session: @synchronized def next_receiver(self, timeout=None): - if self._ewait(lambda: self.incoming, timeout): + if self._ecwait(lambda: self.incoming, timeout): return self.incoming[0]._receiver else: raise Empty @@ -644,14 +667,14 @@ class Session: # XXX: this is currently a SendError, maybe it should be a SessionError? raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) self._wakeup() - self._ewait(lambda: len(self.acked) < self.ack_capacity) + self._ecwait(lambda: len(self.acked) < self.ack_capacity) m._disposition = disposition self.unacked.remove(m) self.acked.append(m) self._wakeup() if sync: - self._ewait(lambda: not [m for m in messages if m in self.acked]) + self._ecwait(lambda: not [m for m in messages if m in self.acked]) @synchronized def commit(self): @@ -663,7 +686,7 @@ class Session: raise NontransactionalSession() self.committing = True self._wakeup() - self._ewait(lambda: not self.committing) + self._ecwait(lambda: not self.committing) if self.aborted: raise TransactionAborted() assert self.committed @@ -678,7 +701,7 @@ class Session: raise NontransactionalSession() self.aborting = True self._wakeup() - self._ewait(lambda: not self.aborting) + self._ecwait(lambda: not self.aborting) assert self.aborted @synchronized @@ -701,8 +724,10 @@ class Session: for link in self.receivers + self.senders: link.close(timeout=timeout) - self.closing = True - self._wakeup() + if not self.closing: + self.closing = True + self._wakeup() + try: if not self._ewait(lambda: self.closed, timeout=timeout): raise Timeout("session close timed out") @@ -715,7 +740,7 @@ def _mangle(addr): else: return addr -class Sender: +class Sender(Endpoint): """ Sends outgoing messages. @@ -758,6 +783,10 @@ class Sender: self.check_error() return result + def check_closed(self): + if self.closed: + raise LinkClosed() + @synchronized def unsettled(self): """ @@ -799,7 +828,7 @@ class Sender: if not self.session.connection._connected or self.session.closing: raise Detached() - self._ewait(lambda: self.linked) + self._ecwait(lambda: self.linked) if isinstance(object, Message): message = object @@ -812,7 +841,7 @@ class Sender: if self.capacity is not UNLIMITED: if self.capacity <= 0: raise InsufficientCapacity("capacity = %s" % self.capacity) - if not self._ewait(self.available, timeout=timeout): + if not self._ecwait(self.available, timeout=timeout): raise InsufficientCapacity("capacity = %s" % self.capacity) # XXX: what if we send the same message to multiple senders? @@ -849,15 +878,20 @@ class Sender: if self.acked < self.queued: self.sync(timeout=timeout) - self.closing = True - self._wakeup() + if not self.closing: + self.closing = True + self._wakeup() + try: if not self.session._ewait(lambda: self.closed, timeout=timeout): raise Timeout("sender close timed out") finally: - self.session.senders.remove(self) + try: + self.session.senders.remove(self) + except ValueError: + pass -class Receiver(object): +class Receiver(Endpoint, object): """ Receives incoming messages from a remote source. Messages may be @@ -923,6 +957,10 @@ class Receiver(object): self.check_error() return result + def check_closed(self): + if self.closed: + raise LinkClosed() + @synchronized def unsettled(self): """ @@ -941,9 +979,6 @@ class Receiver(object): """ return self.received - self.returned - def _pred(self, msg): - return msg._receiver == self - @synchronized def fetch(self, timeout=None): """ @@ -955,20 +990,21 @@ class Receiver(object): @param timeout: the time to wait for a message to be available """ - self._ewait(lambda: self.linked) + self._ecwait(lambda: self.linked) if self._capacity == 0: self.granted = self.returned + 1 self._wakeup() - self._ewait(lambda: self.impending >= self.granted) - msg = self.session._get(self._pred, timeout=timeout) + self._ecwait(lambda: self.impending >= self.granted) + msg = self.session._get(self, timeout=timeout) if msg is None: + self.check_closed() self.draining = True self._wakeup() - self._ewait(lambda: not self.draining) + self._ecwait(lambda: not self.draining) self._grant() self._wakeup() - msg = self.session._get(self._pred, timeout=0) + msg = self.session._get(self, timeout=0) if msg is None: raise Empty() elif self._capacity not in (0, UNLIMITED.value): @@ -988,12 +1024,17 @@ class Receiver(object): """ Close the receiver. """ - self.closing = True - self._wakeup() + if not self.closing: + self.closing = True + self._wakeup() + try: if not self.session._ewait(lambda: self.closed, timeout=timeout): raise Timeout("receiver close timed out") finally: - self.session.receivers.remove(self) + try: + self.session.receivers.remove(self) + except ValueError: + pass __all__ = ["Connection", "Session", "Sender", "Receiver"] |