summaryrefslogtreecommitdiff
path: root/python/tests_0-10/dtx.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests_0-10/dtx.py')
-rw-r--r--python/tests_0-10/dtx.py26
1 files changed, 22 insertions, 4 deletions
diff --git a/python/tests_0-10/dtx.py b/python/tests_0-10/dtx.py
index c0d1bd2b74..2835d703ae 100644
--- a/python/tests_0-10/dtx.py
+++ b/python/tests_0-10/dtx.py
@@ -40,6 +40,11 @@ class DtxTests(TestBase):
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2
XA_OK = 8
+ tx_counter = 0
+
+ def reset_channel(self):
+ self.channel.channel_close()
+ self.channel.channel_open()
def test_simple_commit(self):
"""
@@ -56,6 +61,9 @@ class DtxTests(TestBase):
#commit
self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status)
+ #should close and reopen channel to ensure no unacked messages are held
+ self.reset_channel()
+
#check result
self.assertMessageCount(0, "queue-a")
self.assertMessageCount(1, "queue-b")
@@ -79,6 +87,8 @@ class DtxTests(TestBase):
#commit
self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status)
+ self.reset_channel()
+
#check result
self.assertMessageCount(0, "queue-a")
self.assertMessageCount(1, "queue-b")
@@ -100,6 +110,8 @@ class DtxTests(TestBase):
#rollback
self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ self.reset_channel()
+
#check result
self.assertMessageCount(1, "queue-a")
self.assertMessageCount(0, "queue-b")
@@ -123,6 +135,8 @@ class DtxTests(TestBase):
#rollback
self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+ self.reset_channel()
+
#check result
self.assertMessageCount(1, "queue-a")
self.assertMessageCount(0, "queue-b")
@@ -191,6 +205,8 @@ class DtxTests(TestBase):
channel = self.channel
#do some transactional work & complete the transaction
self.test_simple_commit()
+ # channel has been reset, so reselect for use with dtx
+ channel.dtx_demarcation_select()
#start association for the same xid as the previously completed txn
tx = self.xid("my-xid")
@@ -355,7 +371,7 @@ class DtxTests(TestBase):
self.assertEqual("two", msg.message_id)
channel.message_cancel(destination="results")
#ack the message then close the channel
- msg.ok()
+ msg.complete()
channel.channel_close()
channel = self.channel
@@ -446,7 +462,7 @@ class DtxTests(TestBase):
channel2.dtx_demarcation_select()
channel2.dtx_demarcation_start(xid=tx)
channel2.message_get(queue="dummy", destination="dummy")
- self.client.queue("dummy").get(timeout=1).ok()
+ self.client.queue("dummy").get(timeout=1).complete()
channel2.message_transfer(routing_key="dummy", body="whatever")
channel2.channel_close()
@@ -548,7 +564,9 @@ class DtxTests(TestBase):
channel.dtx_coordination_rollback(xid=x)
self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
- def xid(self, txid, branchqual = ''):
+ def xid(self, txid):
+ DtxTests.tx_counter += 1
+ branchqual = "v%s" % DtxTests.tx_counter
return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual
def txswap(self, tx, id):
@@ -573,7 +591,7 @@ class DtxTests(TestBase):
#consume from src:
channel.message_get(destination="temp-swap", queue=src)
msg = self.client.queue("temp-swap").get(timeout=1)
- msg.ok();
+ msg.complete();
#re-publish to dest
channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body)