# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from qpid.client import Client, Closed from qpid.queue import Empty from qpid.datatypes import Message, RangedSet from qpid.session import SessionException from qpid.testlib import TestBase010 from qpid.compat import set from struct import pack, unpack from time import sleep class DtxTests(TestBase010): """ Tests for the amqp dtx related classes. Tests of the form test_simple_xxx test the basic transactional behaviour. The approach here is to 'swap' a message from one queue to another by consuming and re-publishing in the same transaction. That transaction is then completed in different ways and the appropriate result verified. The other tests enforce more specific rules and behaviour on a per-method or per-field basis. """ XA_RBROLLBACK = 1 XA_RBTIMEOUT = 2 XA_OK = 0 tx_counter = 0 def reset_channel(self): self.session.close() self.session = self.conn.session("dtx-session", 1) def test_simple_commit(self): """ Test basic one-phase commit behaviour. """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) session = self.session tx = self.xid("my-xid") self.txswap(tx, "commit") #neither queue should have any messages accessible self.assertMessageCount(0, "queue-a") self.assertMessageCount(0, "queue-b") #commit self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=True).status) #should close and reopen session to ensure no unacked messages are held self.reset_channel() #check result self.assertMessageCount(0, "queue-a") self.assertMessageCount(1, "queue-b") self.assertMessageId("commit", "queue-b") def test_simple_prepare_commit(self): """ Test basic two-phase commit behaviour. """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) session = self.session tx = self.xid("my-xid") self.txswap(tx, "prepare-commit") #prepare self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status) #neither queue should have any messages accessible self.assertMessageCount(0, "queue-a") self.assertMessageCount(0, "queue-b") #commit self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=False).status) self.reset_channel() #check result self.assertMessageCount(0, "queue-a") self.assertMessageCount(1, "queue-b") self.assertMessageId("prepare-commit", "queue-b") def test_simple_rollback(self): """ Test basic rollback behaviour. """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) session = self.session tx = self.xid("my-xid") self.txswap(tx, "rollback") #neither queue should have any messages accessible self.assertMessageCount(0, "queue-a") self.assertMessageCount(0, "queue-b") #rollback self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) self.reset_channel() #check result self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") self.assertMessageId("rollback", "queue-a") def test_simple_prepare_rollback(self): """ Test basic rollback behaviour after the transaction has been prepared. """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) session = self.session tx = self.xid("my-xid") self.txswap(tx, "prepare-rollback") #prepare self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status) #neither queue should have any messages accessible self.assertMessageCount(0, "queue-a") self.assertMessageCount(0, "queue-b") #rollback self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) self.reset_channel() #check result self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") self.assertMessageId("prepare-rollback", "queue-a") def test_select_required(self): """ check that an error is flagged if select is not issued before start or end """ session = self.session tx = self.xid("dummy") try: session.dtx_start(xid=tx) #if we get here we have failed, but need to do some cleanup: session.dtx_end(xid=tx) session.dtx_rollback(xid=tx) self.fail("Session not selected for use with dtx, expected exception!") except SessionException, e: self.assertEquals(503, e.args[0].error_code) def test_start_already_known(self): """ Verify that an attempt to start an association with a transaction that is already known is not allowed (unless the join flag is set). """ #create two sessions on different connection & select them for use with dtx: session1 = self.session session1.dtx_select() other = self.connect() session2 = other.session("other", 0) session2.dtx_select() #create a xid tx = self.xid("dummy") #start work on one session under that xid: session1.dtx_start(xid=tx) #then start on the other without the join set failed = False try: session2.dtx_start(xid=tx) except SessionException, e: failed = True error = e #cleanup: if not failed: session2.dtx_end(xid=tx) other.close() session1.dtx_end(xid=tx) session1.dtx_rollback(xid=tx) #verification: if failed: self.assertEquals(530, error.args[0].error_code) else: self.fail("Xid already known, expected exception!") def test_forget_xid_on_completion(self): """ Verify that a xid is 'forgotten' - and can therefore be used again - once it is completed. """ #do some transactional work & complete the transaction self.test_simple_commit() # session has been reset, so reselect for use with dtx self.session.dtx_select() #start association for the same xid as the previously completed txn tx = self.xid("my-xid") self.session.dtx_start(xid=tx) self.session.dtx_end(xid=tx) self.session.dtx_rollback(xid=tx) def test_start_join_and_resume(self): """ Ensure the correct error is signalled when both the join and resume flags are set on starting an association between a session and a transcation. """ session = self.session session.dtx_select() tx = self.xid("dummy") try: session.dtx_start(xid=tx, join=True, resume=True) #failed, but need some cleanup: session.dtx_end(xid=tx) session.dtx_rollback(xid=tx) self.fail("Join and resume both set, expected exception!") except SessionException, e: self.assertEquals(503, e.args[0].error_code) def test_start_join(self): """ Verify 'join' behaviour, where a session is associated with a transaction that is already associated with another session. """ guard = self.keepQueuesAlive(["one", "two"]) #create two sessions & select them for use with dtx: session1 = self.session session1.dtx_select() session2 = self.conn.session("second", 2) session2.dtx_select() #setup session1.queue_declare(queue="one", auto_delete=True) session1.queue_declare(queue="two", auto_delete=True) session1.message_transfer(self.createMessage(session1, "one", "a", "DtxMessage")) session1.message_transfer(self.createMessage(session1, "two", "b", "DtxMessage")) #create a xid tx = self.xid("dummy") #start work on one session under that xid: session1.dtx_start(xid=tx) #then start on the other with the join flag set session2.dtx_start(xid=tx, join=True) #do work through each session self.swap(session1, "one", "two")#swap 'a' from 'one' to 'two' self.swap(session2, "two", "one")#swap 'b' from 'two' to 'one' #mark end on both sessions session1.dtx_end(xid=tx) session2.dtx_end(xid=tx) #commit and check session1.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") self.assertMessageCount(1, "two") self.assertMessageId("a", "two") self.assertMessageId("b", "one") def test_suspend_resume(self): """ Test suspension and resumption of an association """ session = self.session session.dtx_select() #setup session.queue_declare(queue="one", exclusive=True, auto_delete=True) session.queue_declare(queue="two", exclusive=True, auto_delete=True) session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage")) session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage")) tx = self.xid("dummy") session.dtx_start(xid=tx) self.swap(session, "one", "two")#swap 'a' from 'one' to 'two' session.dtx_end(xid=tx, suspend=True) session.dtx_start(xid=tx, resume=True) self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' session.dtx_end(xid=tx) #commit and check session.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") self.assertMessageCount(1, "two") self.assertMessageId("a", "two") self.assertMessageId("b", "one") def test_suspend_start_end_resume(self): """ Test suspension and resumption of an association with work done on another transaction when the first transaction is suspended """ session = self.session session.dtx_select() #setup session.queue_declare(queue="one", exclusive=True, auto_delete=True) session.queue_declare(queue="two", exclusive=True, auto_delete=True) session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage")) session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage")) tx = self.xid("dummy") session.dtx_start(xid=tx) self.swap(session, "one", "two")#swap 'a' from 'one' to 'two' session.dtx_end(xid=tx, suspend=True) session.dtx_start(xid=tx, resume=True) self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' session.dtx_end(xid=tx) #commit and check session.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") self.assertMessageCount(1, "two") self.assertMessageId("a", "two") self.assertMessageId("b", "one") def test_end_suspend_and_fail(self): """ Verify that the correct error is signalled if the suspend and fail flag are both set when disassociating a transaction from the session """ session = self.session session.dtx_select() tx = self.xid("suspend_and_fail") session.dtx_start(xid=tx) try: session.dtx_end(xid=tx, suspend=True, fail=True) self.fail("Suspend and fail both set, expected exception!") except SessionException, e: self.assertEquals(503, e.args[0].error_code) #cleanup other = self.connect() session = other.session("cleanup", 1) session.dtx_rollback(xid=tx) session.close() other.close() def test_end_unknown_xid(self): """ Verifies that the correct exception is thrown when an attempt is made to end the association for a xid not previously associated with the session """ session = self.session session.dtx_select() tx = self.xid("unknown-xid") try: session.dtx_end(xid=tx) self.fail("Attempted to end association with unknown xid, expected exception!") except SessionException, e: self.assertEquals(409, e.args[0].error_code) def test_end(self): """ Verify that the association is terminated by end and subsequent operations are non-transactional """ guard = self.keepQueuesAlive(["tx-queue"]) session = self.conn.session("alternate", 1) #publish a message under a transaction session.dtx_select() tx = self.xid("dummy") session.dtx_start(xid=tx) session.message_transfer(self.createMessage(session, "tx-queue", "one", "DtxMessage")) session.dtx_end(xid=tx) #now that association with txn is ended, publish another message session.message_transfer(self.createMessage(session, "tx-queue", "two", "DtxMessage")) #check the second message is available, but not the first self.assertMessageCount(1, "tx-queue") self.subscribe(session, queue="tx-queue", destination="results") msg = session.incoming("results").get(timeout=1) self.assertEqual("two", self.getMessageProperty(msg, 'correlation_id')) session.message_cancel(destination="results") #ack the message then close the session session.message_accept(RangedSet(msg.id)) session.close() session = self.session #commit the transaction and check that the first message (and #only the first message) is then delivered session.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "tx-queue") self.assertMessageId("one", "tx-queue") def test_invalid_commit_one_phase_true(self): """ Test that a commit with one_phase = True is rejected if the transaction in question has already been prepared. """ other = self.connect() tester = other.session("tester", 1) tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True) tester.dtx_select() tx = self.xid("dummy") tester.dtx_start(xid=tx) tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) tester.dtx_end(xid=tx) tester.dtx_prepare(xid=tx) failed = False try: tester.dtx_commit(xid=tx, one_phase=True) except SessionException, e: failed = True error = e if failed: self.session.dtx_rollback(xid=tx) self.assertEquals(409, error.args[0].error_code) else: tester.close() other.close() self.fail("Invalid use of one_phase=True, expected exception!") def test_invalid_commit_one_phase_false(self): """ Test that a commit with one_phase = False is rejected if the transaction in question has not yet been prepared. """ other = self.connect() tester = other.session("tester", 1) tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True) tester.dtx_select() tx = self.xid("dummy") tester.dtx_start(xid=tx) tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) tester.dtx_end(xid=tx) failed = False try: tester.dtx_commit(xid=tx, one_phase=False) except SessionException, e: failed = True error = e if failed: self.session.dtx_rollback(xid=tx) self.assertEquals(409, error.args[0].error_code) else: tester.close() other.close() self.fail("Invalid use of one_phase=False, expected exception!") def test_invalid_commit_not_ended(self): """ Test that a commit fails if the xid is still associated with a session. """ other = self.connect() tester = other.session("tester", 1) self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) self.session.dtx_select() tx = self.xid("dummy") self.session.dtx_start(xid=tx) self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) failed = False try: tester.dtx_commit(xid=tx, one_phase=False) except SessionException, e: failed = True error = e if failed: self.session.dtx_end(xid=tx) self.session.dtx_rollback(xid=tx) self.assertEquals(409, error.args[0].error_code) else: tester.close() other.close() self.fail("Commit should fail as xid is still associated!") def test_invalid_rollback_not_ended(self): """ Test that a rollback fails if the xid is still associated with a session. """ other = self.connect() tester = other.session("tester", 1) self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) self.session.dtx_select() tx = self.xid("dummy") self.session.dtx_start(xid=tx) self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) failed = False try: tester.dtx_rollback(xid=tx) except SessionException, e: failed = True error = e if failed: self.session.dtx_end(xid=tx) self.session.dtx_rollback(xid=tx) self.assertEquals(409, error.args[0].error_code) else: tester.close() other.close() self.fail("Rollback should fail as xid is still associated!") def test_invalid_prepare_not_ended(self): """ Test that a prepare fails if the xid is still associated with a session. """ other = self.connect() tester = other.session("tester", 1) self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) self.session.dtx_select() tx = self.xid("dummy") self.session.dtx_start(xid=tx) self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever")) failed = False try: tester.dtx_prepare(xid=tx) except SessionException, e: failed = True error = e if failed: self.session.dtx_end(xid=tx) self.session.dtx_rollback(xid=tx) self.assertEquals(409, error.args[0].error_code) else: tester.close() other.close() self.fail("Rollback should fail as xid is still associated!") def test_implicit_end(self): """ Test that an association is implicitly ended when the session is closed (whether by exception or explicit client request) and the transaction in question is marked as rollback only. """ session1 = self.session session2 = self.conn.session("other", 2) #setup: session2.queue_declare(queue="dummy", exclusive=True, auto_delete=True) session2.message_transfer(self.createMessage(session2, "dummy", "a", "whatever")) tx = self.xid("dummy") session2.dtx_select() session2.dtx_start(xid=tx) session2.message_subscribe(queue="dummy", destination="dummy") session2.message_flow(destination="dummy", unit=session2.credit_unit.message, value=1) session2.message_flow(destination="dummy", unit=session2.credit_unit.byte, value=0xFFFFFFFFL) msg = session2.incoming("dummy").get(timeout=1) session2.message_accept(RangedSet(msg.id)) session2.message_cancel(destination="dummy") session2.message_transfer(self.createMessage(session2, "dummy", "b", "whatever")) session2.close() self.assertEqual(self.XA_RBROLLBACK, session1.dtx_prepare(xid=tx).status) session1.dtx_rollback(xid=tx) def test_get_timeout(self): """ Check that get-timeout returns the correct value, (and that a transaction with a timeout can complete normally) """ session = self.session tx = self.xid("dummy") session.dtx_select() session.dtx_start(xid=tx) # below test checks for default value of dtx-default-timeout broker option self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout) session.dtx_set_timeout(xid=tx, timeout=200) self.assertEqual(200, session.dtx_get_timeout(xid=tx).timeout) self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status) self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) def test_set_timeout(self): """ Test the timeout of a transaction results in the expected behaviour """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) #open new session to allow self.session to be used in checking the queue session = self.conn.session("worker", 1) #setup: tx = self.xid("dummy") session.queue_declare(queue="queue-a", auto_delete=True) session.queue_declare(queue="queue-b", auto_delete=True) session.message_transfer(self.createMessage(session, "queue-a", "timeout", "DtxMessage")) session.dtx_select() session.dtx_start(xid=tx) self.swap(session, "queue-a", "queue-b") session.dtx_set_timeout(xid=tx, timeout=2) sleep(3) #check that the work has been rolled back already self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") self.assertMessageId("timeout", "queue-a") #check the correct codes are returned when we try to complete the txn self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status) self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status) def test_set_timeout_too_high(self): """ Test the timeout can't be more than --dtx-max-timeout broker option """ session = self.session tx = self.xid("dummy") session.dtx_select() session.dtx_start(xid=tx) try: session.dtx_set_timeout(xid=tx, timeout=3601) except SessionException, e: self.assertEquals(542, e.args[0].error_code) def test_recover(self): """ Test basic recover behaviour """ session = self.session session.dtx_select() session.queue_declare(queue="dummy", exclusive=True, auto_delete=True) prepared = [] for i in range(1, 10): tx = self.xid("tx%s" % (i)) session.dtx_start(xid=tx) session.message_transfer(self.createMessage(session, "dummy", "message%s" % (i), "message%s" % (i))) session.dtx_end(xid=tx) if i in [2, 5, 6, 8]: session.dtx_prepare(xid=tx) prepared.append(tx) else: session.dtx_rollback(xid=tx) xids = session.dtx_recover().in_doubt #rollback the prepared transactions returned by recover for x in xids: session.dtx_rollback(xid=x) #validate against the expected list of prepared transactions actual = set([x.global_id for x in xids]) #TODO: come up with nicer way to test these expected = set([x.global_id for x in prepared]) intersection = actual.intersection(expected) if intersection != expected: missing = expected.difference(actual) extra = actual.difference(expected) self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) def test_bad_resume(self): """ Test that a resume on a session not selected for use with dtx fails """ session = self.session try: session.dtx_start(resume=True) except SessionException, e: self.assertEquals(503, e.args[0].error_code) def test_prepare_unknown(self): session = self.session try: session.dtx_prepare(xid=self.xid("unknown")) except SessionException, e: self.assertEquals(404, e.args[0].error_code) def test_commit_unknown(self): session = self.session try: session.dtx_commit(xid=self.xid("unknown")) except SessionException, e: self.assertEquals(404, e.args[0].error_code) def test_rollback_unknown(self): session = self.session try: session.dtx_rollback(xid=self.xid("unknown")) except SessionException, e: self.assertEquals(404, e.args[0].error_code) def test_get_timeout_unknown(self): session = self.session try: session.dtx_get_timeout(xid=self.xid("unknown")) except SessionException, e: self.assertEquals(404, e.args[0].error_code) def xid(self, txid): DtxTests.tx_counter += 1 branchqual = "v%s" % DtxTests.tx_counter return self.session.xid(format=0, global_id=txid, branch_id=branchqual) def txswap(self, tx, id): session = self.session #declare two queues: session.queue_declare(queue="queue-a", auto_delete=True) session.queue_declare(queue="queue-b", auto_delete=True) #put message with specified id on one queue: dp=session.delivery_properties(routing_key="queue-a") mp=session.message_properties(correlation_id=id) session.message_transfer(message=Message(dp, mp, "DtxMessage")) #start the transaction: session.dtx_select() self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status) #'swap' the message from one queue to the other, under that transaction: self.swap(self.session, "queue-a", "queue-b") #mark the end of the transactional work: self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status) def swap(self, session, src, dest): #consume from src: session.message_subscribe(destination="temp-swap", queue=src) session.message_flow(destination="temp-swap", unit=session.credit_unit.message, value=1) session.message_flow(destination="temp-swap", unit=session.credit_unit.byte, value=0xFFFFFFFFL) msg = session.incoming("temp-swap").get(timeout=1) session.message_cancel(destination="temp-swap") session.message_accept(RangedSet(msg.id)) #todo: also complete at this point? #re-publish to dest: dp=session.delivery_properties(routing_key=dest) mp=session.message_properties(correlation_id=self.getMessageProperty(msg, 'correlation_id')) session.message_transfer(message=Message(dp, mp, msg.body)) def assertMessageCount(self, expected, queue): self.assertEqual(expected, self.session.queue_query(queue=queue).message_count) def assertMessageId(self, expected, queue): self.session.message_subscribe(queue=queue, destination="results") self.session.message_flow(destination="results", unit=self.session.credit_unit.message, value=1) self.session.message_flow(destination="results", unit=self.session.credit_unit.byte, value=0xFFFFFFFFL) self.assertEqual(expected, self.getMessageProperty(self.session.incoming("results").get(timeout=1), 'correlation_id')) self.session.message_cancel(destination="results") def getMessageProperty(self, msg, prop): for h in msg.headers: if hasattr(h, prop): return getattr(h, prop) return None def keepQueuesAlive(self, names): session = self.conn.session("nasty", 99) for n in names: session.queue_declare(queue=n, auto_delete=True) session.message_subscribe(destination=n, queue=n) return session def createMessage(self, session, key, id, body): dp=session.delivery_properties(routing_key=key) mp=session.message_properties(correlation_id=id) session.message_transfer(message=Message(dp, mp, body))