diff options
Diffstat (limited to 'qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py')
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py | 102 |
1 files changed, 51 insertions, 51 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py b/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py index 2823385a3b..19a5c6a8d9 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/dtx.py @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -36,7 +36,7 @@ class DtxTests(TestBase010): and the appropriate result verified. The other tests enforce more specific rules and behaviour on a - per-method or per-field basis. + per-method or per-field basis. """ XA_RBROLLBACK = 1 @@ -49,8 +49,8 @@ class DtxTests(TestBase010): self.session = self.conn.session("dtx-session", 1) def test_simple_commit(self): - """ - Test basic one-phase commit behaviour. + """ + Test basic one-phase commit behaviour. """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) session = self.session @@ -73,8 +73,8 @@ class DtxTests(TestBase010): self.assertMessageId("commit", "queue-b") def test_simple_prepare_commit(self): - """ - Test basic two-phase commit behaviour. + """ + Test basic two-phase commit behaviour. """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) session = self.session @@ -100,8 +100,8 @@ class DtxTests(TestBase010): def test_simple_rollback(self): - """ - Test basic rollback behaviour. + """ + Test basic rollback behaviour. """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) session = self.session @@ -123,8 +123,8 @@ class DtxTests(TestBase010): self.assertMessageId("rollback", "queue-a") def test_simple_prepare_rollback(self): - """ - Test basic rollback behaviour after the transaction has been prepared. + """ + Test basic rollback behaviour after the transaction has been prepared. """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) session = self.session @@ -146,18 +146,18 @@ class DtxTests(TestBase010): #check result self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") - self.assertMessageId("prepare-rollback", "queue-a") + self.assertMessageId("prepare-rollback", "queue-a") def test_select_required(self): """ check that an error is flagged if select is not issued before - start or end + start or end """ session = self.session tx = self.xid("dummy") try: session.dtx_start(xid=tx) - + #if we get here we have failed, but need to do some cleanup: session.dtx_end(xid=tx) session.dtx_rollback(xid=tx) @@ -197,10 +197,10 @@ class DtxTests(TestBase010): other.close() session1.dtx_end(xid=tx) session1.dtx_rollback(xid=tx) - + #verification: if failed: self.assertEquals(530, error.args[0].error_code) - else: self.fail("Xid already known, expected exception!") + else: self.fail("Xid already known, expected exception!") def test_forget_xid_on_completion(self): """ @@ -210,8 +210,8 @@ class DtxTests(TestBase010): #do some transactional work & complete the transaction self.test_simple_commit() # session has been reset, so reselect for use with dtx - self.session.dtx_select() - + self.session.dtx_select() + #start association for the same xid as the previously completed txn tx = self.xid("my-xid") self.session.dtx_start(xid=tx) @@ -237,9 +237,9 @@ class DtxTests(TestBase010): self.assertEquals(503, e.args[0].error_code) def test_start_join(self): - """ + """ Verify 'join' behaviour, where a session is associated with a - transaction that is already associated with another session. + transaction that is already associated with another session. """ guard = self.keepQueuesAlive(["one", "two"]) #create two sessions & select them for use with dtx: @@ -269,14 +269,14 @@ class DtxTests(TestBase010): #mark end on both sessions session1.dtx_end(xid=tx) session2.dtx_end(xid=tx) - + #commit and check session1.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") self.assertMessageCount(1, "two") self.assertMessageId("a", "two") self.assertMessageId("b", "one") - + def test_suspend_resume(self): """ @@ -300,7 +300,7 @@ class DtxTests(TestBase010): session.dtx_start(xid=tx, resume=True) self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' session.dtx_end(xid=tx) - + #commit and check session.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") @@ -308,7 +308,7 @@ class DtxTests(TestBase010): self.assertMessageId("a", "two") self.assertMessageId("b", "one") - def test_suspend_start_end_resume(self): + def test_suspend_start_end_resume(self): """ Test suspension and resumption of an association with work done on another transaction when the first transaction is @@ -332,7 +332,7 @@ class DtxTests(TestBase010): session.dtx_start(xid=tx, resume=True) self.swap(session, "two", "one")#swap 'b' from 'two' to 'one' session.dtx_end(xid=tx) - + #commit and check session.dtx_commit(xid=tx, one_phase=True) self.assertMessageCount(1, "one") @@ -341,10 +341,10 @@ class DtxTests(TestBase010): self.assertMessageId("b", "one") def test_end_suspend_and_fail(self): - """ + """ Verify that the correct error is signalled if the suspend and fail flag are both set when disassociating a transaction from - the session + the session """ session = self.session session.dtx_select() @@ -356,16 +356,16 @@ class DtxTests(TestBase010): except SessionException, e: self.assertEquals(503, e.args[0].error_code) - #cleanup + #cleanup other = self.connect() session = other.session("cleanup", 1) session.dtx_rollback(xid=tx) session.close() other.close() - + def test_end_unknown_xid(self): - """ + """ Verifies that the correct exception is thrown when an attempt is made to end the association for a xid not previously associated with the session @@ -382,7 +382,7 @@ class DtxTests(TestBase010): def test_end(self): """ Verify that the association is terminated by end and subsequent - operations are non-transactional + operations are non-transactional """ guard = self.keepQueuesAlive(["tx-queue"]) session = self.conn.session("alternate", 1) @@ -408,7 +408,7 @@ class DtxTests(TestBase010): session.message_accept(RangedSet(msg.id)) session.close() - session = self.session + session = self.session #commit the transaction and check that the first message (and #only the first message) is then delivered session.dtx_commit(xid=tx, one_phase=True) @@ -418,7 +418,7 @@ class DtxTests(TestBase010): def test_invalid_commit_one_phase_true(self): """ Test that a commit with one_phase = True is rejected if the - transaction in question has already been prepared. + transaction in question has already been prepared. """ other = self.connect() tester = other.session("tester", 1) @@ -447,7 +447,7 @@ class DtxTests(TestBase010): def test_invalid_commit_one_phase_false(self): """ Test that a commit with one_phase = False is rejected if the - transaction in question has not yet been prepared. + transaction in question has not yet been prepared. """ other = self.connect() tester = other.session("tester", 1) @@ -474,7 +474,7 @@ class DtxTests(TestBase010): def test_invalid_commit_not_ended(self): """ - Test that a commit fails if the xid is still associated with a session. + Test that a commit fails if the xid is still associated with a session. """ other = self.connect() tester = other.session("tester", 1) @@ -502,7 +502,7 @@ class DtxTests(TestBase010): def test_invalid_rollback_not_ended(self): """ - Test that a rollback fails if the xid is still associated with a session. + Test that a rollback fails if the xid is still associated with a session. """ other = self.connect() tester = other.session("tester", 1) @@ -531,7 +531,7 @@ class DtxTests(TestBase010): def test_invalid_prepare_not_ended(self): """ - Test that a prepare fails if the xid is still associated with a session. + Test that a prepare fails if the xid is still associated with a session. """ other = self.connect() tester = other.session("tester", 1) @@ -586,9 +586,9 @@ class DtxTests(TestBase010): session1.dtx_rollback(xid=tx) def test_get_timeout(self): - """ + """ Check that get-timeout returns the correct value, (and that a - transaction with a timeout can complete normally) + transaction with a timeout can complete normally) """ session = self.session tx = self.xid("dummy") @@ -599,12 +599,12 @@ class DtxTests(TestBase010): session.dtx_set_timeout(xid=tx, timeout=60) self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout) self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status) - self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) - + self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) + def test_set_timeout(self): - """ + """ Test the timeout of a transaction results in the expected - behaviour + behaviour """ guard = self.keepQueuesAlive(["queue-a", "queue-b"]) @@ -627,7 +627,7 @@ class DtxTests(TestBase010): self.assertMessageId("timeout", "queue-a") #check the correct codes are returned when we try to complete the txn self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status) - self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status) + self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status) @@ -649,20 +649,20 @@ class DtxTests(TestBase010): if i in [2, 5, 6, 8]: session.dtx_prepare(xid=tx) prepared.append(tx) - else: + else: session.dtx_rollback(xid=tx) xids = session.dtx_recover().in_doubt - + #rollback the prepared transactions returned by recover for x in xids: - session.dtx_rollback(xid=x) + session.dtx_rollback(xid=x) #validate against the expected list of prepared transactions actual = set([x.global_id for x in xids]) #TODO: come up with nicer way to test these expected = set([x.global_id for x in prepared]) intersection = actual.intersection(expected) - + if intersection != expected: missing = expected.difference(actual) extra = actual.difference(expected) @@ -723,7 +723,7 @@ class DtxTests(TestBase010): session.message_transfer(message=Message(dp, mp, "DtxMessage")) #start the transaction: - session.dtx_select() + session.dtx_select() self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status) #'swap' the message from one queue to the other, under that transaction: @@ -760,7 +760,7 @@ class DtxTests(TestBase010): def getMessageProperty(self, msg, prop): for h in msg.headers: if hasattr(h, prop): return getattr(h, prop) - return None + return None def keepQueuesAlive(self, names): session = self.conn.session("nasty", 99) @@ -768,7 +768,7 @@ class DtxTests(TestBase010): session.queue_declare(queue=n, auto_delete=True) session.message_subscribe(destination=n, queue=n) return session - + def createMessage(self, session, key, id, body): dp=session.delivery_properties(routing_key=key) mp=session.message_properties(correlation_id=id) |