diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-09-02 12:58:05 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-09-02 12:58:05 +0000 |
commit | 53a2ec220e568e12f4e4a640f87b02d22274a938 (patch) | |
tree | 40020f9d9dd2a95bf484b8471a844d251c1ac41a /python/qpid/tests/messaging.py | |
parent | 5781e67cca8d07d481797946c217e738264f6d23 (diff) | |
download | qpid-python-53a2ec220e568e12f4e4a640f87b02d22274a938.tar.gz |
added sync flag to acknowledge and ack_capcity to Session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@810495 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/tests/messaging.py')
-rw-r--r-- | python/qpid/tests/messaging.py | 44 |
1 files changed, 36 insertions, 8 deletions
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py index 0058f6199e..3ec8bf0749 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging.py @@ -72,13 +72,15 @@ class Base(Test): ssn.acknowledge() assert msg.content == content, "expected %r, got %r" % (content, msg.content) - def drain(self, rcv, limit=None, timeout=0): + def drain(self, rcv, limit=None, timeout=0, expected=None): contents = [] try: while limit is None or len(contents) < limit: contents.append(rcv.fetch(timeout=timeout).content) except Empty: pass + if expected is not None: + assert expected == contents, "expected %s, got %s" % (expected, contents) return contents def assertEmpty(self, rcv): @@ -225,27 +227,27 @@ class SessionTests(Base): # XXX, we need a convenient way to assert that required queues are # empty on setup, and possibly also to drain queues on teardown - def testAcknowledge(self): + def ackTest(self, acker, ack_capacity=None): # send a bunch of messages snd = self.ssn.sender("test-ack-queue") - tid = "a" - contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)] + contents = [self.content("ackTest", i) for i in range(15)] for c in contents: snd.send(c) # drain the queue, verify the messages are there and then close # without acking rcv = self.ssn.receiver(snd.target) - assert contents == self.drain(rcv) + self.drain(rcv, expected=contents) self.ssn.close() # drain the queue again, verify that they are all the messages # were requeued, and ack this time before closing self.ssn = self.conn.session() + if ack_capacity is not None: + self.ssn.ack_capacity = ack_capacity rcv = self.ssn.receiver("test-ack-queue") - drained = self.drain(rcv) - assert contents == drained, "expected %s, got %s" % (contents, drained) - self.ssn.acknowledge() + self.drain(rcv, expected=contents) + acker(self.ssn) self.ssn.close() # drain the queue a final time and verify that the messages were @@ -254,6 +256,32 @@ class SessionTests(Base): rcv = self.ssn.receiver("test-ack-queue") self.assertEmpty(rcv) + def testAcknowledge(self): + self.ackTest(lambda ssn: ssn.acknowledge()) + + def testAcknowledgeAsync(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False)) + + def testAcknowledgeAsyncAckCap0(self): + try: + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0) + assert False, "acknowledge shouldn't succeed with ack_capacity of zero" + except InsufficientCapacity: + pass + finally: + self.ssn.ack_capacity = UNLIMITED + self.drain(self.ssn.receiver("test-ack-queue")) + self.ssn.acknowledge() + + def testAcknowledgeAsyncAckCap1(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1) + + def testAcknowledgeAsyncAckCap5(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5) + + def testAcknowledgeAsyncAckCapUNLIMITED(self): + self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) + def send(self, ssn, queue, base, count=1): snd = ssn.sender(queue) contents = [] |