summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/python/tests_0-9/dtx.py
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/python/tests_0-9/dtx.py')
-rw-r--r--M4-RCs/qpid/python/tests_0-9/dtx.py587
1 files changed, 0 insertions, 587 deletions
diff --git a/M4-RCs/qpid/python/tests_0-9/dtx.py b/M4-RCs/qpid/python/tests_0-9/dtx.py
deleted file mode 100644
index bc268f4129..0000000000
--- a/M4-RCs/qpid/python/tests_0-9/dtx.py
+++ /dev/null
@@ -1,587 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from qpid.client import Client, Closed
-from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-from struct import pack, unpack
-from time import sleep
-
-class DtxTests(TestBase):
- """
- Tests for the amqp dtx related classes.
-
- Tests of the form test_simple_xxx test the basic transactional
- behaviour. The approach here is to 'swap' a message from one queue
- to another by consuming and re-publishing in the same
- transaction. That transaction is then completed in different ways
- and the appropriate result verified.
-
- The other tests enforce more specific rules and behaviour on a
- per-method or per-field basis.
- """
-
- XA_RBROLLBACK = 1
- XA_RBTIMEOUT = 2
- XA_OK = 8
-
- def test_simple_commit(self):
- """
- Test basic one-phase commit behaviour.
- """
- channel = self.channel
- tx = self.xid("my-xid")
- self.txswap(tx, "commit")
-
- #neither queue should have any messages accessible
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(0, "queue-b")
-
- #commit
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).flags)
-
- #check result
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(1, "queue-b")
- self.assertMessageId("commit", "queue-b")
-
- def test_simple_prepare_commit(self):
- """
- Test basic two-phase commit behaviour.
- """
- channel = self.channel
- tx = self.xid("my-xid")
- self.txswap(tx, "prepare-commit")
-
- #prepare
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags)
-
- #neither queue should have any messages accessible
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(0, "queue-b")
-
- #commit
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).flags)
-
- #check result
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(1, "queue-b")
- self.assertMessageId("prepare-commit", "queue-b")
-
-
- def test_simple_rollback(self):
- """
- Test basic rollback behaviour.
- """
- channel = self.channel
- tx = self.xid("my-xid")
- self.txswap(tx, "rollback")
-
- #neither queue should have any messages accessible
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(0, "queue-b")
-
- #rollback
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags)
-
- #check result
- self.assertMessageCount(1, "queue-a")
- self.assertMessageCount(0, "queue-b")
- self.assertMessageId("rollback", "queue-a")
-
- def test_simple_prepare_rollback(self):
- """
- Test basic rollback behaviour after the transaction has been prepared.
- """
- channel = self.channel
- tx = self.xid("my-xid")
- self.txswap(tx, "prepare-rollback")
-
- #prepare
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).flags)
-
- #neither queue should have any messages accessible
- self.assertMessageCount(0, "queue-a")
- self.assertMessageCount(0, "queue-b")
-
- #rollback
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags)
-
- #check result
- self.assertMessageCount(1, "queue-a")
- self.assertMessageCount(0, "queue-b")
- self.assertMessageId("prepare-rollback", "queue-a")
-
- def test_select_required(self):
- """
- check that an error is flagged if select is not issued before
- start or end
- """
- channel = self.channel
- tx = self.xid("dummy")
- try:
- channel.dtx_demarcation_start(xid=tx)
-
- #if we get here we have failed, but need to do some cleanup:
- channel.dtx_demarcation_end(xid=tx)
- channel.dtx_coordination_rollback(xid=tx)
- self.fail("Channel not selected for use with dtx, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_start_already_known(self):
- """
- Verify that an attempt to start an association with a
- transaction that is already known is not allowed (unless the
- join flag is set).
- """
- #create two channels on different connection & select them for use with dtx:
- channel1 = self.channel
- channel1.dtx_demarcation_select()
-
- other = self.connect()
- channel2 = other.channel(1)
- channel2.channel_open()
- channel2.dtx_demarcation_select()
-
- #create a xid
- tx = self.xid("dummy")
- #start work on one channel under that xid:
- channel1.dtx_demarcation_start(xid=tx)
- #then start on the other without the join set
- failed = False
- try:
- channel2.dtx_demarcation_start(xid=tx)
- except Closed, e:
- failed = True
- error = e
-
- #cleanup:
- if not failed:
- channel2.dtx_demarcation_end(xid=tx)
- other.close()
- channel1.dtx_demarcation_end(xid=tx)
- channel1.dtx_coordination_rollback(xid=tx)
-
- #verification:
- if failed: self.assertConnectionException(503, e.args[0])
- else: self.fail("Xid already known, expected exception!")
-
- def test_forget_xid_on_completion(self):
- """
- Verify that a xid is 'forgotten' - and can therefore be used
- again - once it is completed.
- """
- channel = self.channel
- #do some transactional work & complete the transaction
- self.test_simple_commit()
-
- #start association for the same xid as the previously completed txn
- tx = self.xid("my-xid")
- channel.dtx_demarcation_start(xid=tx)
- channel.dtx_demarcation_end(xid=tx)
- channel.dtx_coordination_rollback(xid=tx)
-
- def test_start_join_and_resume(self):
- """
- Ensure the correct error is signalled when both the join and
- resume flags are set on starting an association between a
- channel and a transcation.
- """
- channel = self.channel
- channel.dtx_demarcation_select()
- tx = self.xid("dummy")
- try:
- channel.dtx_demarcation_start(xid=tx, join=True, resume=True)
- #failed, but need some cleanup:
- channel.dtx_demarcation_end(xid=tx)
- channel.dtx_coordination_rollback(xid=tx)
- self.fail("Join and resume both set, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- def test_start_join(self):
- """
- Verify 'join' behaviour, where a channel is associated with a
- transaction that is already associated with another channel.
- """
- #create two channels & select them for use with dtx:
- channel1 = self.channel
- channel1.dtx_demarcation_select()
-
- channel2 = self.client.channel(2)
- channel2.channel_open()
- channel2.dtx_demarcation_select()
-
- #setup
- channel1.queue_declare(queue="one", exclusive=True)
- channel1.queue_declare(queue="two", exclusive=True)
- channel1.message_transfer(routing_key="one", message_id="a", body="DtxMessage")
- channel1.message_transfer(routing_key="two", message_id="b", body="DtxMessage")
-
- #create a xid
- tx = self.xid("dummy")
- #start work on one channel under that xid:
- channel1.dtx_demarcation_start(xid=tx)
- #then start on the other with the join flag set
- channel2.dtx_demarcation_start(xid=tx, join=True)
-
- #do work through each channel
- self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two'
- self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one'
-
- #mark end on both channels
- channel1.dtx_demarcation_end(xid=tx)
- channel2.dtx_demarcation_end(xid=tx)
-
- #commit and check
- channel1.dtx_coordination_commit(xid=tx, one_phase=True)
- self.assertMessageCount(1, "one")
- self.assertMessageCount(1, "two")
- self.assertMessageId("a", "two")
- self.assertMessageId("b", "one")
-
-
- def test_suspend_resume(self):
- """
- Test suspension and resumption of an association
- """
- channel = self.channel
- channel.dtx_demarcation_select()
-
- #setup
- channel.queue_declare(queue="one", exclusive=True)
- channel.queue_declare(queue="two", exclusive=True)
- channel.message_transfer(routing_key="one", message_id="a", body="DtxMessage")
- channel.message_transfer(routing_key="two", message_id="b", body="DtxMessage")
-
- tx = self.xid("dummy")
-
- channel.dtx_demarcation_start(xid=tx)
- self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
- channel.dtx_demarcation_end(xid=tx, suspend=True)
-
- channel.dtx_demarcation_start(xid=tx, resume=True)
- self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
- channel.dtx_demarcation_end(xid=tx)
-
- #commit and check
- channel.dtx_coordination_commit(xid=tx, one_phase=True)
- self.assertMessageCount(1, "one")
- self.assertMessageCount(1, "two")
- self.assertMessageId("a", "two")
- self.assertMessageId("b", "one")
-
- def test_end_suspend_and_fail(self):
- """
- Verify that the correct error is signalled if the suspend and
- fail flag are both set when disassociating a transaction from
- the channel
- """
- channel = self.channel
- channel.dtx_demarcation_select()
- tx = self.xid("suspend_and_fail")
- channel.dtx_demarcation_start(xid=tx)
- try:
- channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True)
- self.fail("Suspend and fail both set, expected exception!")
- except Closed, e:
- self.assertConnectionException(503, e.args[0])
-
- #cleanup
- other = self.connect()
- channel = other.channel(1)
- channel.channel_open()
- channel.dtx_coordination_rollback(xid=tx)
- channel.channel_close()
- other.close()
-
-
- def test_end_unknown_xid(self):
- """
- Verifies that the correct exception is thrown when an attempt
- is made to end the association for a xid not previously
- associated with the channel
- """
- channel = self.channel
- channel.dtx_demarcation_select()
- tx = self.xid("unknown-xid")
- try:
- channel.dtx_demarcation_end(xid=tx)
- self.fail("Attempted to end association with unknown xid, expected exception!")
- except Closed, e:
- #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming...
- self.assertConnectionException(503, e.args[0])
-
- def test_end(self):
- """
- Verify that the association is terminated by end and subsequent
- operations are non-transactional
- """
- channel = self.client.channel(2)
- channel.channel_open()
- channel.queue_declare(queue="tx-queue", exclusive=True)
-
- #publish a message under a transaction
- channel.dtx_demarcation_select()
- tx = self.xid("dummy")
- channel.dtx_demarcation_start(xid=tx)
- channel.message_transfer(routing_key="tx-queue", message_id="one", body="DtxMessage")
- channel.dtx_demarcation_end(xid=tx)
-
- #now that association with txn is ended, publish another message
- channel.message_transfer(routing_key="tx-queue", message_id="two", body="DtxMessage")
-
- #check the second message is available, but not the first
- self.assertMessageCount(1, "tx-queue")
- channel.message_consume(queue="tx-queue", destination="results", no_ack=False)
- msg = self.client.queue("results").get(timeout=1)
- self.assertEqual("two", msg.message_id)
- channel.message_cancel(destination="results")
- #ack the message then close the channel
- msg.ok()
- channel.channel_close()
-
- channel = self.channel
- #commit the transaction and check that the first message (and
- #only the first message) is then delivered
- channel.dtx_coordination_commit(xid=tx, one_phase=True)
- self.assertMessageCount(1, "tx-queue")
- self.assertMessageId("one", "tx-queue")
-
- def test_invalid_commit_one_phase_true(self):
- """
- Test that a commit with one_phase = True is rejected if the
- transaction in question has already been prepared.
- """
- other = self.connect()
- tester = other.channel(1)
- tester.channel_open()
- tester.queue_declare(queue="dummy", exclusive=True)
- tester.dtx_demarcation_select()
- tx = self.xid("dummy")
- tester.dtx_demarcation_start(xid=tx)
- tester.message_transfer(routing_key="dummy", body="whatever")
- tester.dtx_demarcation_end(xid=tx)
- tester.dtx_coordination_prepare(xid=tx)
- failed = False
- try:
- tester.dtx_coordination_commit(xid=tx, one_phase=True)
- except Closed, e:
- failed = True
- error = e
-
- if failed:
- self.channel.dtx_coordination_rollback(xid=tx)
- self.assertConnectionException(503, e.args[0])
- else:
- tester.channel_close()
- other.close()
- self.fail("Invalid use of one_phase=True, expected exception!")
-
- def test_invalid_commit_one_phase_false(self):
- """
- Test that a commit with one_phase = False is rejected if the
- transaction in question has not yet been prepared.
- """
- """
- Test that a commit with one_phase = True is rejected if the
- transaction in question has already been prepared.
- """
- other = self.connect()
- tester = other.channel(1)
- tester.channel_open()
- tester.queue_declare(queue="dummy", exclusive=True)
- tester.dtx_demarcation_select()
- tx = self.xid("dummy")
- tester.dtx_demarcation_start(xid=tx)
- tester.message_transfer(routing_key="dummy", body="whatever")
- tester.dtx_demarcation_end(xid=tx)
- failed = False
- try:
- tester.dtx_coordination_commit(xid=tx, one_phase=False)
- except Closed, e:
- failed = True
- error = e
-
- if failed:
- self.channel.dtx_coordination_rollback(xid=tx)
- self.assertConnectionException(503, e.args[0])
- else:
- tester.channel_close()
- other.close()
- self.fail("Invalid use of one_phase=False, expected exception!")
-
- def test_implicit_end(self):
- """
- Test that an association is implicitly ended when the channel
- is closed (whether by exception or explicit client request)
- and the transaction in question is marked as rollback only.
- """
- channel1 = self.channel
- channel2 = self.client.channel(2)
- channel2.channel_open()
-
- #setup:
- channel2.queue_declare(queue="dummy", exclusive=True)
- channel2.message_transfer(routing_key="dummy", body="whatever")
- tx = self.xid("dummy")
-
- channel2.dtx_demarcation_select()
- channel2.dtx_demarcation_start(xid=tx)
- channel2.message_get(queue="dummy", destination="dummy")
- self.client.queue("dummy").get(timeout=1).ok()
- channel2.message_transfer(routing_key="dummy", body="whatever")
- channel2.channel_close()
-
- self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).flags)
- channel1.dtx_coordination_rollback(xid=tx)
-
- def test_get_timeout(self):
- """
- Check that get-timeout returns the correct value, (and that a
- transaction with a timeout can complete normally)
- """
- channel = self.channel
- tx = self.xid("dummy")
-
- channel.dtx_demarcation_select()
- channel.dtx_demarcation_start(xid=tx)
- self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout)
- channel.dtx_coordination_set_timeout(xid=tx, timeout=60)
- self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout)
- self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).flags)
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags)
-
- def test_set_timeout(self):
- """
- Test the timeout of a transaction results in the expected
- behaviour
- """
- #open new channel to allow self.channel to be used in checking te queue
- channel = self.client.channel(2)
- channel.channel_open()
- #setup:
- tx = self.xid("dummy")
- channel.queue_declare(queue="queue-a", exclusive=True)
- channel.queue_declare(queue="queue-b", exclusive=True)
- channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage")
-
- channel.dtx_demarcation_select()
- channel.dtx_demarcation_start(xid=tx)
- self.swap(channel, "queue-a", "queue-b")
- channel.dtx_coordination_set_timeout(xid=tx, timeout=2)
- sleep(3)
- #check that the work has been rolled back already
- self.assertMessageCount(1, "queue-a")
- self.assertMessageCount(0, "queue-b")
- self.assertMessageId("timeout", "queue-a")
- #check the correct codes are returned when we try to complete the txn
- self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).flags)
- self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).flags)
-
-
-
- def test_recover(self):
- """
- Test basic recover behaviour
- """
- channel = self.channel
-
- channel.dtx_demarcation_select()
- channel.queue_declare(queue="dummy", exclusive=True)
-
- prepared = []
- for i in range(1, 10):
- tx = self.xid("tx%s" % (i))
- channel.dtx_demarcation_start(xid=tx)
- channel.message_transfer(routing_key="dummy", body="message%s" % (i))
- channel.dtx_demarcation_end(xid=tx)
- if i in [2, 5, 6, 8]:
- channel.dtx_coordination_prepare(xid=tx)
- prepared.append(tx)
- else:
- channel.dtx_coordination_rollback(xid=tx)
-
- indoubt = channel.dtx_coordination_recover().xids
- #convert indoubt table to a list of xids (note: this will change for 0-10)
- data = indoubt["xids"]
- xids = []
- pos = 0
- while pos < len(data):
- size = unpack("!B", data[pos])[0]
- start = pos + 1
- end = start + size
- xid = data[start:end]
- xids.append(xid)
- pos = end
-
- #rollback the prepared transactions returned by recover
- for x in xids:
- channel.dtx_coordination_rollback(xid=x)
-
- #validate against the expected list of prepared transactions
- actual = set(xids)
- expected = set(prepared)
- intersection = actual.intersection(expected)
-
- if intersection != expected:
- missing = expected.difference(actual)
- extra = actual.difference(expected)
- for x in missing:
- channel.dtx_coordination_rollback(xid=x)
- self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
-
- def xid(self, txid, branchqual = ''):
- return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual
-
- def txswap(self, tx, id):
- channel = self.channel
- #declare two queues:
- channel.queue_declare(queue="queue-a", exclusive=True)
- channel.queue_declare(queue="queue-b", exclusive=True)
- #put message with specified id on one queue:
- channel.message_transfer(routing_key="queue-a", message_id=id, body="DtxMessage")
-
- #start the transaction:
- channel.dtx_demarcation_select()
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).flags)
-
- #'swap' the message from one queue to the other, under that transaction:
- self.swap(self.channel, "queue-a", "queue-b")
-
- #mark the end of the transactional work:
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).flags)
-
- def swap(self, channel, src, dest):
- #consume from src:
- channel.message_get(destination="temp-swap", queue=src)
- msg = self.client.queue("temp-swap").get(timeout=1)
- msg.ok();
-
- #re-publish to dest
- channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body)
-
- def assertMessageCount(self, expected, queue):
- self.assertEqual(expected, self.channel.queue_declare(queue=queue, passive=True).message_count)
-
- def assertMessageId(self, expected, queue):
- self.channel.message_consume(queue=queue, destination="results", no_ack=True)
- self.assertEqual(expected, self.client.queue("results").get(timeout=1).message_id)
- self.channel.message_cancel(destination="results")