summaryrefslogtreecommitdiff
path: root/python/qpid/messaging
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-06-25 17:09:05 +0000
committerRafael H. Schloming <rhs@apache.org>2010-06-25 17:09:05 +0000
commite9920e89d298dbcc5cd01d0c79616353eb750c43 (patch)
tree6793a83cffe26ab8f15b43be2f7de77b58020f23 /python/qpid/messaging
parent6b27ee254c81d3121cba7e20368f5c2d1f0fb2c5 (diff)
downloadqpid-python-e9920e89d298dbcc5cd01d0c79616353eb750c43.tar.gz
added optional timeouts to {connection,session,sender,receiver}.close() as well as connection.detach() and {session,sender}.sync()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@958037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging')
-rw-r--r--python/qpid/messaging/driver.py2
-rw-r--r--python/qpid/messaging/endpoints.py57
-rw-r--r--python/qpid/messaging/exceptions.py5
3 files changed, 43 insertions, 21 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 76ccd54e9f..6dab24db85 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -357,6 +357,8 @@ class Driver:
def stop(self):
self._selector.unregister(self)
+ if self._transport:
+ self.st_closed()
def fileno(self):
return self._transport.fileno()
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index 58a654ef2a..30f51fe955 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -251,15 +251,18 @@ class Connection:
if not (l.linked or l.error or l.closed)]
@synchronized
- def detach(self):
+ def detach(self, timeout=None):
"""
Detach from the remote endpoint.
"""
self._connected = False
self._wakeup()
- self._wait(lambda: not self._transport_connected)
- self._driver.stop()
- self._condition.gc()
+ try:
+ if not self._wait(lambda: not self._transport_connected, timeout=timeout):
+ raise Timeout("detach timed out")
+ finally:
+ self._driver.stop()
+ self._condition.gc()
@synchronized
def attached(self):
@@ -269,15 +272,15 @@ class Connection:
return self._connected
@synchronized
- def close(self):
+ def close(self, timeout=None):
"""
Close the connection and all sessions.
"""
try:
for ssn in self.sessions.values():
- ssn.close()
+ ssn.close(timeout=timeout)
finally:
- self.detach()
+ self.detach(timeout=timeout)
self._open = False
class Session:
@@ -677,28 +680,32 @@ class Session:
assert self.aborted
@synchronized
- def sync(self):
+ def sync(self, timeout=None):
"""
Sync the session.
"""
for snd in self.senders:
- snd.sync()
- self._ewait(lambda: not self.outgoing and not self.acked)
+ snd.sync(timeout=timeout)
+ if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout):
+ raise Timeout("session sync timed out")
@synchronized
- def close(self):
+ def close(self, timeout=None):
"""
Close the session.
"""
- self.sync()
+ self.sync(timeout=timeout)
for link in self.receivers + self.senders:
- link.close()
+ link.close(timeout=timeout)
self.closing = True
self._wakeup()
- self._ewait(lambda: self.closed)
- self.connection._remove_session(self)
+ try:
+ if not self._ewait(lambda: self.closed, timeout=timeout):
+ raise Timeout("session close timed out")
+ finally:
+ self.connection._remove_session(self)
class Sender:
@@ -816,22 +823,29 @@ class Sender:
self._wakeup()
@synchronized
- def sync(self):
+ def sync(self, timeout=None):
mno = self.queued
if self.synced < mno:
self.synced = mno
self._wakeup()
- self._ewait(lambda: self.acked >= mno)
+ if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
+ raise Timeout("sender sync timed out")
@synchronized
- def close(self):
+ def close(self, timeout=None):
"""
Close the Sender.
"""
+ # avoid erroring out when closing a sender that was never
+ # established
+ if self.acked < self.queued:
+ self.sync(timeout=timeout)
+
self.closing = True
self._wakeup()
try:
- self.session._ewait(lambda: self.closed)
+ if not self.session._ewait(lambda: self.closed, timeout=timeout):
+ raise Timeout("sender close timed out")
finally:
self.session.senders.remove(self)
@@ -962,14 +976,15 @@ class Receiver(object):
self.granted = self.received + self._capacity
@synchronized
- def close(self):
+ def close(self, timeout=None):
"""
Close the receiver.
"""
self.closing = True
self._wakeup()
try:
- self.session._ewait(lambda: self.closed)
+ if not self.session._ewait(lambda: self.closed, timeout=timeout):
+ raise Timeout("receiver close timed out")
finally:
self.session.receivers.remove(self)
diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py
index 0a4941a40f..f640b6bc1a 100644
--- a/python/qpid/messaging/exceptions.py
+++ b/python/qpid/messaging/exceptions.py
@@ -17,6 +17,11 @@
# under the License.
#
+class Timeout(Exception):
+ pass
+
+## Messaging Errors
+
class MessagingError(Exception):
def __init__(self, code=None, text=None, **info):