summaryrefslogtreecommitdiff
path: root/python/qpid/messaging.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r--python/qpid/messaging.py44
1 files changed, 40 insertions, 4 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 6df0ef742d..e9266600ff 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -559,6 +559,9 @@ def reply_to2addr(reply_to):
class SendError(SessionError):
pass
+class InsufficientCapacity(SendError):
+ pass
+
class Sender(Lockable):
"""
@@ -569,6 +572,9 @@ class Sender(Lockable):
self.session = session
self.index = index
self.target = target
+ self.capacity = UNLIMITED
+ self.queued = Serial(0)
+ self.acked = Serial(0)
self.closed = False
self._lock = self.session._lock
self._condition = self.session._condition
@@ -586,7 +592,16 @@ class Sender(Lockable):
return self.session.ewait(predicate, timeout, exc)
@synchronized
- def send(self, object):
+ def pending(self):
+ """
+ Returns the number of messages awaiting acknowledgment.
+ @rtype: int
+ @return: the number of unacknowledged messages
+ """
+ return self.queued - self.acked
+
+ @synchronized
+ def send(self, object, sync=True):
"""
Send a message. If the object passed in is of type L{unicode},
L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
@@ -595,6 +610,9 @@ class Sender(Lockable):
@type object: unicode, str, list, dict, Message
@param object: the message or content to send
+
+ @type sync: boolean
+ @param sync: if true then block until the message is sent
"""
if not self.session.connection._connected or self.session.closing:
@@ -605,12 +623,22 @@ class Sender(Lockable):
else:
message = Message(object)
+ if self.capacity is not UNLIMITED:
+ if self.capacity <= 0:
+ raise InsufficientCapacity("capacity = %s" % self.capacity)
+ self.ewait(lambda: self.pending() < self.capacity)
+
# XXX: what if we send the same message to multiple senders?
message._sender = self
self.session.outgoing.append(message)
+ self.queued += 1
+ mno = self.queued
self.wakeup()
- self.ewait(lambda: message not in self.session.outgoing)
+
+ if sync:
+ self.ewait(lambda: self.acked >= mno)
+ assert message not in self.session.outgoing
@synchronized
def close(self):
@@ -675,6 +703,13 @@ class Receiver(Lockable):
@synchronized
def pending(self):
+ """
+ Returns the number of messages available to be fetched by the
+ application.
+
+ @rtype: int
+ @return: the number of available messages
+ """
return self.received - self.returned
def _capacity(self):
@@ -1203,6 +1238,7 @@ class Driver(Lockable):
# XXX: really need to make this async so that we don't give up the lock
_ssn.sync()
# XXX: should we log the ack somehow too?
+ snd.acked += 1
@synchronized
def _message_transfer(self, ssn, cmd):
@@ -1241,5 +1277,5 @@ class Driver(Lockable):
msg._transfer_id = message.id
return msg
-__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message",
- "Empty", "timestamp", "uuid4"]
+__all__ = ["Connection", "ConnectionError", "ConnectError", "Pattern", "Session", "Sender", "Receiver", "Message",
+ "ReceiveError", "Empty", "SendError", "InsufficientCapacity", "timestamp", "uuid4"]