summaryrefslogtreecommitdiff
path: root/python/qpid/tests/messaging.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-09-02 12:58:05 +0000
committerRafael H. Schloming <rhs@apache.org>2009-09-02 12:58:05 +0000
commit53a2ec220e568e12f4e4a640f87b02d22274a938 (patch)
tree40020f9d9dd2a95bf484b8471a844d251c1ac41a /python/qpid/tests/messaging.py
parent5781e67cca8d07d481797946c217e738264f6d23 (diff)
downloadqpid-python-53a2ec220e568e12f4e4a640f87b02d22274a938.tar.gz
added sync flag to acknowledge and ack_capcity to Session
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@810495 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/tests/messaging.py')
-rw-r--r--python/qpid/tests/messaging.py44
1 files changed, 36 insertions, 8 deletions
diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py
index 0058f6199e..3ec8bf0749 100644
--- a/python/qpid/tests/messaging.py
+++ b/python/qpid/tests/messaging.py
@@ -72,13 +72,15 @@ class Base(Test):
ssn.acknowledge()
assert msg.content == content, "expected %r, got %r" % (content, msg.content)
- def drain(self, rcv, limit=None, timeout=0):
+ def drain(self, rcv, limit=None, timeout=0, expected=None):
contents = []
try:
while limit is None or len(contents) < limit:
contents.append(rcv.fetch(timeout=timeout).content)
except Empty:
pass
+ if expected is not None:
+ assert expected == contents, "expected %s, got %s" % (expected, contents)
return contents
def assertEmpty(self, rcv):
@@ -225,27 +227,27 @@ class SessionTests(Base):
# XXX, we need a convenient way to assert that required queues are
# empty on setup, and possibly also to drain queues on teardown
- def testAcknowledge(self):
+ def ackTest(self, acker, ack_capacity=None):
# send a bunch of messages
snd = self.ssn.sender("test-ack-queue")
- tid = "a"
- contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)]
+ contents = [self.content("ackTest", i) for i in range(15)]
for c in contents:
snd.send(c)
# drain the queue, verify the messages are there and then close
# without acking
rcv = self.ssn.receiver(snd.target)
- assert contents == self.drain(rcv)
+ self.drain(rcv, expected=contents)
self.ssn.close()
# drain the queue again, verify that they are all the messages
# were requeued, and ack this time before closing
self.ssn = self.conn.session()
+ if ack_capacity is not None:
+ self.ssn.ack_capacity = ack_capacity
rcv = self.ssn.receiver("test-ack-queue")
- drained = self.drain(rcv)
- assert contents == drained, "expected %s, got %s" % (contents, drained)
- self.ssn.acknowledge()
+ self.drain(rcv, expected=contents)
+ acker(self.ssn)
self.ssn.close()
# drain the queue a final time and verify that the messages were
@@ -254,6 +256,32 @@ class SessionTests(Base):
rcv = self.ssn.receiver("test-ack-queue")
self.assertEmpty(rcv)
+ def testAcknowledge(self):
+ self.ackTest(lambda ssn: ssn.acknowledge())
+
+ def testAcknowledgeAsync(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False))
+
+ def testAcknowledgeAsyncAckCap0(self):
+ try:
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0)
+ assert False, "acknowledge shouldn't succeed with ack_capacity of zero"
+ except InsufficientCapacity:
+ pass
+ finally:
+ self.ssn.ack_capacity = UNLIMITED
+ self.drain(self.ssn.receiver("test-ack-queue"))
+ self.ssn.acknowledge()
+
+ def testAcknowledgeAsyncAckCap1(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1)
+
+ def testAcknowledgeAsyncAckCap5(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5)
+
+ def testAcknowledgeAsyncAckCapUNLIMITED(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
+
def send(self, ssn, queue, base, count=1):
snd = ssn.sender(queue)
contents = []