diff options
-rw-r--r-- | python/Makefile | 2 | ||||
-rw-r--r-- | python/qpid/messaging.py | 44 | ||||
-rw-r--r-- | python/qpid/tests/messaging.py | 33 |
3 files changed, 71 insertions, 8 deletions
diff --git a/python/Makefile b/python/Makefile index eec4076722..31547c8f57 100644 --- a/python/Makefile +++ b/python/Makefile @@ -50,7 +50,7 @@ build: $(TARGETS) doc: @mkdir -p $(BUILD) - epydoc qpid/messaging.py -o $(BUILD)/doc --no-private --no-sourcecode --include-log + epydoc qpid.messaging -o $(BUILD)/doc --no-private --no-sourcecode --include-log install: build install -d $(PYTHON_LIB) diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 6df0ef742d..e9266600ff 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -559,6 +559,9 @@ def reply_to2addr(reply_to): class SendError(SessionError): pass +class InsufficientCapacity(SendError): + pass + class Sender(Lockable): """ @@ -569,6 +572,9 @@ class Sender(Lockable): self.session = session self.index = index self.target = target + self.capacity = UNLIMITED + self.queued = Serial(0) + self.acked = Serial(0) self.closed = False self._lock = self.session._lock self._condition = self.session._condition @@ -586,7 +592,16 @@ class Sender(Lockable): return self.session.ewait(predicate, timeout, exc) @synchronized - def send(self, object): + def pending(self): + """ + Returns the number of messages awaiting acknowledgment. + @rtype: int + @return: the number of unacknowledged messages + """ + return self.queued - self.acked + + @synchronized + def send(self, object, sync=True): """ Send a message. If the object passed in is of type L{unicode}, L{str}, L{list}, or L{dict}, it will automatically be wrapped in a @@ -595,6 +610,9 @@ class Sender(Lockable): @type object: unicode, str, list, dict, Message @param object: the message or content to send + + @type sync: boolean + @param sync: if true then block until the message is sent """ if not self.session.connection._connected or self.session.closing: @@ -605,12 +623,22 @@ class Sender(Lockable): else: message = Message(object) + if self.capacity is not UNLIMITED: + if self.capacity <= 0: + raise InsufficientCapacity("capacity = %s" % self.capacity) + self.ewait(lambda: self.pending() < self.capacity) + # XXX: what if we send the same message to multiple senders? message._sender = self self.session.outgoing.append(message) + self.queued += 1 + mno = self.queued self.wakeup() - self.ewait(lambda: message not in self.session.outgoing) + + if sync: + self.ewait(lambda: self.acked >= mno) + assert message not in self.session.outgoing @synchronized def close(self): @@ -675,6 +703,13 @@ class Receiver(Lockable): @synchronized def pending(self): + """ + Returns the number of messages available to be fetched by the + application. + + @rtype: int + @return: the number of available messages + """ return self.received - self.returned def _capacity(self): @@ -1203,6 +1238,7 @@ class Driver(Lockable): # XXX: really need to make this async so that we don't give up the lock _ssn.sync() # XXX: should we log the ack somehow too? + snd.acked += 1 @synchronized def _message_transfer(self, ssn, cmd): @@ -1241,5 +1277,5 @@ class Driver(Lockable): msg._transfer_id = message.id return msg -__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message", - "Empty", "timestamp", "uuid4"] +__all__ = ["Connection", "ConnectionError", "ConnectError", "Pattern", "Session", "Sender", "Receiver", "Message", + "ReceiveError", "Empty", "SendError", "InsufficientCapacity", "timestamp", "uuid4"] diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py index 6062895519..0058f6199e 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging.py @@ -23,7 +23,8 @@ import time from qpid.tests import Test from qpid.harness import Skipped -from qpid.messaging import Connection, ConnectError, Disconnected, Empty, Message, UNLIMITED, uuid4 +from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \ + InsufficientCapacity, Message, UNLIMITED, uuid4 from Queue import Queue, Empty as QueueEmpty class Base(Test): @@ -71,11 +72,11 @@ class Base(Test): ssn.acknowledge() assert msg.content == content, "expected %r, got %r" % (content, msg.content) - def drain(self, rcv, limit=None): + def drain(self, rcv, limit=None, timeout=0): contents = [] try: while limit is None or len(contents) < limit: - contents.append(rcv.fetch(0).content) + contents.append(rcv.fetch(timeout=timeout).content) except Empty: pass return contents @@ -543,6 +544,32 @@ class SenderTests(Base): def testSendMap(self): self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14}) + def asyncTest(self, capacity): + self.snd.capacity = capacity + msgs = [self.content("asyncTest", i) for i in range(15)] + for m in msgs: + self.snd.send(m, sync=False) + drained = self.drain(self.rcv, timeout=self.delay()) + assert msgs == drained, "expected %s, got %s" % (msgs, drained) + self.ssn.acknowledge() + + def testSendAsyncCapacity0(self): + try: + self.asyncTest(0) + assert False, "send shouldn't succeed with zero capacity" + except InsufficientCapacity: + # this is expected + pass + + def testSendAsyncCapacity1(self): + self.asyncTest(1) + + def testSendAsyncCapacity5(self): + self.asyncTest(5) + + def testSendAsyncCapacityUNLIMITED(self): + self.asyncTest(UNLIMITED) + class MessageTests(Base): def testCreateString(self): |