diff options
Diffstat (limited to 'qpid/python/qpid/tests/messaging.py')
-rw-r--r-- | qpid/python/qpid/tests/messaging.py | 124 |
1 files changed, 102 insertions, 22 deletions
diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py index 7706ebbabe..7623c1f93b 100644 --- a/qpid/python/qpid/tests/messaging.py +++ b/qpid/python/qpid/tests/messaging.py @@ -23,7 +23,8 @@ import time from qpid.tests import Test from qpid.harness import Skipped -from qpid.messaging import Connection, ConnectError, Disconnected, Empty, Message, UNLIMITED, uuid4 +from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \ + InsufficientCapacity, Message, UNLIMITED, uuid4 from Queue import Queue, Empty as QueueEmpty class Base(Test): @@ -71,19 +72,25 @@ class Base(Test): ssn.acknowledge() assert msg.content == content, "expected %r, got %r" % (content, msg.content) - def drain(self, rcv, limit=None): + def drain(self, rcv, limit=None, timeout=0, expected=None): contents = [] try: while limit is None or len(contents) < limit: - contents.append(rcv.fetch(0).content) + contents.append(rcv.fetch(timeout=timeout).content) except Empty: pass + if expected is not None: + assert expected == contents, "expected %s, got %s" % (expected, contents) return contents def assertEmpty(self, rcv): contents = self.drain(rcv) assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents) + def assertPending(self, rcv, expected): + p = rcv.pending() + assert p == expected, "expected %s, got %s" % (expected, p) + def sleep(self): time.sleep(self.delay()) @@ -107,7 +114,8 @@ class SetupTests(Base): try: self.conn = Connection.open("localhost", 0) assert False, "connect succeeded" - except ConnectError: + except ConnectError, e: + # XXX: should verify that e includes appropriate diagnostic info pass class ConnectionTests(Base): @@ -219,26 +227,27 @@ class SessionTests(Base): # XXX, we need a convenient way to assert that required queues are # empty on setup, and possibly also to drain queues on teardown - def testAcknowledge(self): + def ackTest(self, acker, ack_capacity=None): # send a bunch of messages snd = self.ssn.sender("test-ack-queue") - tid = "a" - contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)] + contents = [self.content("ackTest", i) for i in range(15)] for c in contents: snd.send(c) # drain the queue, verify the messages are there and then close # without acking rcv = self.ssn.receiver(snd.target) - assert contents == self.drain(rcv) + self.drain(rcv, expected=contents) self.ssn.close() # drain the queue again, verify that they are all the messages # were requeued, and ack this time before closing self.ssn = self.conn.session() + if ack_capacity is not None: + self.ssn.ack_capacity = ack_capacity rcv = self.ssn.receiver("test-ack-queue") - assert contents == self.drain(rcv) - self.ssn.acknowledge() + self.drain(rcv, expected=contents) + acker(self.ssn) self.ssn.close() # drain the queue a final time and verify that the messages were @@ -247,6 +256,33 @@ class SessionTests(Base): rcv = self.ssn.receiver("test-ack-queue") self.assertEmpty(rcv) + def testAcknowledge(self): + self.ackTest(lambda ssn: ssn.acknowledge()) + + def testAcknowledgeAsync(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False)) + + def testAcknowledgeAsyncAckCap0(self): + try: + try: + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0) + assert False, "acknowledge shouldn't succeed with ack_capacity of zero" + except InsufficientCapacity: + pass + finally: + self.ssn.ack_capacity = UNLIMITED + self.drain(self.ssn.receiver("test-ack-queue")) + self.ssn.acknowledge() + + def testAcknowledgeAsyncAckCap1(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1) + + def testAcknowledgeAsyncAckCap5(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5) + + def testAcknowledgeAsyncAckCapUNLIMITED(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) + def send(self, ssn, queue, base, count=1): snd = ssn.sender(queue) contents = [] @@ -319,7 +355,8 @@ class SessionTests(Base): txssn.acknowledge() else: txssn.rollback() - assert contents == self.drain(txrcv) + drained = self.drain(txrcv) + assert contents == drained, "expected %s, got %s" % (contents, drained) txssn.acknowledge() txssn.rollback() assert contents == self.drain(txrcv) @@ -401,9 +438,9 @@ class ReceiverTests(Base): elapsed = time.time() - start assert elapsed >= self.delay() - one = self.send("testListen", 1) - two = self.send("testListen", 2) - three = self.send("testListen", 3) + one = self.send("testFetch", 1) + two = self.send("testFetch", 2) + three = self.send("testFetch", 3) msg = self.rcv.fetch(0) assert msg.content == one msg = self.rcv.fetch(self.delay()) @@ -467,34 +504,35 @@ class ReceiverTests(Base): def testCapacity(self): self.rcv.capacity = 5 self.rcv.start() - assert self.rcv.pending() == 0 + self.assertPending(self.rcv, 0) for i in range(15): self.send("testCapacity", i) self.sleep() - assert self.rcv.pending() == 5 + self.assertPending(self.rcv, 5) self.drain(self.rcv, limit = 5) self.sleep() - assert self.rcv.pending() == 5 + self.assertPending(self.rcv, 5) - self.drain(self.rcv) - assert self.rcv.pending() == 0 + drained = self.drain(self.rcv) + assert len(drained) == 10 + self.assertPending(self.rcv, 0) self.ssn.acknowledge() def testCapacityUNLIMITED(self): self.rcv.capacity = UNLIMITED self.rcv.start() - assert self.rcv.pending() == 0 + self.assertPending(self.rcv, 0) for i in range(10): self.send("testCapacityUNLIMITED", i) self.sleep() - assert self.rcv.pending() == 10 + self.assertPending(self.rcv, 10) self.drain(self.rcv) - assert self.rcv.pending() == 0 + self.assertPending(self.rcv, 0) self.ssn.acknowledge() @@ -535,6 +573,48 @@ class SenderTests(Base): def testSendMap(self): self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14}) + def asyncTest(self, capacity): + self.snd.capacity = capacity + msgs = [self.content("asyncTest", i) for i in range(15)] + for m in msgs: + self.snd.send(m, sync=False) + drained = self.drain(self.rcv, timeout=self.delay()) + assert msgs == drained, "expected %s, got %s" % (msgs, drained) + self.ssn.acknowledge() + + def testSendAsyncCapacity0(self): + try: + self.asyncTest(0) + assert False, "send shouldn't succeed with zero capacity" + except InsufficientCapacity: + # this is expected + pass + + def testSendAsyncCapacity1(self): + self.asyncTest(1) + + def testSendAsyncCapacity5(self): + self.asyncTest(5) + + def testSendAsyncCapacityUNLIMITED(self): + self.asyncTest(UNLIMITED) + + def testCapacityTimeout(self): + self.snd.capacity = 1 + msgs = [] + caught = False + while len(msgs) < 100: + m = self.content("testCapacity", len(msgs)) + try: + self.snd.send(m, sync=False, timeout=0) + msgs.append(m) + except InsufficientCapacity: + caught = True + break + self.drain(self.rcv, expected=msgs) + self.ssn.acknowledge() + assert caught, "did not exceed capacity" + class MessageTests(Base): def testCreateString(self): |