summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py57
1 files changed, 54 insertions, 3 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index f71560dffb..58b3ff2802 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1217,7 +1217,6 @@ class RecoveryTests(HaBrokerTest):
def test_stalled_backup(self):
"""Make sure that a stalled backup broker does not stall the primary"""
- # FIXME aconway 2014-04-15: merge with test_join_ready_cluster?
cluster = HaCluster(self, 3, args=["--link-heartbeat-interval=1"])
os.kill(cluster[1].pid, signal.SIGSTOP)
s = cluster[0].connect().session()
@@ -1272,7 +1271,7 @@ class StoreTests(HaBrokerTest):
"""Verify that a backup erases queue data from store recovery before
doing catch-up from the primary."""
if self.check_skip(): return
- cluster = HaCluster(self, 2)
+ cluster = HaCluster(self, 2, args=['--log-enable=trace+:ha', '--log-enable=trace+:Store'])
sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
s1 = sn.sender("q1;{create:always,node:{durable:true}}")
for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True))
@@ -1362,9 +1361,61 @@ class TransactionTests(HaBrokerTest):
tx.acknowledge()
tx.commit()
tx.sync()
+ tx.close()
+
+ for b in cluster:
+ self.assert_simple_commit_outcome(b, tx_queues)
+
+ # Verify non-tx dequeue is replicated correctly
+ c = cluster.connect(0, protocol=self.tx_protocol)
+ r = c.session().receiver("b")
+ ri = receiver_iter(r, timeout=1)
+ self.assertEqual(['0', '1', '2', 'x', 'y', 'z'], [m.content for m in ri])
+ r.session.acknowledge()
+ for b in cluster: b.assert_browse_backup("b", [], msg=b)
+
+ def check_enq_deq(self, cluster, queue, expect):
+ for b in cluster:
+ q = b.agent.getQueue(queue)
+ self.assertEqual(
+ (b.name,)+expect,
+ (b.name, q.msgTotalEnqueues, q.msgTotalDequeues, q.msgTxnEnqueues, q.msgTxnDequeues))
+
+ def test_tx_enq_notx_deq(self):
+ """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
+ cluster = HaCluster(self, 2, test_store=True)
+ c = cluster.connect(0, protocol=self.tx_protocol)
+ tx = c.session(transactional=True)
+ c.session().sender("qq;{create:always}").send("m1")
+ tx.sender("qq;{create:always}").send("tx")
+ tx.commit()
tx.close()
- for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
+ c.session().sender("qq;{create:always}").send("m2")
+ self.check_enq_deq(cluster, 'qq', (3, 0, 1, 0))
+
+ notx = c.session()
+ self.assertEqual(['m1', 'tx', 'm2'], [m.content for m in receiver_iter(notx.receiver('qq'))])
+ notx.acknowledge()
+ self.check_enq_deq(cluster, 'qq', (3, 3, 1, 0))
+ for b in cluster: b.assert_browse_backup('qq', [], msg=b)
+ for b in cluster: self.assert_tx_clean(b)
+
+ def test_tx_enq_notx_deq_qpid_send(self):
+ """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
+ cluster = HaCluster(self, 2, test_store=True)
+
+ self.popen(
+ ['qpid-send', '-a', 'qq;{create:always}', '-b', cluster[0].host_port(), '--tx=1',
+ '--content-string=foo']
+ ).assert_exit_ok()
+ for b in cluster: b.assert_browse_backup('qq', ['foo'], msg=b)
+ self.check_enq_deq(cluster, 'qq', (1, 0, 1, 0))
+
+ self.popen(['qpid-receive', '-a', 'qq', '-b', cluster[0].host_port()]).assert_exit_ok()
+ self.check_enq_deq(cluster, 'qq', (1, 1, 1, 0))
+ for b in cluster: b.assert_browse_backup('qq', [], msg=b)
+ for b in cluster: self.assert_tx_clean(b)
def assert_tx_clean(self, b):
"""Verify that there are no transaction artifacts