diff options
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 57 |
1 files changed, 54 insertions, 3 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f71560dffb..58b3ff2802 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1217,7 +1217,6 @@ class RecoveryTests(HaBrokerTest): def test_stalled_backup(self): """Make sure that a stalled backup broker does not stall the primary""" - # FIXME aconway 2014-04-15: merge with test_join_ready_cluster? cluster = HaCluster(self, 3, args=["--link-heartbeat-interval=1"]) os.kill(cluster[1].pid, signal.SIGSTOP) s = cluster[0].connect().session() @@ -1272,7 +1271,7 @@ class StoreTests(HaBrokerTest): """Verify that a backup erases queue data from store recovery before doing catch-up from the primary.""" if self.check_skip(): return - cluster = HaCluster(self, 2) + cluster = HaCluster(self, 2, args=['--log-enable=trace+:ha', '--log-enable=trace+:Store']) sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session() s1 = sn.sender("q1;{create:always,node:{durable:true}}") for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True)) @@ -1362,9 +1361,61 @@ class TransactionTests(HaBrokerTest): tx.acknowledge() tx.commit() tx.sync() + tx.close() + + for b in cluster: + self.assert_simple_commit_outcome(b, tx_queues) + + # Verify non-tx dequeue is replicated correctly + c = cluster.connect(0, protocol=self.tx_protocol) + r = c.session().receiver("b") + ri = receiver_iter(r, timeout=1) + self.assertEqual(['0', '1', '2', 'x', 'y', 'z'], [m.content for m in ri]) + r.session.acknowledge() + for b in cluster: b.assert_browse_backup("b", [], msg=b) + + def check_enq_deq(self, cluster, queue, expect): + for b in cluster: + q = b.agent.getQueue(queue) + self.assertEqual( + (b.name,)+expect, + (b.name, q.msgTotalEnqueues, q.msgTotalDequeues, q.msgTxnEnqueues, q.msgTxnDequeues)) + + def test_tx_enq_notx_deq(self): + """Verify that a non-tx dequeue of a tx enqueue is replicated correctly""" + cluster = HaCluster(self, 2, test_store=True) + c = cluster.connect(0, protocol=self.tx_protocol) + tx = c.session(transactional=True) + c.session().sender("qq;{create:always}").send("m1") + tx.sender("qq;{create:always}").send("tx") + tx.commit() tx.close() - for b in cluster: self.assert_simple_commit_outcome(b, tx_queues) + c.session().sender("qq;{create:always}").send("m2") + self.check_enq_deq(cluster, 'qq', (3, 0, 1, 0)) + + notx = c.session() + self.assertEqual(['m1', 'tx', 'm2'], [m.content for m in receiver_iter(notx.receiver('qq'))]) + notx.acknowledge() + self.check_enq_deq(cluster, 'qq', (3, 3, 1, 0)) + for b in cluster: b.assert_browse_backup('qq', [], msg=b) + for b in cluster: self.assert_tx_clean(b) + + def test_tx_enq_notx_deq_qpid_send(self): + """Verify that a non-tx dequeue of a tx enqueue is replicated correctly""" + cluster = HaCluster(self, 2, test_store=True) + + self.popen( + ['qpid-send', '-a', 'qq;{create:always}', '-b', cluster[0].host_port(), '--tx=1', + '--content-string=foo'] + ).assert_exit_ok() + for b in cluster: b.assert_browse_backup('qq', ['foo'], msg=b) + self.check_enq_deq(cluster, 'qq', (1, 0, 1, 0)) + + self.popen(['qpid-receive', '-a', 'qq', '-b', cluster[0].host_port()]).assert_exit_ok() + self.check_enq_deq(cluster, 'qq', (1, 1, 1, 0)) + for b in cluster: b.assert_browse_backup('qq', [], msg=b) + for b in cluster: self.assert_tx_clean(b) def assert_tx_clean(self, b): """Verify that there are no transaction artifacts |