diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 95 |
1 files changed, 62 insertions, 33 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index e67a691283..db08f118da 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -733,11 +733,11 @@ class DtxTestFixture: self.session.dtx_select() self.consumer = None - def start(self): - self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid).status) + def start(self, resume=False): + self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid, resume=resume).status) - def end(self): - self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid).status) + def end(self, suspend=False): + self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid, suspend=suspend).status) def prepare(self): self.test.assertEqual(XA_OK, self.session.dtx_prepare(xid=self.xid).status) @@ -767,10 +767,9 @@ class DtxTestFixture: return msg - def verify(self, cluster, messages): - for b in cluster: - self.test.assert_browse(b.connect().session(), self.name, messages) - + def verify(self, sessions, messages): + for s in sessions: + self.test.assert_browse(s, self.name, messages) class DtxTests(BrokerTest): @@ -783,12 +782,13 @@ class DtxTests(BrokerTest): # multiple brokers per test. cluster=self.cluster(1) + sessions = [cluster[0].connect().session()] # For verify # Transaction that will be open when new member joins, then committed. t1 = DtxTestFixture(self, cluster[0], "t1") t1.start() t1.send(["1", "2"]) - t1.verify(cluster, []) # Not visible outside of transaction + t1.verify(sessions, []) # Not visible outside of transaction # Transaction that will be open when new member joins, then rolled back. t2 = DtxTestFixture(self, cluster[0], "t2") @@ -801,7 +801,7 @@ class DtxTests(BrokerTest): t3.send(["1", "2"]) t3.end() t3.prepare() - t1.verify(cluster, []) # Not visible outside of transaction + t1.verify(sessions, []) # Not visible outside of transaction # Transaction that will be prepared when new member joins, then rolled back. t4 = DtxTestFixture(self, cluster[0], "t4") @@ -819,70 +819,99 @@ class DtxTests(BrokerTest): t6 = DtxTestFixture(self, cluster[0], "t6") t6.send(["a","b","c"]) t6.start() - t6.verify(cluster, ["a","b","c"]) self.assertEqual(t6.accept().body, "a"); - t6.verify(cluster, ["b","c"]) # Accept messages in a transaction before/after join then roll back t7 = DtxTestFixture(self, cluster[0], "t7") t7.send(["a","b","c"]) t7.start() - t7.verify(cluster, ["a","b","c"]) self.assertEqual(t7.accept().body, "a"); - t7.verify(cluster, ["b","c"]) + + # Suspended transaction across join. + t8 = DtxTestFixture(self, cluster[0], "t8") + t8.start() + t8.send(["x"]) + t8.end(suspend=True) # Start new member cluster.start() + sessions.append(cluster[1].connect().session()) # Commit t1 t1.send(["3","4"]) - t1.verify(cluster, []) + t1.verify(sessions, []) t1.end() t1.commit(one_phase=True) - t1.verify(cluster, ["1","2","3","4"]) + t1.verify(sessions, ["1","2","3","4"]) # Rollback t2 t2.send(["3","4"]) - t2.verify(cluster, []) t2.end() t2.rollback() - t2.verify(cluster, []) + t2.verify(sessions, []) # Commit t3 - t3.verify(cluster, []) t3.commit(one_phase=False) - t3.verify(cluster, ["1","2"]) + t3.verify(sessions, ["1","2"]) # Rollback t4 - t4.verify(cluster, []) t4.rollback() - t4.verify(cluster, []) + t4.verify(sessions, []) # Commit t5 t5.send(["3","4"]) - t5.verify(cluster, []) + t5.verify(sessions, []) t5.end() t5.commit(one_phase=True) - t5.verify(cluster, ["1","2","3","4"]) + t5.verify(sessions, ["1","2","3","4"]) - # Commit t7 - t6.verify(cluster, ["b", "c"]) + # Commit t6 self.assertEqual(t6.accept().body, "b"); - t6.verify(cluster, ["c"]) + t6.verify(sessions, ["c"]) t6.end() t6.commit(one_phase=True) - t6.verify(cluster, ["c"]) t6.session.close() # Make sure they're not requeued by the session. - t6.verify(cluster, ["c"]) + t6.verify(sessions, ["c"]) # Rollback t7 - t7.verify(cluster, ["b", "c"]) self.assertEqual(t7.accept().body, "b"); - t7.verify(cluster, ["c"]) t7.end() t7.rollback() - t7.verify(cluster, ["a", "b", "c"]) - + t7.verify(sessions, ["a", "b", "c"]) + + # Resume t8 + t8.start(resume=True) + t8.send(["y"]) + t8.end() + t8.commit(one_phase=True) + t8.verify(sessions, ["x","y"]) + + def test_dtx_failover_rollback(self): + """Kill a broker during a transaction, verify we roll back correctly""" + cluster=self.cluster(1, expect=EXPECT_EXIT_FAIL) + cluster.start(expect=EXPECT_RUNNING) + + # Test unprepared at crash + t1 = DtxTestFixture(self, cluster[0], "t1") + t1.send(["a"]) # Not in transaction + t1.start() + t1.send(["b"]) # In transaction + + # Test prepared at crash + t2 = DtxTestFixture(self, cluster[0], "t2") + t2.send(["a"]) # Not in transaction + t2.start() + t2.send(["b"]) # In transaction + t2.end() + t2.prepare() + + # Crash the broker + cluster[0].kill() + + # Transactional changes should not appear + s = cluster[1].connect().session(); + self.assert_browse(s, "t1", ["a"]) + self.assert_browse(s, "t2", ["a"]) class TxTests(BrokerTest): |