diff options
Diffstat (limited to 'python/qpid/tests/messaging/endpoints.py')
-rw-r--r-- | python/qpid/tests/messaging/endpoints.py | 71 |
1 files changed, 55 insertions, 16 deletions
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py index 049413206b..3f2c823914 100644 --- a/python/qpid/tests/messaging/endpoints.py +++ b/python/qpid/tests/messaging/endpoints.py @@ -227,21 +227,60 @@ class SessionTests(Base): def testAcknowledgeAsyncAckCapUNLIMITED(self): self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) - def send(self, ssn, queue, base, count=1): - snd = ssn.sender(queue, durable=self.durable()) - contents = [] + def testRelease(self): + msgs = [self.message("testRelease", i) for i in range(3)] + snd = self.ssn.sender("test-release-queue; {create: always, delete: always}") + for m in msgs: + snd.send(m) + rcv = self.ssn.receiver(snd.target) + echos = self.drain(rcv, expected=msgs) + self.ssn.acknowledge(echos[0]) + self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True)) + self.ssn.acknowledge(echos[2], Disposition(RELEASED)) + self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True) + self.drain(rcv, expected=msgs[2:3]) + self.ssn.acknowledge() + + def testReject(self): + msgs = [self.message("testReject", i) for i in range(3)] + snd = self.ssn.sender(""" + test-reject-queue; { + create: always, + delete: always, + node-properties: { + x-properties: { + alternate_exchange: 'amq.topic' + } + } + } +""") + for m in msgs: + snd.send(m) + rcv = self.ssn.receiver(snd.target) + rej = self.ssn.receiver("amq.topic") + echos = self.drain(rcv, expected=msgs) + self.ssn.acknowledge(echos[0]) + self.ssn.acknowledge(echos[1], Disposition(REJECTED)) + self.ssn.acknowledge(echos[2], + Disposition(REJECTED, code=3, text="test-reject")) + self.drain(rej, expected=msgs[1:]) + self.ssn.acknowledge() + + def send(self, ssn, target, base, count=1): + snd = ssn.sender(target, durable=self.durable()) + messages = [] for i in range(count): - c = self.content(base, i) + c = self.message(base, i) snd.send(c) - contents.append(c) + messages.append(c) snd.close() - return contents + return messages def txTest(self, commit): TX_Q = 'test-tx-queue; {create: sender, delete: receiver}' TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}' txssn = self.conn.session(transactional=True) - contents = self.send(self.ssn, TX_Q, "txTest", 3) + messages = self.send(self.ssn, TX_Q, "txTest", 3) txrcv = txssn.receiver(TX_Q) txsnd = txssn.sender(TX_Q_COPY, durable=self.durable()) rcv = self.ssn.receiver(txrcv.source) @@ -255,10 +294,10 @@ class SessionTests(Base): if commit: txssn.commit() self.assertEmpty(rcv) - self.drain(copy_rcv, expected=contents) + self.drain(copy_rcv, expected=messages) else: txssn.rollback() - self.drain(rcv, expected=contents) + self.drain(rcv, expected=messages, redelivered=True) self.assertEmpty(copy_rcv) self.ssn.acknowledge() @@ -271,13 +310,13 @@ class SessionTests(Base): def txTestSend(self, commit): TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}' txssn = self.conn.session(transactional=True) - contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3) + messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3) rcv = self.ssn.receiver(TX_SEND_Q) self.assertEmpty(rcv) if commit: txssn.commit() - self.drain(rcv, expected=contents) + self.drain(rcv, expected=messages) self.ssn.acknowledge() else: txssn.rollback() @@ -297,17 +336,17 @@ class SessionTests(Base): txssn = self.conn.session(transactional=True) txrcv = txssn.receiver(TX_ACK_QC) self.assertEmpty(txrcv) - contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) - self.drain(txrcv, expected=contents) + messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) + self.drain(txrcv, expected=messages) if commit: txssn.acknowledge() else: txssn.rollback() - self.drain(txrcv, expected=contents) + self.drain(txrcv, expected=messages, redelivered=True) txssn.acknowledge() txssn.rollback() - self.drain(txrcv, expected=contents) + self.drain(txrcv, expected=messages, redelivered=True) txssn.commit() # commit without ack self.assertEmpty(txrcv) @@ -315,7 +354,7 @@ class SessionTests(Base): txssn = self.conn.session(transactional=True) txrcv = txssn.receiver(TX_ACK_QC) - self.drain(txrcv, expected=contents) + self.drain(txrcv, expected=messages, redelivered=True) txssn.acknowledge() txssn.commit() rcv = self.ssn.receiver(TX_ACK_QD) |