summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-09-03 18:22:10 +0000
committerRafael H. Schloming <rhs@apache.org>2009-09-03 18:22:10 +0000
commit82fd43ffbe33ed0e368ea70ccbb6963994a1f5ba (patch)
tree25c08df2373dc5078bae760cb43ca2c0883e2a69 /python
parent22008b3dfae9f69f3a26df68da7852c01933111e (diff)
downloadqpid-python-82fd43ffbe33ed0e368ea70ccbb6963994a1f5ba.tar.gz
added timeout option to send
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@811066 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/messaging.py14
-rw-r--r--python/qpid/tests/messaging.py16
2 files changed, 27 insertions, 3 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index a3f2d43ed2..0d05f5cc1c 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -544,18 +544,25 @@ class Sender:
return self.queued - self.acked
@synchronized
- def send(self, object, sync=True):
+ def send(self, object, sync=True, timeout=None):
"""
Send a message. If the object passed in is of type L{unicode},
L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
L{Message} and sent. If it is of type L{Message}, it will be sent
- directly.
+ directly. If the sender capacity is not L{UNLIMITED} then send
+ will block until there is available capacity to send the message.
+ If the timeout parameter is specified, then send will throw an
+ L{InsufficientCapacity} exception if capacity does not become
+ available within the specified time.
@type object: unicode, str, list, dict, Message
@param object: the message or content to send
@type sync: boolean
@param sync: if true then block until the message is sent
+
+ @type timeout: float
+ @param timeout: the time to wait for available capacity
"""
if not self.session.connection._connected or self.session.closing:
@@ -569,7 +576,8 @@ class Sender:
if self.capacity is not UNLIMITED:
if self.capacity <= 0:
raise InsufficientCapacity("capacity = %s" % self.capacity)
- self._ewait(lambda: self.pending() < self.capacity)
+ if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout):
+ raise InsufficientCapacity("capacity = %s" % self.capacity)
# XXX: what if we send the same message to multiple senders?
message._sender = self
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py
index d734bb421f..7623c1f93b 100644
--- a/python/qpid/tests/messaging.py
+++ b/python/qpid/tests/messaging.py
@@ -599,6 +599,22 @@ class SenderTests(Base):
def testSendAsyncCapacityUNLIMITED(self):
self.asyncTest(UNLIMITED)
+ def testCapacityTimeout(self):
+ self.snd.capacity = 1
+ msgs = []
+ caught = False
+ while len(msgs) < 100:
+ m = self.content("testCapacity", len(msgs))
+ try:
+ self.snd.send(m, sync=False, timeout=0)
+ msgs.append(m)
+ except InsufficientCapacity:
+ caught = True
+ break
+ self.drain(self.rcv, expected=msgs)
+ self.ssn.acknowledge()
+ assert caught, "did not exceed capacity"
+
class MessageTests(Base):
def testCreateString(self):