diff options
Diffstat (limited to 'python/tests_0-10/dtx.py')
-rw-r--r-- | python/tests_0-10/dtx.py | 26 |
1 files changed, 22 insertions, 4 deletions
diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py index c0d1bd2b74..2835d703ae 100644 --- a/python/tests_0-10/dtx.py +++ b/python/tests_0-10/dtx.py @@ -40,6 +40,11 @@ class DtxTests(TestBase): XA_RBROLLBACK = 1 XA_RBTIMEOUT = 2 XA_OK = 8 + tx_counter = 0 + + def reset_channel(self): + self.channel.channel_close() + self.channel.channel_open() def test_simple_commit(self): """ @@ -56,6 +61,9 @@ class DtxTests(TestBase): #commit self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status) + #should close and reopen channel to ensure no unacked messages are held + self.reset_channel() + #check result self.assertMessageCount(0, "queue-a") self.assertMessageCount(1, "queue-b") @@ -79,6 +87,8 @@ class DtxTests(TestBase): #commit self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status) + self.reset_channel() + #check result self.assertMessageCount(0, "queue-a") self.assertMessageCount(1, "queue-b") @@ -100,6 +110,8 @@ class DtxTests(TestBase): #rollback self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + self.reset_channel() + #check result self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") @@ -123,6 +135,8 @@ class DtxTests(TestBase): #rollback self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + self.reset_channel() + #check result self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") @@ -191,6 +205,8 @@ class DtxTests(TestBase): channel = self.channel #do some transactional work & complete the transaction self.test_simple_commit() + # channel has been reset, so reselect for use with dtx + channel.dtx_demarcation_select() #start association for the same xid as the previously completed txn tx = self.xid("my-xid") @@ -355,7 +371,7 @@ class DtxTests(TestBase): self.assertEqual("two", msg.message_id) channel.message_cancel(destination="results") #ack the message then close the channel - msg.ok() + msg.complete() channel.channel_close() channel = self.channel @@ -446,7 +462,7 @@ class DtxTests(TestBase): channel2.dtx_demarcation_select() channel2.dtx_demarcation_start(xid=tx) channel2.message_get(queue="dummy", destination="dummy") - self.client.queue("dummy").get(timeout=1).ok() + self.client.queue("dummy").get(timeout=1).complete() channel2.message_transfer(routing_key="dummy", body="whatever") channel2.channel_close() @@ -548,7 +564,9 @@ class DtxTests(TestBase): channel.dtx_coordination_rollback(xid=x) self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) - def xid(self, txid, branchqual = ''): + def xid(self, txid): + DtxTests.tx_counter += 1 + branchqual = "v%s" % DtxTests.tx_counter return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual def txswap(self, tx, id): @@ -573,7 +591,7 @@ class DtxTests(TestBase): #consume from src: channel.message_get(destination="temp-swap", queue=src) msg = self.client.queue("temp-swap").get(timeout=1) - msg.ok(); + msg.complete(); #re-publish to dest channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body) |