summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py63
1 files changed, 43 insertions, 20 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 79024d48e3..138868f64e 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -30,19 +30,10 @@ from qpidtoollibs import BrokerAgent, EventHelper
log = getLogger(__name__)
-def grep(filename, regexp):
- for line in open(filename).readlines():
- if (regexp.search(line)): return True
- return False
class HaBrokerTest(BrokerTest):
"""Base class for HA broker tests"""
- def assert_log_no_errors(self, broker):
- log = broker.get_log()
- if grep(log, re.compile("] error|] critical")):
- self.fail("Errors in log file %s"%(log))
-
class ReplicationTests(HaBrokerTest):
"""Correctness tests for HA replication."""
@@ -838,7 +829,7 @@ acl deny all all
# It is possible for the backup to attempt to subscribe after the queue
# is deleted. This is not an error, but is logged as an error on the primary.
# The backup does not log this as an error so we only check the backup log for errors.
- self.assert_log_no_errors(cluster[1])
+ cluster[1].assert_log_clean()
def test_missed_recreate(self):
"""If a queue or exchange is destroyed and one with the same name re-created
@@ -1003,6 +994,32 @@ class LongTests(HaBrokerTest):
dead = filter(lambda b: not b.is_running(), brokers)
if dead: raise Exception("Brokers not running: %s"%dead)
+ def test_tx_send_receive(self):
+ brokers = HaCluster(self, 3)
+ sender = self.popen(
+ ["qpid-send",
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
+ "--messages=1000",
+ "--tx=10"
+ ])
+ receiver = self.popen(
+ ["qpid-receive",
+ "--broker", brokers[0].host_port(),
+ "--address", "q;{create:always}",
+ "--messages=990",
+ "--timeout=10",
+ "--tx=10"
+ ])
+ self.assertEqual(sender.wait(), 0)
+ self.assertEqual(receiver.wait(), 0)
+ expect = [long(i) for i in range(991, 1001)]
+ sn = lambda m: m.properties["sn"]
+ brokers[0].assert_browse("q", expect, transform=sn)
+ brokers[1].assert_browse_backup("q", expect, transform=sn)
+ brokers[2].assert_browse_backup("q", expect, transform=sn)
+
+
def test_qmf_order(self):
"""QPID 4402: HA QMF events can be out of order.
This test mimics the test described in the JIRA. Two threads repeatedly
@@ -1352,12 +1369,14 @@ class TransactionTests(HaBrokerTest):
def assert_tx_clean(self, b):
"""Verify that there are no transaction artifacts
(exchanges, queues, subscriptions) on b."""
- queues=[]
- def txq(): queues = b.agent().tx_queues(); return not queues
- assert retry(txq), "%s: unexpected %s"%(b,queues)
- subs=[]
- def txs(): subs = self.tx_subscriptions(b); return not subs
- assert retry(txs), "%s: unexpected %s"%(b,subs)
+ class FunctionCache: # Call a function and cache the result.
+ def __init__(self, f): self.f, self.value = f, None
+ def __call__(self): self.value = self.f(); return self.value
+
+ txq= FunctionCache(b.agent().tx_queues)
+ assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value)
+ txsub = FunctionCache(lambda: self.tx_subscriptions(b))
+ assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value)
# TODO aconway 2013-10-15: TX exchanges don't show up in management.
def assert_simple_commit_outcome(self, b, tx_queues):
@@ -1462,18 +1481,22 @@ class TransactionTests(HaBrokerTest):
self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
cluster[1].kill(final=False)
s.send("b")
- self.assert_commit_raises(tx)
- for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b)
+ tx.commit()
+ tx.close()
+ for b in [cluster[0],cluster[2]]:
+ self.assert_tx_clean(b)
+ b.assert_browse_backup("q", ["a","b"], msg=b)
# Joining
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("foo")
- cluster.restart(1)
+ cluster.restart(1) # Not a part of the current transaction.
tx.commit()
tx.close()
for b in cluster: self.assert_tx_clean(b)
# The new member is not in the tx but receives the results normal replication.
- for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
+ for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b)
+ # FIXME aconway 2013-11-07: assert_log_clean
def test_tx_block_threads(self):
"""Verify that TXs blocked in commit don't deadlock."""