diff options
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 95 |
1 files changed, 73 insertions, 22 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index de5dfb4b10..55715639a4 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1287,50 +1287,101 @@ class StoreTests(BrokerTest): cluster[0].assert_browse("q2", ["hello", "end"]) cluster[1].assert_browse_backup("q2", ["hello", "end"]) +def open_read(name): + with open(name) as f: return f.read() + class TransactionTests(BrokerTest): + load_store=["--load-module", BrokerTest.test_store_lib] + def tx_simple_setup(self, broker): - """Start a transaction: receive 'foo' from 'a' and send 'bar' to 'b'""" + """Start a transaction, remove messages from queue a, add messages to queue b""" c = broker.connect() - c.session().sender("a;{create:always}").send("foo") + # Send messages to a, no transaction. + sa = c.session().sender("a;{create:always,node:{durable:true}}") + tx_msgs = ["x","y","z"] + for m in tx_msgs: sa.send(Message(content=m, durable=True)) + + # Receive messages from a, in transaction. tx = c.session(transactional=True) - self.assertEqual("foo", tx.receiver("a").fetch(1).content) - tx.acknowledge(); - tx.sender("b;{create:always}").send("bar") + txr = tx.receiver("a") + tx_msgs2 = [txr.fetch(1).content for i in xrange(3)] + self.assertEqual(tx_msgs, tx_msgs2) + + # Send messages to b, transactional, mixed with non-transactional. + sb = c.session().sender("b;{create:always,node:{durable:true}}") + txs = tx.sender("b") + msgs = [str(i) for i in xrange(3)] + for tx_m,m in zip(tx_msgs2, msgs): + txs.send(tx_m); + sb.send(m) return tx def test_tx_simple_commit(self): - cluster = HaCluster(self, 2, args=["--log-enable=trace+:ha::"]) + cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) + tx.sync() + + # NOTE: backup does not process transactional dequeues until prepare + cluster[1].assert_browse_backup("a", ["x","y","z"]) + cluster[1].assert_browse_backup("b", ['0', '1', '2']) + + tx.acknowledge() tx.commit() + tx.sync() + for b in cluster: b.assert_browse_backup("a", [], msg=b) - b.assert_browse_backup("b", ["bar"], msg=b) + b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b) + + # Check for expected actions on the store + expect = """<enqueue a x> +<enqueue a y> +<enqueue a z> +<begin tx 1> +<dequeue a x tx=1> +<dequeue a y tx=1> +<dequeue a z tx=1> +<commit tx=1> +""" + self.assertEqual(expect, open_read(cluster[0].store_log)) + self.assertEqual(expect, open_read(cluster[1].store_log)) def test_tx_simple_rollback(self): - cluster = HaCluster(self, 2) + cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) + tx.acknowledge() tx.rollback() for b in cluster: - b.assert_browse_backup("a", ["foo"], msg=b) - b.assert_browse_backup("b", [], msg=b) + b.assert_browse_backup("a", ["x","y","z"], msg=b) + b.assert_browse_backup("b", ['0', '1', '2'], msg=b) + # Check for expected actions on the store + expect = """<enqueue a x> +<enqueue a y> +<enqueue a z> +""" + self.assertEqual(open_read(cluster[0].store_log), expect) + self.assertEqual(open_read(cluster[1].store_log), expect) def test_tx_simple_failover(self): - cluster = HaCluster(self, 2) + cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) + tx.acknowledge() cluster.bounce(0) # Should cause roll-back + cluster[0].wait_status("ready") for b in cluster: - b.assert_browse_backup("a", ["foo"], msg=b) - b.assert_browse_backup("b", [], msg=b) - - def test_tx_simple_join(self): - cluster = HaCluster(self, 2) - tx = self.tx_simple_setup(cluster[0]) - cluster.bounce(1) # Should catch up with tx - tx.commit() - for b in cluster: - b.assert_browse_backup("a", [], msg=b) - b.assert_browse_backup("b", ["bar"], msg=b) + b.assert_browse_backup("a", ["x","y","z"], msg=b) + b.assert_browse_backup("b", ['0', '1', '2'], msg=b) + + # Check for expected actions on the store + expect = """<enqueue a x> +<enqueue a y> +<enqueue a z> +""" + self.assertEqual(open_read(cluster[0].store_log), expect) + self.assertEqual(open_read(cluster[1].store_log), expect) + +# FIXME aconway 2013-07-23: test with partial acknowledgement. if __name__ == "__main__": outdir = "ha_tests.tmp" |