summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/cluster_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/cluster_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py95
1 files changed, 62 insertions, 33 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index e67a691283..db08f118da 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -733,11 +733,11 @@ class DtxTestFixture:
self.session.dtx_select()
self.consumer = None
- def start(self):
- self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid).status)
+ def start(self, resume=False):
+ self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid, resume=resume).status)
- def end(self):
- self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid).status)
+ def end(self, suspend=False):
+ self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid, suspend=suspend).status)
def prepare(self):
self.test.assertEqual(XA_OK, self.session.dtx_prepare(xid=self.xid).status)
@@ -767,10 +767,9 @@ class DtxTestFixture:
return msg
- def verify(self, cluster, messages):
- for b in cluster:
- self.test.assert_browse(b.connect().session(), self.name, messages)
-
+ def verify(self, sessions, messages):
+ for s in sessions:
+ self.test.assert_browse(s, self.name, messages)
class DtxTests(BrokerTest):
@@ -783,12 +782,13 @@ class DtxTests(BrokerTest):
# multiple brokers per test.
cluster=self.cluster(1)
+ sessions = [cluster[0].connect().session()] # For verify
# Transaction that will be open when new member joins, then committed.
t1 = DtxTestFixture(self, cluster[0], "t1")
t1.start()
t1.send(["1", "2"])
- t1.verify(cluster, []) # Not visible outside of transaction
+ t1.verify(sessions, []) # Not visible outside of transaction
# Transaction that will be open when new member joins, then rolled back.
t2 = DtxTestFixture(self, cluster[0], "t2")
@@ -801,7 +801,7 @@ class DtxTests(BrokerTest):
t3.send(["1", "2"])
t3.end()
t3.prepare()
- t1.verify(cluster, []) # Not visible outside of transaction
+ t1.verify(sessions, []) # Not visible outside of transaction
# Transaction that will be prepared when new member joins, then rolled back.
t4 = DtxTestFixture(self, cluster[0], "t4")
@@ -819,70 +819,99 @@ class DtxTests(BrokerTest):
t6 = DtxTestFixture(self, cluster[0], "t6")
t6.send(["a","b","c"])
t6.start()
- t6.verify(cluster, ["a","b","c"])
self.assertEqual(t6.accept().body, "a");
- t6.verify(cluster, ["b","c"])
# Accept messages in a transaction before/after join then roll back
t7 = DtxTestFixture(self, cluster[0], "t7")
t7.send(["a","b","c"])
t7.start()
- t7.verify(cluster, ["a","b","c"])
self.assertEqual(t7.accept().body, "a");
- t7.verify(cluster, ["b","c"])
+
+ # Suspended transaction across join.
+ t8 = DtxTestFixture(self, cluster[0], "t8")
+ t8.start()
+ t8.send(["x"])
+ t8.end(suspend=True)
# Start new member
cluster.start()
+ sessions.append(cluster[1].connect().session())
# Commit t1
t1.send(["3","4"])
- t1.verify(cluster, [])
+ t1.verify(sessions, [])
t1.end()
t1.commit(one_phase=True)
- t1.verify(cluster, ["1","2","3","4"])
+ t1.verify(sessions, ["1","2","3","4"])
# Rollback t2
t2.send(["3","4"])
- t2.verify(cluster, [])
t2.end()
t2.rollback()
- t2.verify(cluster, [])
+ t2.verify(sessions, [])
# Commit t3
- t3.verify(cluster, [])
t3.commit(one_phase=False)
- t3.verify(cluster, ["1","2"])
+ t3.verify(sessions, ["1","2"])
# Rollback t4
- t4.verify(cluster, [])
t4.rollback()
- t4.verify(cluster, [])
+ t4.verify(sessions, [])
# Commit t5
t5.send(["3","4"])
- t5.verify(cluster, [])
+ t5.verify(sessions, [])
t5.end()
t5.commit(one_phase=True)
- t5.verify(cluster, ["1","2","3","4"])
+ t5.verify(sessions, ["1","2","3","4"])
- # Commit t7
- t6.verify(cluster, ["b", "c"])
+ # Commit t6
self.assertEqual(t6.accept().body, "b");
- t6.verify(cluster, ["c"])
+ t6.verify(sessions, ["c"])
t6.end()
t6.commit(one_phase=True)
- t6.verify(cluster, ["c"])
t6.session.close() # Make sure they're not requeued by the session.
- t6.verify(cluster, ["c"])
+ t6.verify(sessions, ["c"])
# Rollback t7
- t7.verify(cluster, ["b", "c"])
self.assertEqual(t7.accept().body, "b");
- t7.verify(cluster, ["c"])
t7.end()
t7.rollback()
- t7.verify(cluster, ["a", "b", "c"])
-
+ t7.verify(sessions, ["a", "b", "c"])
+
+ # Resume t8
+ t8.start(resume=True)
+ t8.send(["y"])
+ t8.end()
+ t8.commit(one_phase=True)
+ t8.verify(sessions, ["x","y"])
+
+ def test_dtx_failover_rollback(self):
+ """Kill a broker during a transaction, verify we roll back correctly"""
+ cluster=self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ cluster.start(expect=EXPECT_RUNNING)
+
+ # Test unprepared at crash
+ t1 = DtxTestFixture(self, cluster[0], "t1")
+ t1.send(["a"]) # Not in transaction
+ t1.start()
+ t1.send(["b"]) # In transaction
+
+ # Test prepared at crash
+ t2 = DtxTestFixture(self, cluster[0], "t2")
+ t2.send(["a"]) # Not in transaction
+ t2.start()
+ t2.send(["b"]) # In transaction
+ t2.end()
+ t2.prepare()
+
+ # Crash the broker
+ cluster[0].kill()
+
+ # Transactional changes should not appear
+ s = cluster[1].connect().session();
+ self.assert_browse(s, "t1", ["a"])
+ self.assert_browse(s, "t2", ["a"])
class TxTests(BrokerTest):