diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-03-11 00:03:25 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-03-11 00:03:25 +0000 |
commit | 88086e0099c0fb67ac3a01c5f8793c0634b946a0 (patch) | |
tree | 7fab04466df2bb9e33e9e83ccc3286a420f0ee0d /python/qpid/tests | |
parent | 195193dab20a2e7481e470ddc8226cff9102e1fb (diff) | |
download | qpid-python-88086e0099c0fb67ac3a01c5f8793c0634b946a0.tar.gz |
added support for reject/release
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@921638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/tests')
-rw-r--r-- | python/qpid/tests/messaging/__init__.py | 21 | ||||
-rw-r--r-- | python/qpid/tests/messaging/endpoints.py | 71 |
2 files changed, 70 insertions, 22 deletions
diff --git a/python/qpid/tests/messaging/__init__.py b/python/qpid/tests/messaging/__init__.py index e47f8cb119..0744932944 100644 --- a/python/qpid/tests/messaging/__init__.py +++ b/python/qpid/tests/messaging/__init__.py @@ -59,6 +59,9 @@ class Base(Test): else: return "%s[%s, %s]" % (base, count, self.test_id) + def message(self, base, count = None): + return Message(self.content(base, count)) + def ping(self, ssn): PING_Q = 'ping-queue; {create: always, delete: always}' # send a message @@ -70,7 +73,7 @@ class Base(Test): ssn.acknowledge() assert msg.content == content, "expected %r, got %r" % (content, msg.content) - def drain(self, rcv, limit=None, timeout=0, expected=None): + def drain(self, rcv, limit=None, timeout=0, expected=None, redelivered=False): messages = [] try: while limit is None or len(messages) < limit: @@ -78,21 +81,22 @@ class Base(Test): except Empty: pass if expected is not None: - self.assertEchos(expected, messages) + self.assertEchos(expected, messages, redelivered) return messages def diff(self, m1, m2): result = {} for attr in ("id", "subject", "user_id", "to", "reply_to", "correlation_id", "durable", "priority", "ttl", - "properties", "content_type", "content"): + "redelivered", "properties", "content_type", + "content"): a1 = getattr(m1, attr) a2 = getattr(m2, attr) if a1 != a2: result[attr] = (a1, a2) return result - def assertEcho(self, msg, echo): + def assertEcho(self, msg, echo, redelivered=False): if not isinstance(msg, Message) or not isinstance(echo, Message): if isinstance(msg, Message): msg = msg.content @@ -102,14 +106,19 @@ class Base(Test): else: delta = self.diff(msg, echo) mttl, ettl = delta.pop("ttl", (0, 0)) + if redelivered: + assert echo.redelivered, \ + "expected %s to be redelivered: %s" % (msg, echo) + if delta.has_key("redelivered"): + del delta["redelivered"] assert mttl is not None and ettl is not None, "%s, %s" % (mttl, ettl) assert mttl >= ettl, "%s, %s" % (mttl, ettl) assert not delta, "expected %s, got %s, delta %s" % (msg, echo, delta) - def assertEchos(self, msgs, echoes): + def assertEchos(self, msgs, echoes, redelivered=False): assert len(msgs) == len(echoes), "%s, %s" % (msgs, echoes) for m, e in zip(msgs, echoes): - self.assertEcho(m, e) + self.assertEcho(m, e, redelivered) def assertEmpty(self, rcv): contents = self.drain(rcv) diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py index 049413206b..3f2c823914 100644 --- a/python/qpid/tests/messaging/endpoints.py +++ b/python/qpid/tests/messaging/endpoints.py @@ -227,21 +227,60 @@ class SessionTests(Base): def testAcknowledgeAsyncAckCapUNLIMITED(self): self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) - def send(self, ssn, queue, base, count=1): - snd = ssn.sender(queue, durable=self.durable()) - contents = [] + def testRelease(self): + msgs = [self.message("testRelease", i) for i in range(3)] + snd = self.ssn.sender("test-release-queue; {create: always, delete: always}") + for m in msgs: + snd.send(m) + rcv = self.ssn.receiver(snd.target) + echos = self.drain(rcv, expected=msgs) + self.ssn.acknowledge(echos[0]) + self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True)) + self.ssn.acknowledge(echos[2], Disposition(RELEASED)) + self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True) + self.drain(rcv, expected=msgs[2:3]) + self.ssn.acknowledge() + + def testReject(self): + msgs = [self.message("testReject", i) for i in range(3)] + snd = self.ssn.sender(""" + test-reject-queue; { + create: always, + delete: always, + node-properties: { + x-properties: { + alternate_exchange: 'amq.topic' + } + } + } +""") + for m in msgs: + snd.send(m) + rcv = self.ssn.receiver(snd.target) + rej = self.ssn.receiver("amq.topic") + echos = self.drain(rcv, expected=msgs) + self.ssn.acknowledge(echos[0]) + self.ssn.acknowledge(echos[1], Disposition(REJECTED)) + self.ssn.acknowledge(echos[2], + Disposition(REJECTED, code=3, text="test-reject")) + self.drain(rej, expected=msgs[1:]) + self.ssn.acknowledge() + + def send(self, ssn, target, base, count=1): + snd = ssn.sender(target, durable=self.durable()) + messages = [] for i in range(count): - c = self.content(base, i) + c = self.message(base, i) snd.send(c) - contents.append(c) + messages.append(c) snd.close() - return contents + return messages def txTest(self, commit): TX_Q = 'test-tx-queue; {create: sender, delete: receiver}' TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}' txssn = self.conn.session(transactional=True) - contents = self.send(self.ssn, TX_Q, "txTest", 3) + messages = self.send(self.ssn, TX_Q, "txTest", 3) txrcv = txssn.receiver(TX_Q) txsnd = txssn.sender(TX_Q_COPY, durable=self.durable()) rcv = self.ssn.receiver(txrcv.source) @@ -255,10 +294,10 @@ class SessionTests(Base): if commit: txssn.commit() self.assertEmpty(rcv) - self.drain(copy_rcv, expected=contents) + self.drain(copy_rcv, expected=messages) else: txssn.rollback() - self.drain(rcv, expected=contents) + self.drain(rcv, expected=messages, redelivered=True) self.assertEmpty(copy_rcv) self.ssn.acknowledge() @@ -271,13 +310,13 @@ class SessionTests(Base): def txTestSend(self, commit): TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}' txssn = self.conn.session(transactional=True) - contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3) + messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3) rcv = self.ssn.receiver(TX_SEND_Q) self.assertEmpty(rcv) if commit: txssn.commit() - self.drain(rcv, expected=contents) + self.drain(rcv, expected=messages) self.ssn.acknowledge() else: txssn.rollback() @@ -297,17 +336,17 @@ class SessionTests(Base): txssn = self.conn.session(transactional=True) txrcv = txssn.receiver(TX_ACK_QC) self.assertEmpty(txrcv) - contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) - self.drain(txrcv, expected=contents) + messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) + self.drain(txrcv, expected=messages) if commit: txssn.acknowledge() else: txssn.rollback() - self.drain(txrcv, expected=contents) + self.drain(txrcv, expected=messages, redelivered=True) txssn.acknowledge() txssn.rollback() - self.drain(txrcv, expected=contents) + self.drain(txrcv, expected=messages, redelivered=True) txssn.commit() # commit without ack self.assertEmpty(txrcv) @@ -315,7 +354,7 @@ class SessionTests(Base): txssn = self.conn.session(transactional=True) txrcv = txssn.receiver(TX_ACK_QC) - self.drain(txrcv, expected=contents) + self.drain(txrcv, expected=messages, redelivered=True) txssn.acknowledge() txssn.commit() rcv = self.ssn.receiver(TX_ACK_QD) |