summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py95
1 files changed, 73 insertions, 22 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index de5dfb4b10..55715639a4 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1287,50 +1287,101 @@ class StoreTests(BrokerTest):
cluster[0].assert_browse("q2", ["hello", "end"])
cluster[1].assert_browse_backup("q2", ["hello", "end"])
+def open_read(name):
+ with open(name) as f: return f.read()
+
class TransactionTests(BrokerTest):
+ load_store=["--load-module", BrokerTest.test_store_lib]
+
def tx_simple_setup(self, broker):
- """Start a transaction: receive 'foo' from 'a' and send 'bar' to 'b'"""
+ """Start a transaction, remove messages from queue a, add messages to queue b"""
c = broker.connect()
- c.session().sender("a;{create:always}").send("foo")
+ # Send messages to a, no transaction.
+ sa = c.session().sender("a;{create:always,node:{durable:true}}")
+ tx_msgs = ["x","y","z"]
+ for m in tx_msgs: sa.send(Message(content=m, durable=True))
+
+ # Receive messages from a, in transaction.
tx = c.session(transactional=True)
- self.assertEqual("foo", tx.receiver("a").fetch(1).content)
- tx.acknowledge();
- tx.sender("b;{create:always}").send("bar")
+ txr = tx.receiver("a")
+ tx_msgs2 = [txr.fetch(1).content for i in xrange(3)]
+ self.assertEqual(tx_msgs, tx_msgs2)
+
+ # Send messages to b, transactional, mixed with non-transactional.
+ sb = c.session().sender("b;{create:always,node:{durable:true}}")
+ txs = tx.sender("b")
+ msgs = [str(i) for i in xrange(3)]
+ for tx_m,m in zip(tx_msgs2, msgs):
+ txs.send(tx_m);
+ sb.send(m)
return tx
def test_tx_simple_commit(self):
- cluster = HaCluster(self, 2, args=["--log-enable=trace+:ha::"])
+ cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+
+ # NOTE: backup does not process transactional dequeues until prepare
+ cluster[1].assert_browse_backup("a", ["x","y","z"])
+ cluster[1].assert_browse_backup("b", ['0', '1', '2'])
+
+ tx.acknowledge()
tx.commit()
+ tx.sync()
+
for b in cluster:
b.assert_browse_backup("a", [], msg=b)
- b.assert_browse_backup("b", ["bar"], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
+
+ # Check for expected actions on the store
+ expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+<begin tx 1>
+<dequeue a x tx=1>
+<dequeue a y tx=1>
+<dequeue a z tx=1>
+<commit tx=1>
+"""
+ self.assertEqual(expect, open_read(cluster[0].store_log))
+ self.assertEqual(expect, open_read(cluster[1].store_log))
def test_tx_simple_rollback(self):
- cluster = HaCluster(self, 2)
+ cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
+ tx.acknowledge()
tx.rollback()
for b in cluster:
- b.assert_browse_backup("a", ["foo"], msg=b)
- b.assert_browse_backup("b", [], msg=b)
+ b.assert_browse_backup("a", ["x","y","z"], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+ # Check for expected actions on the store
+ expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+"""
+ self.assertEqual(open_read(cluster[0].store_log), expect)
+ self.assertEqual(open_read(cluster[1].store_log), expect)
def test_tx_simple_failover(self):
- cluster = HaCluster(self, 2)
+ cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
+ tx.acknowledge()
cluster.bounce(0) # Should cause roll-back
+ cluster[0].wait_status("ready")
for b in cluster:
- b.assert_browse_backup("a", ["foo"], msg=b)
- b.assert_browse_backup("b", [], msg=b)
-
- def test_tx_simple_join(self):
- cluster = HaCluster(self, 2)
- tx = self.tx_simple_setup(cluster[0])
- cluster.bounce(1) # Should catch up with tx
- tx.commit()
- for b in cluster:
- b.assert_browse_backup("a", [], msg=b)
- b.assert_browse_backup("b", ["bar"], msg=b)
+ b.assert_browse_backup("a", ["x","y","z"], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+
+ # Check for expected actions on the store
+ expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+"""
+ self.assertEqual(open_read(cluster[0].store_log), expect)
+ self.assertEqual(open_read(cluster[1].store_log), expect)
+
+# FIXME aconway 2013-07-23: test with partial acknowledgement.
if __name__ == "__main__":
outdir = "ha_tests.tmp"