diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-06-25 17:09:05 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-06-25 17:09:05 +0000 |
commit | e9920e89d298dbcc5cd01d0c79616353eb750c43 (patch) | |
tree | 6793a83cffe26ab8f15b43be2f7de77b58020f23 /python/qpid/messaging | |
parent | 6b27ee254c81d3121cba7e20368f5c2d1f0fb2c5 (diff) | |
download | qpid-python-e9920e89d298dbcc5cd01d0c79616353eb750c43.tar.gz |
added optional timeouts to {connection,session,sender,receiver}.close() as well as connection.detach() and {session,sender}.sync()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@958037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging')
-rw-r--r-- | python/qpid/messaging/driver.py | 2 | ||||
-rw-r--r-- | python/qpid/messaging/endpoints.py | 57 | ||||
-rw-r--r-- | python/qpid/messaging/exceptions.py | 5 |
3 files changed, 43 insertions, 21 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 76ccd54e9f..6dab24db85 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -357,6 +357,8 @@ class Driver: def stop(self): self._selector.unregister(self) + if self._transport: + self.st_closed() def fileno(self): return self._transport.fileno() diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index 58a654ef2a..30f51fe955 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -251,15 +251,18 @@ class Connection: if not (l.linked or l.error or l.closed)] @synchronized - def detach(self): + def detach(self, timeout=None): """ Detach from the remote endpoint. """ self._connected = False self._wakeup() - self._wait(lambda: not self._transport_connected) - self._driver.stop() - self._condition.gc() + try: + if not self._wait(lambda: not self._transport_connected, timeout=timeout): + raise Timeout("detach timed out") + finally: + self._driver.stop() + self._condition.gc() @synchronized def attached(self): @@ -269,15 +272,15 @@ class Connection: return self._connected @synchronized - def close(self): + def close(self, timeout=None): """ Close the connection and all sessions. """ try: for ssn in self.sessions.values(): - ssn.close() + ssn.close(timeout=timeout) finally: - self.detach() + self.detach(timeout=timeout) self._open = False class Session: @@ -677,28 +680,32 @@ class Session: assert self.aborted @synchronized - def sync(self): + def sync(self, timeout=None): """ Sync the session. """ for snd in self.senders: - snd.sync() - self._ewait(lambda: not self.outgoing and not self.acked) + snd.sync(timeout=timeout) + if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout): + raise Timeout("session sync timed out") @synchronized - def close(self): + def close(self, timeout=None): """ Close the session. """ - self.sync() + self.sync(timeout=timeout) for link in self.receivers + self.senders: - link.close() + link.close(timeout=timeout) self.closing = True self._wakeup() - self._ewait(lambda: self.closed) - self.connection._remove_session(self) + try: + if not self._ewait(lambda: self.closed, timeout=timeout): + raise Timeout("session close timed out") + finally: + self.connection._remove_session(self) class Sender: @@ -816,22 +823,29 @@ class Sender: self._wakeup() @synchronized - def sync(self): + def sync(self, timeout=None): mno = self.queued if self.synced < mno: self.synced = mno self._wakeup() - self._ewait(lambda: self.acked >= mno) + if not self._ewait(lambda: self.acked >= mno, timeout=timeout): + raise Timeout("sender sync timed out") @synchronized - def close(self): + def close(self, timeout=None): """ Close the Sender. """ + # avoid erroring out when closing a sender that was never + # established + if self.acked < self.queued: + self.sync(timeout=timeout) + self.closing = True self._wakeup() try: - self.session._ewait(lambda: self.closed) + if not self.session._ewait(lambda: self.closed, timeout=timeout): + raise Timeout("sender close timed out") finally: self.session.senders.remove(self) @@ -962,14 +976,15 @@ class Receiver(object): self.granted = self.received + self._capacity @synchronized - def close(self): + def close(self, timeout=None): """ Close the receiver. """ self.closing = True self._wakeup() try: - self.session._ewait(lambda: self.closed) + if not self.session._ewait(lambda: self.closed, timeout=timeout): + raise Timeout("receiver close timed out") finally: self.session.receivers.remove(self) diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py index 0a4941a40f..f640b6bc1a 100644 --- a/python/qpid/messaging/exceptions.py +++ b/python/qpid/messaging/exceptions.py @@ -17,6 +17,11 @@ # under the License. # +class Timeout(Exception): + pass + +## Messaging Errors + class MessagingError(Exception): def __init__(self, code=None, text=None, **info): |