diff options
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r-- | python/qpid/messaging.py | 44 |
1 files changed, 40 insertions, 4 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 6df0ef742d..e9266600ff 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -559,6 +559,9 @@ def reply_to2addr(reply_to): class SendError(SessionError): pass +class InsufficientCapacity(SendError): + pass + class Sender(Lockable): """ @@ -569,6 +572,9 @@ class Sender(Lockable): self.session = session self.index = index self.target = target + self.capacity = UNLIMITED + self.queued = Serial(0) + self.acked = Serial(0) self.closed = False self._lock = self.session._lock self._condition = self.session._condition @@ -586,7 +592,16 @@ class Sender(Lockable): return self.session.ewait(predicate, timeout, exc) @synchronized - def send(self, object): + def pending(self): + """ + Returns the number of messages awaiting acknowledgment. + @rtype: int + @return: the number of unacknowledged messages + """ + return self.queued - self.acked + + @synchronized + def send(self, object, sync=True): """ Send a message. If the object passed in is of type L{unicode}, L{str}, L{list}, or L{dict}, it will automatically be wrapped in a @@ -595,6 +610,9 @@ class Sender(Lockable): @type object: unicode, str, list, dict, Message @param object: the message or content to send + + @type sync: boolean + @param sync: if true then block until the message is sent """ if not self.session.connection._connected or self.session.closing: @@ -605,12 +623,22 @@ class Sender(Lockable): else: message = Message(object) + if self.capacity is not UNLIMITED: + if self.capacity <= 0: + raise InsufficientCapacity("capacity = %s" % self.capacity) + self.ewait(lambda: self.pending() < self.capacity) + # XXX: what if we send the same message to multiple senders? message._sender = self self.session.outgoing.append(message) + self.queued += 1 + mno = self.queued self.wakeup() - self.ewait(lambda: message not in self.session.outgoing) + + if sync: + self.ewait(lambda: self.acked >= mno) + assert message not in self.session.outgoing @synchronized def close(self): @@ -675,6 +703,13 @@ class Receiver(Lockable): @synchronized def pending(self): + """ + Returns the number of messages available to be fetched by the + application. + + @rtype: int + @return: the number of available messages + """ return self.received - self.returned def _capacity(self): @@ -1203,6 +1238,7 @@ class Driver(Lockable): # XXX: really need to make this async so that we don't give up the lock _ssn.sync() # XXX: should we log the ack somehow too? + snd.acked += 1 @synchronized def _message_transfer(self, ssn, cmd): @@ -1241,5 +1277,5 @@ class Driver(Lockable): msg._transfer_id = message.id return msg -__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message", - "Empty", "timestamp", "uuid4"] +__all__ = ["Connection", "ConnectionError", "ConnectError", "Pattern", "Session", "Sender", "Receiver", "Message", + "ReceiveError", "Empty", "SendError", "InsufficientCapacity", "timestamp", "uuid4"] |