diff options
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 63 |
1 files changed, 43 insertions, 20 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 79024d48e3..138868f64e 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -30,19 +30,10 @@ from qpidtoollibs import BrokerAgent, EventHelper log = getLogger(__name__) -def grep(filename, regexp): - for line in open(filename).readlines(): - if (regexp.search(line)): return True - return False class HaBrokerTest(BrokerTest): """Base class for HA broker tests""" - def assert_log_no_errors(self, broker): - log = broker.get_log() - if grep(log, re.compile("] error|] critical")): - self.fail("Errors in log file %s"%(log)) - class ReplicationTests(HaBrokerTest): """Correctness tests for HA replication.""" @@ -838,7 +829,7 @@ acl deny all all # It is possible for the backup to attempt to subscribe after the queue # is deleted. This is not an error, but is logged as an error on the primary. # The backup does not log this as an error so we only check the backup log for errors. - self.assert_log_no_errors(cluster[1]) + cluster[1].assert_log_clean() def test_missed_recreate(self): """If a queue or exchange is destroyed and one with the same name re-created @@ -1003,6 +994,32 @@ class LongTests(HaBrokerTest): dead = filter(lambda b: not b.is_running(), brokers) if dead: raise Exception("Brokers not running: %s"%dead) + def test_tx_send_receive(self): + brokers = HaCluster(self, 3) + sender = self.popen( + ["qpid-send", + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", + "--messages=1000", + "--tx=10" + ]) + receiver = self.popen( + ["qpid-receive", + "--broker", brokers[0].host_port(), + "--address", "q;{create:always}", + "--messages=990", + "--timeout=10", + "--tx=10" + ]) + self.assertEqual(sender.wait(), 0) + self.assertEqual(receiver.wait(), 0) + expect = [long(i) for i in range(991, 1001)] + sn = lambda m: m.properties["sn"] + brokers[0].assert_browse("q", expect, transform=sn) + brokers[1].assert_browse_backup("q", expect, transform=sn) + brokers[2].assert_browse_backup("q", expect, transform=sn) + + def test_qmf_order(self): """QPID 4402: HA QMF events can be out of order. This test mimics the test described in the JIRA. Two threads repeatedly @@ -1352,12 +1369,14 @@ class TransactionTests(HaBrokerTest): def assert_tx_clean(self, b): """Verify that there are no transaction artifacts (exchanges, queues, subscriptions) on b.""" - queues=[] - def txq(): queues = b.agent().tx_queues(); return not queues - assert retry(txq), "%s: unexpected %s"%(b,queues) - subs=[] - def txs(): subs = self.tx_subscriptions(b); return not subs - assert retry(txs), "%s: unexpected %s"%(b,subs) + class FunctionCache: # Call a function and cache the result. + def __init__(self, f): self.f, self.value = f, None + def __call__(self): self.value = self.f(); return self.value + + txq= FunctionCache(b.agent().tx_queues) + assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value) + txsub = FunctionCache(lambda: self.tx_subscriptions(b)) + assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value) # TODO aconway 2013-10-15: TX exchanges don't show up in management. def assert_simple_commit_outcome(self, b, tx_queues): @@ -1462,18 +1481,22 @@ class TransactionTests(HaBrokerTest): self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster]) cluster[1].kill(final=False) s.send("b") - self.assert_commit_raises(tx) - for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b) + tx.commit() + tx.close() + for b in [cluster[0],cluster[2]]: + self.assert_tx_clean(b) + b.assert_browse_backup("q", ["a","b"], msg=b) # Joining tx = cluster[0].connect().session(transactional=True) s = tx.sender("q;{create:always}") s.send("foo") - cluster.restart(1) + cluster.restart(1) # Not a part of the current transaction. tx.commit() tx.close() for b in cluster: self.assert_tx_clean(b) # The new member is not in the tx but receives the results normal replication. - for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b) + for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b) + # FIXME aconway 2013-11-07: assert_log_clean def test_tx_block_threads(self): """Verify that TXs blocked in commit don't deadlock.""" |