summaryrefslogtreecommitdiff
path: root/python/qpid/tests/messaging/endpoints.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/tests/messaging/endpoints.py')
-rw-r--r--python/qpid/tests/messaging/endpoints.py71
1 files changed, 55 insertions, 16 deletions
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py
index 049413206b..3f2c823914 100644
--- a/python/qpid/tests/messaging/endpoints.py
+++ b/python/qpid/tests/messaging/endpoints.py
@@ -227,21 +227,60 @@ class SessionTests(Base):
def testAcknowledgeAsyncAckCapUNLIMITED(self):
self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
- def send(self, ssn, queue, base, count=1):
- snd = ssn.sender(queue, durable=self.durable())
- contents = []
+ def testRelease(self):
+ msgs = [self.message("testRelease", i) for i in range(3)]
+ snd = self.ssn.sender("test-release-queue; {create: always, delete: always}")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True))
+ self.ssn.acknowledge(echos[2], Disposition(RELEASED))
+ self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True)
+ self.drain(rcv, expected=msgs[2:3])
+ self.ssn.acknowledge()
+
+ def testReject(self):
+ msgs = [self.message("testReject", i) for i in range(3)]
+ snd = self.ssn.sender("""
+ test-reject-queue; {
+ create: always,
+ delete: always,
+ node-properties: {
+ x-properties: {
+ alternate_exchange: 'amq.topic'
+ }
+ }
+ }
+""")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ rej = self.ssn.receiver("amq.topic")
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(REJECTED))
+ self.ssn.acknowledge(echos[2],
+ Disposition(REJECTED, code=3, text="test-reject"))
+ self.drain(rej, expected=msgs[1:])
+ self.ssn.acknowledge()
+
+ def send(self, ssn, target, base, count=1):
+ snd = ssn.sender(target, durable=self.durable())
+ messages = []
for i in range(count):
- c = self.content(base, i)
+ c = self.message(base, i)
snd.send(c)
- contents.append(c)
+ messages.append(c)
snd.close()
- return contents
+ return messages
def txTest(self, commit):
TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
txssn = self.conn.session(transactional=True)
- contents = self.send(self.ssn, TX_Q, "txTest", 3)
+ messages = self.send(self.ssn, TX_Q, "txTest", 3)
txrcv = txssn.receiver(TX_Q)
txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
rcv = self.ssn.receiver(txrcv.source)
@@ -255,10 +294,10 @@ class SessionTests(Base):
if commit:
txssn.commit()
self.assertEmpty(rcv)
- self.drain(copy_rcv, expected=contents)
+ self.drain(copy_rcv, expected=messages)
else:
txssn.rollback()
- self.drain(rcv, expected=contents)
+ self.drain(rcv, expected=messages, redelivered=True)
self.assertEmpty(copy_rcv)
self.ssn.acknowledge()
@@ -271,13 +310,13 @@ class SessionTests(Base):
def txTestSend(self, commit):
TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
txssn = self.conn.session(transactional=True)
- contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+ messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
rcv = self.ssn.receiver(TX_SEND_Q)
self.assertEmpty(rcv)
if commit:
txssn.commit()
- self.drain(rcv, expected=contents)
+ self.drain(rcv, expected=messages)
self.ssn.acknowledge()
else:
txssn.rollback()
@@ -297,17 +336,17 @@ class SessionTests(Base):
txssn = self.conn.session(transactional=True)
txrcv = txssn.receiver(TX_ACK_QC)
self.assertEmpty(txrcv)
- contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
- self.drain(txrcv, expected=contents)
+ messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
+ self.drain(txrcv, expected=messages)
if commit:
txssn.acknowledge()
else:
txssn.rollback()
- self.drain(txrcv, expected=contents)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.acknowledge()
txssn.rollback()
- self.drain(txrcv, expected=contents)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.commit() # commit without ack
self.assertEmpty(txrcv)
@@ -315,7 +354,7 @@ class SessionTests(Base):
txssn = self.conn.session(transactional=True)
txrcv = txssn.receiver(TX_ACK_QC)
- self.drain(txrcv, expected=contents)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.acknowledge()
txssn.commit()
rcv = self.ssn.receiver(TX_ACK_QD)