diff options
Diffstat (limited to 'M4-RCs/qpid/python/tests_0-9/dtx.py')
-rw-r--r-- | M4-RCs/qpid/python/tests_0-9/dtx.py | 587 |
1 files changed, 0 insertions, 587 deletions
diff --git a/M4-RCs/qpid/python/tests_0-9/dtx.py b/M4-RCs/qpid/python/tests_0-9/dtx.py deleted file mode 100644 index bc268f4129..0000000000 --- a/M4-RCs/qpid/python/tests_0-9/dtx.py +++ /dev/null @@ -1,587 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from qpid.client import Client, Closed -from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase -from struct import pack, unpack -from time import sleep - -class DtxTests(TestBase): - """ - Tests for the amqp dtx related classes. - - Tests of the form test_simple_xxx test the basic transactional - behaviour. The approach here is to 'swap' a message from one queue - to another by consuming and re-publishing in the same - transaction. That transaction is then completed in different ways - and the appropriate result verified. - - The other tests enforce more specific rules and behaviour on a - per-method or per-field basis. - """ - - XA_RBROLLBACK = 1 - XA_RBTIMEOUT = 2 - XA_OK = 8 - - def test_simple_commit(self): - """ - Test basic one-phase commit behaviour. - """ - channel = self.channel - tx = self.xid("my-xid") - self.txswap(tx, "commit") - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #commit - self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).flags) - - #check result - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(1, "queue-b") - self.assertMessageId("commit", "queue-b") - - def test_simple_prepare_commit(self): - """ - Test basic two-phase commit behaviour. - """ - channel = self.channel - tx = self.xid("my-xid") - self.txswap(tx, "prepare-commit") - - #prepare - self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags) - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #commit - self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).flags) - - #check result - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(1, "queue-b") - self.assertMessageId("prepare-commit", "queue-b") - - - def test_simple_rollback(self): - """ - Test basic rollback behaviour. - """ - channel = self.channel - tx = self.xid("my-xid") - self.txswap(tx, "rollback") - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #rollback - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) - - #check result - self.assertMessageCount(1, "queue-a") - self.assertMessageCount(0, "queue-b") - self.assertMessageId("rollback", "queue-a") - - def test_simple_prepare_rollback(self): - """ - Test basic rollback behaviour after the transaction has been prepared. - """ - channel = self.channel - tx = self.xid("my-xid") - self.txswap(tx, "prepare-rollback") - - #prepare - self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags) - - #neither queue should have any messages accessible - self.assertMessageCount(0, "queue-a") - self.assertMessageCount(0, "queue-b") - - #rollback - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) - - #check result - self.assertMessageCount(1, "queue-a") - self.assertMessageCount(0, "queue-b") - self.assertMessageId("prepare-rollback", "queue-a") - - def test_select_required(self): - """ - check that an error is flagged if select is not issued before - start or end - """ - channel = self.channel - tx = self.xid("dummy") - try: - channel.dtx_demarcation_start(xid=tx) - - #if we get here we have failed, but need to do some cleanup: - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) - self.fail("Channel not selected for use with dtx, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_start_already_known(self): - """ - Verify that an attempt to start an association with a - transaction that is already known is not allowed (unless the - join flag is set). - """ - #create two channels on different connection & select them for use with dtx: - channel1 = self.channel - channel1.dtx_demarcation_select() - - other = self.connect() - channel2 = other.channel(1) - channel2.channel_open() - channel2.dtx_demarcation_select() - - #create a xid - tx = self.xid("dummy") - #start work on one channel under that xid: - channel1.dtx_demarcation_start(xid=tx) - #then start on the other without the join set - failed = False - try: - channel2.dtx_demarcation_start(xid=tx) - except Closed, e: - failed = True - error = e - - #cleanup: - if not failed: - channel2.dtx_demarcation_end(xid=tx) - other.close() - channel1.dtx_demarcation_end(xid=tx) - channel1.dtx_coordination_rollback(xid=tx) - - #verification: - if failed: self.assertConnectionException(503, e.args[0]) - else: self.fail("Xid already known, expected exception!") - - def test_forget_xid_on_completion(self): - """ - Verify that a xid is 'forgotten' - and can therefore be used - again - once it is completed. - """ - channel = self.channel - #do some transactional work & complete the transaction - self.test_simple_commit() - - #start association for the same xid as the previously completed txn - tx = self.xid("my-xid") - channel.dtx_demarcation_start(xid=tx) - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) - - def test_start_join_and_resume(self): - """ - Ensure the correct error is signalled when both the join and - resume flags are set on starting an association between a - channel and a transcation. - """ - channel = self.channel - channel.dtx_demarcation_select() - tx = self.xid("dummy") - try: - channel.dtx_demarcation_start(xid=tx, join=True, resume=True) - #failed, but need some cleanup: - channel.dtx_demarcation_end(xid=tx) - channel.dtx_coordination_rollback(xid=tx) - self.fail("Join and resume both set, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - def test_start_join(self): - """ - Verify 'join' behaviour, where a channel is associated with a - transaction that is already associated with another channel. - """ - #create two channels & select them for use with dtx: - channel1 = self.channel - channel1.dtx_demarcation_select() - - channel2 = self.client.channel(2) - channel2.channel_open() - channel2.dtx_demarcation_select() - - #setup - channel1.queue_declare(queue="one", exclusive=True) - channel1.queue_declare(queue="two", exclusive=True) - channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage") - channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage") - - #create a xid - tx = self.xid("dummy") - #start work on one channel under that xid: - channel1.dtx_demarcation_start(xid=tx) - #then start on the other with the join flag set - channel2.dtx_demarcation_start(xid=tx, join=True) - - #do work through each channel - self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two' - self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one' - - #mark end on both channels - channel1.dtx_demarcation_end(xid=tx) - channel2.dtx_demarcation_end(xid=tx) - - #commit and check - channel1.dtx_coordination_commit(xid=tx, one_phase=True) - self.assertMessageCount(1, "one") - self.assertMessageCount(1, "two") - self.assertMessageId("a", "two") - self.assertMessageId("b", "one") - - - def test_suspend_resume(self): - """ - Test suspension and resumption of an association - """ - channel = self.channel - channel.dtx_demarcation_select() - - #setup - channel.queue_declare(queue="one", exclusive=True) - channel.queue_declare(queue="two", exclusive=True) - channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage") - channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage") - - tx = self.xid("dummy") - - channel.dtx_demarcation_start(xid=tx) - self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two' - channel.dtx_demarcation_end(xid=tx, suspend=True) - - channel.dtx_demarcation_start(xid=tx, resume=True) - self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one' - channel.dtx_demarcation_end(xid=tx) - - #commit and check - channel.dtx_coordination_commit(xid=tx, one_phase=True) - self.assertMessageCount(1, "one") - self.assertMessageCount(1, "two") - self.assertMessageId("a", "two") - self.assertMessageId("b", "one") - - def test_end_suspend_and_fail(self): - """ - Verify that the correct error is signalled if the suspend and - fail flag are both set when disassociating a transaction from - the channel - """ - channel = self.channel - channel.dtx_demarcation_select() - tx = self.xid("suspend_and_fail") - channel.dtx_demarcation_start(xid=tx) - try: - channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True) - self.fail("Suspend and fail both set, expected exception!") - except Closed, e: - self.assertConnectionException(503, e.args[0]) - - #cleanup - other = self.connect() - channel = other.channel(1) - channel.channel_open() - channel.dtx_coordination_rollback(xid=tx) - channel.channel_close() - other.close() - - - def test_end_unknown_xid(self): - """ - Verifies that the correct exception is thrown when an attempt - is made to end the association for a xid not previously - associated with the channel - """ - channel = self.channel - channel.dtx_demarcation_select() - tx = self.xid("unknown-xid") - try: - channel.dtx_demarcation_end(xid=tx) - self.fail("Attempted to end association with unknown xid, expected exception!") - except Closed, e: - #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming... - self.assertConnectionException(503, e.args[0]) - - def test_end(self): - """ - Verify that the association is terminated by end and subsequent - operations are non-transactional - """ - channel = self.client.channel(2) - channel.channel_open() - channel.queue_declare(queue="tx-queue", exclusive=True) - - #publish a message under a transaction - channel.dtx_demarcation_select() - tx = self.xid("dummy") - channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage") - channel.dtx_demarcation_end(xid=tx) - - #now that association with txn is ended, publish another message - channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage") - - #check the second message is available, but not the first - self.assertMessageCount(1, "tx-queue") - channel.message_consume(queue="tx-queue", destination="results", no_ack=False) - msg = self.client.queue("results").get(timeout=1) - self.assertEqual("two", msg.message_id) - channel.message_cancel(destination="results") - #ack the message then close the channel - msg.ok() - channel.channel_close() - - channel = self.channel - #commit the transaction and check that the first message (and - #only the first message) is then delivered - channel.dtx_coordination_commit(xid=tx, one_phase=True) - self.assertMessageCount(1, "tx-queue") - self.assertMessageId("one", "tx-queue") - - def test_invalid_commit_one_phase_true(self): - """ - Test that a commit with one_phase = True is rejected if the - transaction in question has already been prepared. - """ - other = self.connect() - tester = other.channel(1) - tester.channel_open() - tester.queue_declare(queue="dummy", exclusive=True) - tester.dtx_demarcation_select() - tx = self.xid("dummy") - tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(routing_key="dummy", body="whatever") - tester.dtx_demarcation_end(xid=tx) - tester.dtx_coordination_prepare(xid=tx) - failed = False - try: - tester.dtx_coordination_commit(xid=tx, one_phase=True) - except Closed, e: - failed = True - error = e - - if failed: - self.channel.dtx_coordination_rollback(xid=tx) - self.assertConnectionException(503, e.args[0]) - else: - tester.channel_close() - other.close() - self.fail("Invalid use of one_phase=True, expected exception!") - - def test_invalid_commit_one_phase_false(self): - """ - Test that a commit with one_phase = False is rejected if the - transaction in question has not yet been prepared. - """ - """ - Test that a commit with one_phase = True is rejected if the - transaction in question has already been prepared. - """ - other = self.connect() - tester = other.channel(1) - tester.channel_open() - tester.queue_declare(queue="dummy", exclusive=True) - tester.dtx_demarcation_select() - tx = self.xid("dummy") - tester.dtx_demarcation_start(xid=tx) - tester.message_transfer(routing_key="dummy", body="whatever") - tester.dtx_demarcation_end(xid=tx) - failed = False - try: - tester.dtx_coordination_commit(xid=tx, one_phase=False) - except Closed, e: - failed = True - error = e - - if failed: - self.channel.dtx_coordination_rollback(xid=tx) - self.assertConnectionException(503, e.args[0]) - else: - tester.channel_close() - other.close() - self.fail("Invalid use of one_phase=False, expected exception!") - - def test_implicit_end(self): - """ - Test that an association is implicitly ended when the channel - is closed (whether by exception or explicit client request) - and the transaction in question is marked as rollback only. - """ - channel1 = self.channel - channel2 = self.client.channel(2) - channel2.channel_open() - - #setup: - channel2.queue_declare(queue="dummy", exclusive=True) - channel2.message_transfer(routing_key="dummy", body="whatever") - tx = self.xid("dummy") - - channel2.dtx_demarcation_select() - channel2.dtx_demarcation_start(xid=tx) - channel2.message_get(queue="dummy", destination="dummy") - self.client.queue("dummy").get(timeout=1).ok() - channel2.message_transfer(routing_key="dummy", body="whatever") - channel2.channel_close() - - self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).flags) - channel1.dtx_coordination_rollback(xid=tx) - - def test_get_timeout(self): - """ - Check that get-timeout returns the correct value, (and that a - transaction with a timeout can complete normally) - """ - channel = self.channel - tx = self.xid("dummy") - - channel.dtx_demarcation_select() - channel.dtx_demarcation_start(xid=tx) - self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout) - channel.dtx_coordination_set_timeout(xid=tx, timeout=60) - self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout) - self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).flags) - self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) - - def test_set_timeout(self): - """ - Test the timeout of a transaction results in the expected - behaviour - """ - #open new channel to allow self.channel to be used in checking te queue - channel = self.client.channel(2) - channel.channel_open() - #setup: - tx = self.xid("dummy") - channel.queue_declare(queue="queue-a", exclusive=True) - channel.queue_declare(queue="queue-b", exclusive=True) - channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage") - - channel.dtx_demarcation_select() - channel.dtx_demarcation_start(xid=tx) - self.swap(channel, "queue-a", "queue-b") - channel.dtx_coordination_set_timeout(xid=tx, timeout=2) - sleep(3) - #check that the work has been rolled back already - self.assertMessageCount(1, "queue-a") - self.assertMessageCount(0, "queue-b") - self.assertMessageId("timeout", "queue-a") - #check the correct codes are returned when we try to complete the txn - self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).flags) - self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).flags) - - - - def test_recover(self): - """ - Test basic recover behaviour - """ - channel = self.channel - - channel.dtx_demarcation_select() - channel.queue_declare(queue="dummy", exclusive=True) - - prepared = [] - for i in range(1, 10): - tx = self.xid("tx%s" % (i)) - channel.dtx_demarcation_start(xid=tx) - channel.message_transfer(routing_key="dummy", body="message%s" % (i)) - channel.dtx_demarcation_end(xid=tx) - if i in [2, 5, 6, 8]: - channel.dtx_coordination_prepare(xid=tx) - prepared.append(tx) - else: - channel.dtx_coordination_rollback(xid=tx) - - indoubt = channel.dtx_coordination_recover().xids - #convert indoubt table to a list of xids (note: this will change for 0-10) - data = indoubt["xids"] - xids = [] - pos = 0 - while pos < len(data): - size = unpack("!B", data[pos])[0] - start = pos + 1 - end = start + size - xid = data[start:end] - xids.append(xid) - pos = end - - #rollback the prepared transactions returned by recover - for x in xids: - channel.dtx_coordination_rollback(xid=x) - - #validate against the expected list of prepared transactions - actual = set(xids) - expected = set(prepared) - intersection = actual.intersection(expected) - - if intersection != expected: - missing = expected.difference(actual) - extra = actual.difference(expected) - for x in missing: - channel.dtx_coordination_rollback(xid=x) - self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) - - def xid(self, txid, branchqual = ''): - return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual - - def txswap(self, tx, id): - channel = self.channel - #declare two queues: - channel.queue_declare(queue="queue-a", exclusive=True) - channel.queue_declare(queue="queue-b", exclusive=True) - #put message with specified id on one queue: - channel.message_transfer(routing_key="queue-a", message_id=id, body="DtxMessage") - - #start the transaction: - channel.dtx_demarcation_select() - self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).flags) - - #'swap' the message from one queue to the other, under that transaction: - self.swap(self.channel, "queue-a", "queue-b") - - #mark the end of the transactional work: - self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).flags) - - def swap(self, channel, src, dest): - #consume from src: - channel.message_get(destination="temp-swap", queue=src) - msg = self.client.queue("temp-swap").get(timeout=1) - msg.ok(); - - #re-publish to dest - channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body) - - def assertMessageCount(self, expected, queue): - self.assertEqual(expected, self.channel.queue_declare(queue=queue, passive=True).message_count) - - def assertMessageId(self, expected, queue): - self.channel.message_consume(queue=queue, destination="results", no_ack=True) - self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id) - self.channel.message_cancel(destination="results") |