summaryrefslogtreecommitdiff
path: root/qpid/python/qpid/tests/messaging.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/qpid/tests/messaging.py')
-rw-r--r--qpid/python/qpid/tests/messaging.py124
1 files changed, 102 insertions, 22 deletions
diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py
index 7706ebbabe..7623c1f93b 100644
--- a/qpid/python/qpid/tests/messaging.py
+++ b/qpid/python/qpid/tests/messaging.py
@@ -23,7 +23,8 @@
import time
from qpid.tests import Test
from qpid.harness import Skipped
-from qpid.messaging import Connection, ConnectError, Disconnected, Empty, Message, UNLIMITED, uuid4
+from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
+ InsufficientCapacity, Message, UNLIMITED, uuid4
from Queue import Queue, Empty as QueueEmpty
class Base(Test):
@@ -71,19 +72,25 @@ class Base(Test):
ssn.acknowledge()
assert msg.content == content, "expected %r, got %r" % (content, msg.content)
- def drain(self, rcv, limit=None):
+ def drain(self, rcv, limit=None, timeout=0, expected=None):
contents = []
try:
while limit is None or len(contents) < limit:
- contents.append(rcv.fetch(0).content)
+ contents.append(rcv.fetch(timeout=timeout).content)
except Empty:
pass
+ if expected is not None:
+ assert expected == contents, "expected %s, got %s" % (expected, contents)
return contents
def assertEmpty(self, rcv):
contents = self.drain(rcv)
assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
+ def assertPending(self, rcv, expected):
+ p = rcv.pending()
+ assert p == expected, "expected %s, got %s" % (expected, p)
+
def sleep(self):
time.sleep(self.delay())
@@ -107,7 +114,8 @@ class SetupTests(Base):
try:
self.conn = Connection.open("localhost", 0)
assert False, "connect succeeded"
- except ConnectError:
+ except ConnectError, e:
+ # XXX: should verify that e includes appropriate diagnostic info
pass
class ConnectionTests(Base):
@@ -219,26 +227,27 @@ class SessionTests(Base):
# XXX, we need a convenient way to assert that required queues are
# empty on setup, and possibly also to drain queues on teardown
- def testAcknowledge(self):
+ def ackTest(self, acker, ack_capacity=None):
# send a bunch of messages
snd = self.ssn.sender("test-ack-queue")
- tid = "a"
- contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)]
+ contents = [self.content("ackTest", i) for i in range(15)]
for c in contents:
snd.send(c)
# drain the queue, verify the messages are there and then close
# without acking
rcv = self.ssn.receiver(snd.target)
- assert contents == self.drain(rcv)
+ self.drain(rcv, expected=contents)
self.ssn.close()
# drain the queue again, verify that they are all the messages
# were requeued, and ack this time before closing
self.ssn = self.conn.session()
+ if ack_capacity is not None:
+ self.ssn.ack_capacity = ack_capacity
rcv = self.ssn.receiver("test-ack-queue")
- assert contents == self.drain(rcv)
- self.ssn.acknowledge()
+ self.drain(rcv, expected=contents)
+ acker(self.ssn)
self.ssn.close()
# drain the queue a final time and verify that the messages were
@@ -247,6 +256,33 @@ class SessionTests(Base):
rcv = self.ssn.receiver("test-ack-queue")
self.assertEmpty(rcv)
+ def testAcknowledge(self):
+ self.ackTest(lambda ssn: ssn.acknowledge())
+
+ def testAcknowledgeAsync(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False))
+
+ def testAcknowledgeAsyncAckCap0(self):
+ try:
+ try:
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0)
+ assert False, "acknowledge shouldn't succeed with ack_capacity of zero"
+ except InsufficientCapacity:
+ pass
+ finally:
+ self.ssn.ack_capacity = UNLIMITED
+ self.drain(self.ssn.receiver("test-ack-queue"))
+ self.ssn.acknowledge()
+
+ def testAcknowledgeAsyncAckCap1(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1)
+
+ def testAcknowledgeAsyncAckCap5(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5)
+
+ def testAcknowledgeAsyncAckCapUNLIMITED(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
+
def send(self, ssn, queue, base, count=1):
snd = ssn.sender(queue)
contents = []
@@ -319,7 +355,8 @@ class SessionTests(Base):
txssn.acknowledge()
else:
txssn.rollback()
- assert contents == self.drain(txrcv)
+ drained = self.drain(txrcv)
+ assert contents == drained, "expected %s, got %s" % (contents, drained)
txssn.acknowledge()
txssn.rollback()
assert contents == self.drain(txrcv)
@@ -401,9 +438,9 @@ class ReceiverTests(Base):
elapsed = time.time() - start
assert elapsed >= self.delay()
- one = self.send("testListen", 1)
- two = self.send("testListen", 2)
- three = self.send("testListen", 3)
+ one = self.send("testFetch", 1)
+ two = self.send("testFetch", 2)
+ three = self.send("testFetch", 3)
msg = self.rcv.fetch(0)
assert msg.content == one
msg = self.rcv.fetch(self.delay())
@@ -467,34 +504,35 @@ class ReceiverTests(Base):
def testCapacity(self):
self.rcv.capacity = 5
self.rcv.start()
- assert self.rcv.pending() == 0
+ self.assertPending(self.rcv, 0)
for i in range(15):
self.send("testCapacity", i)
self.sleep()
- assert self.rcv.pending() == 5
+ self.assertPending(self.rcv, 5)
self.drain(self.rcv, limit = 5)
self.sleep()
- assert self.rcv.pending() == 5
+ self.assertPending(self.rcv, 5)
- self.drain(self.rcv)
- assert self.rcv.pending() == 0
+ drained = self.drain(self.rcv)
+ assert len(drained) == 10
+ self.assertPending(self.rcv, 0)
self.ssn.acknowledge()
def testCapacityUNLIMITED(self):
self.rcv.capacity = UNLIMITED
self.rcv.start()
- assert self.rcv.pending() == 0
+ self.assertPending(self.rcv, 0)
for i in range(10):
self.send("testCapacityUNLIMITED", i)
self.sleep()
- assert self.rcv.pending() == 10
+ self.assertPending(self.rcv, 10)
self.drain(self.rcv)
- assert self.rcv.pending() == 0
+ self.assertPending(self.rcv, 0)
self.ssn.acknowledge()
@@ -535,6 +573,48 @@ class SenderTests(Base):
def testSendMap(self):
self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14})
+ def asyncTest(self, capacity):
+ self.snd.capacity = capacity
+ msgs = [self.content("asyncTest", i) for i in range(15)]
+ for m in msgs:
+ self.snd.send(m, sync=False)
+ drained = self.drain(self.rcv, timeout=self.delay())
+ assert msgs == drained, "expected %s, got %s" % (msgs, drained)
+ self.ssn.acknowledge()
+
+ def testSendAsyncCapacity0(self):
+ try:
+ self.asyncTest(0)
+ assert False, "send shouldn't succeed with zero capacity"
+ except InsufficientCapacity:
+ # this is expected
+ pass
+
+ def testSendAsyncCapacity1(self):
+ self.asyncTest(1)
+
+ def testSendAsyncCapacity5(self):
+ self.asyncTest(5)
+
+ def testSendAsyncCapacityUNLIMITED(self):
+ self.asyncTest(UNLIMITED)
+
+ def testCapacityTimeout(self):
+ self.snd.capacity = 1
+ msgs = []
+ caught = False
+ while len(msgs) < 100:
+ m = self.content("testCapacity", len(msgs))
+ try:
+ self.snd.send(m, sync=False, timeout=0)
+ msgs.append(m)
+ except InsufficientCapacity:
+ caught = True
+ break
+ self.drain(self.rcv, expected=msgs)
+ self.ssn.acknowledge()
+ assert caught, "did not exceed capacity"
+
class MessageTests(Base):
def testCreateString(self):