summaryrefslogtreecommitdiff
path: root/python/qpid/tests
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-03-11 00:03:25 +0000
committerRafael H. Schloming <rhs@apache.org>2010-03-11 00:03:25 +0000
commit88086e0099c0fb67ac3a01c5f8793c0634b946a0 (patch)
tree7fab04466df2bb9e33e9e83ccc3286a420f0ee0d /python/qpid/tests
parent195193dab20a2e7481e470ddc8226cff9102e1fb (diff)
downloadqpid-python-88086e0099c0fb67ac3a01c5f8793c0634b946a0.tar.gz
added support for reject/release
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@921638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/tests')
-rw-r--r--python/qpid/tests/messaging/__init__.py21
-rw-r--r--python/qpid/tests/messaging/endpoints.py71
2 files changed, 70 insertions, 22 deletions
diff --git a/python/qpid/tests/messaging/__init__.py b/python/qpid/tests/messaging/__init__.py
index e47f8cb119..0744932944 100644
--- a/python/qpid/tests/messaging/__init__.py
+++ b/python/qpid/tests/messaging/__init__.py
@@ -59,6 +59,9 @@ class Base(Test):
else:
return "%s[%s, %s]" % (base, count, self.test_id)
+ def message(self, base, count = None):
+ return Message(self.content(base, count))
+
def ping(self, ssn):
PING_Q = 'ping-queue; {create: always, delete: always}'
# send a message
@@ -70,7 +73,7 @@ class Base(Test):
ssn.acknowledge()
assert msg.content == content, "expected %r, got %r" % (content, msg.content)
- def drain(self, rcv, limit=None, timeout=0, expected=None):
+ def drain(self, rcv, limit=None, timeout=0, expected=None, redelivered=False):
messages = []
try:
while limit is None or len(messages) < limit:
@@ -78,21 +81,22 @@ class Base(Test):
except Empty:
pass
if expected is not None:
- self.assertEchos(expected, messages)
+ self.assertEchos(expected, messages, redelivered)
return messages
def diff(self, m1, m2):
result = {}
for attr in ("id", "subject", "user_id", "to", "reply_to",
"correlation_id", "durable", "priority", "ttl",
- "properties", "content_type", "content"):
+ "redelivered", "properties", "content_type",
+ "content"):
a1 = getattr(m1, attr)
a2 = getattr(m2, attr)
if a1 != a2:
result[attr] = (a1, a2)
return result
- def assertEcho(self, msg, echo):
+ def assertEcho(self, msg, echo, redelivered=False):
if not isinstance(msg, Message) or not isinstance(echo, Message):
if isinstance(msg, Message):
msg = msg.content
@@ -102,14 +106,19 @@ class Base(Test):
else:
delta = self.diff(msg, echo)
mttl, ettl = delta.pop("ttl", (0, 0))
+ if redelivered:
+ assert echo.redelivered, \
+ "expected %s to be redelivered: %s" % (msg, echo)
+ if delta.has_key("redelivered"):
+ del delta["redelivered"]
assert mttl is not None and ettl is not None, "%s, %s" % (mttl, ettl)
assert mttl >= ettl, "%s, %s" % (mttl, ettl)
assert not delta, "expected %s, got %s, delta %s" % (msg, echo, delta)
- def assertEchos(self, msgs, echoes):
+ def assertEchos(self, msgs, echoes, redelivered=False):
assert len(msgs) == len(echoes), "%s, %s" % (msgs, echoes)
for m, e in zip(msgs, echoes):
- self.assertEcho(m, e)
+ self.assertEcho(m, e, redelivered)
def assertEmpty(self, rcv):
contents = self.drain(rcv)
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py
index 049413206b..3f2c823914 100644
--- a/python/qpid/tests/messaging/endpoints.py
+++ b/python/qpid/tests/messaging/endpoints.py
@@ -227,21 +227,60 @@ class SessionTests(Base):
def testAcknowledgeAsyncAckCapUNLIMITED(self):
self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
- def send(self, ssn, queue, base, count=1):
- snd = ssn.sender(queue, durable=self.durable())
- contents = []
+ def testRelease(self):
+ msgs = [self.message("testRelease", i) for i in range(3)]
+ snd = self.ssn.sender("test-release-queue; {create: always, delete: always}")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True))
+ self.ssn.acknowledge(echos[2], Disposition(RELEASED))
+ self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True)
+ self.drain(rcv, expected=msgs[2:3])
+ self.ssn.acknowledge()
+
+ def testReject(self):
+ msgs = [self.message("testReject", i) for i in range(3)]
+ snd = self.ssn.sender("""
+ test-reject-queue; {
+ create: always,
+ delete: always,
+ node-properties: {
+ x-properties: {
+ alternate_exchange: 'amq.topic'
+ }
+ }
+ }
+""")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ rej = self.ssn.receiver("amq.topic")
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(REJECTED))
+ self.ssn.acknowledge(echos[2],
+ Disposition(REJECTED, code=3, text="test-reject"))
+ self.drain(rej, expected=msgs[1:])
+ self.ssn.acknowledge()
+
+ def send(self, ssn, target, base, count=1):
+ snd = ssn.sender(target, durable=self.durable())
+ messages = []
for i in range(count):
- c = self.content(base, i)
+ c = self.message(base, i)
snd.send(c)
- contents.append(c)
+ messages.append(c)
snd.close()
- return contents
+ return messages
def txTest(self, commit):
TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
txssn = self.conn.session(transactional=True)
- contents = self.send(self.ssn, TX_Q, "txTest", 3)
+ messages = self.send(self.ssn, TX_Q, "txTest", 3)
txrcv = txssn.receiver(TX_Q)
txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
rcv = self.ssn.receiver(txrcv.source)
@@ -255,10 +294,10 @@ class SessionTests(Base):
if commit:
txssn.commit()
self.assertEmpty(rcv)
- self.drain(copy_rcv, expected=contents)
+ self.drain(copy_rcv, expected=messages)
else:
txssn.rollback()
- self.drain(rcv, expected=contents)
+ self.drain(rcv, expected=messages, redelivered=True)
self.assertEmpty(copy_rcv)
self.ssn.acknowledge()
@@ -271,13 +310,13 @@ class SessionTests(Base):
def txTestSend(self, commit):
TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
txssn = self.conn.session(transactional=True)
- contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+ messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
rcv = self.ssn.receiver(TX_SEND_Q)
self.assertEmpty(rcv)
if commit:
txssn.commit()
- self.drain(rcv, expected=contents)
+ self.drain(rcv, expected=messages)
self.ssn.acknowledge()
else:
txssn.rollback()
@@ -297,17 +336,17 @@ class SessionTests(Base):
txssn = self.conn.session(transactional=True)
txrcv = txssn.receiver(TX_ACK_QC)
self.assertEmpty(txrcv)
- contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
- self.drain(txrcv, expected=contents)
+ messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
+ self.drain(txrcv, expected=messages)
if commit:
txssn.acknowledge()
else:
txssn.rollback()
- self.drain(txrcv, expected=contents)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.acknowledge()
txssn.rollback()
- self.drain(txrcv, expected=contents)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.commit() # commit without ack
self.assertEmpty(txrcv)
@@ -315,7 +354,7 @@ class SessionTests(Base):
txssn = self.conn.session(transactional=True)
txrcv = txssn.receiver(TX_ACK_QC)
- self.drain(txrcv, expected=contents)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.acknowledge()
txssn.commit()
rcv = self.ssn.receiver(TX_ACK_QD)