diff options
Diffstat (limited to 'qpid/python/tests_0-10/tx.py')
-rw-r--r-- | qpid/python/tests_0-10/tx.py | 56 |
1 files changed, 36 insertions, 20 deletions
diff --git a/qpid/python/tests_0-10/tx.py b/qpid/python/tests_0-10/tx.py index 0f6b4f5cd1..b499c2d1f9 100644 --- a/qpid/python/tests_0-10/tx.py +++ b/qpid/python/tests_0-10/tx.py @@ -30,23 +30,39 @@ class TxTests(TestBase): """ Test that commited publishes are delivered and commited acks are not re-delivered """ + channel2 = self.client.channel(2) + channel2.channel_open() + self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c") + channel2.tx_commit() + channel2.channel_close() + + #use a different channel with new subscriptions to ensure + #there is no redelivery of acked messages: channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c") - channel.tx_commit() + channel.tx_select() + + channel.message_consume(queue="tx-commit-a", destination="qa", no_ack=False) + queue_a = self.client.queue("qa") + + channel.message_consume(queue="tx-commit-b", destination="qb", no_ack=False) + queue_b = self.client.queue("qb") + + channel.message_consume(queue="tx-commit-c", destination="qc", no_ack=False) + queue_c = self.client.queue("qc") #check results for i in range(1, 5): msg = queue_c.get(timeout=1) self.assertEqual("TxMessage %d" % i, msg.body) - msg.ok() + msg.complete() msg = queue_b.get(timeout=1) self.assertEqual("TxMessage 6", msg.body) - msg.ok() + msg.complete() msg = queue_a.get(timeout=1) self.assertEqual("TxMessage 7", msg.body) - msg.ok() + msg.complete() for q in [queue_a, queue_b, queue_c]: try: @@ -76,15 +92,15 @@ class TxTests(TestBase): for i in range(1, 5): msg = queue_a.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok() + msg.complete() msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.body) - msg.ok() + msg.complete() msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.body) - msg.ok() + msg.complete() for q in [queue_a, queue_b, queue_c]: try: @@ -114,15 +130,15 @@ class TxTests(TestBase): for i in range(1, 5): msg = queue_a.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok() + msg.complete() msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.body) - msg.ok() + msg.complete() msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.body) - msg.ok() + msg.complete() for q in [queue_a, queue_b, queue_c]: try: @@ -150,10 +166,10 @@ class TxTests(TestBase): channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) for i in range(1, 5): - channel.message_transfer(routing_key=name_a, body="Message %d" % i) + channel.message_transfer(routing_key=name_a, message_id="msg%d" % i, body="Message %d" % i) - channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6") - channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7") + channel.message_transfer(routing_key=key, destination="amq.direct", message_id="msg6", body="Message 6") + channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="msg7", body="Message 7") channel.tx_select() @@ -164,25 +180,25 @@ class TxTests(TestBase): msg = queue_a.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok(batchoffset=-3) + msg.complete() channel.message_consume(queue=name_b, destination="sub_b", no_ack=False) queue_b = self.client.queue("sub_b") msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.body) - msg.ok() + msg.complete() sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False) queue_c = self.client.queue("sub_c") msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.body) - msg.ok() + msg.complete() #publish messages for i in range(1, 5): - channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i) + channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="tx-msg%d" % i, body="TxMessage %d" % i) - channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6") - channel.message_transfer(routing_key=name_a, body="TxMessage 7") + channel.message_transfer(routing_key=key, destination="amq.direct", message_id="tx-msg6", body="TxMessage 6") + channel.message_transfer(routing_key=name_a, message_id="tx-msg7", body="TxMessage 7") return queue_a, queue_b, queue_c |