summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
author: Alan Conway <aconway@apache.org> 2015-09-03 18:59:05 +0000
committer: Alan Conway <aconway@apache.org> 2015-09-03 18:59:05 +0000
commit: 14de0a97b80cc0f63208b61654dd0348fd3da8b1 (patch)
tree: a9f3a2bcdda814d6a36ab1981ae528272d460523 /qpid/cpp/src/tests
parent: 492f7a78b8fd2ed7dd4e08184f42b7496aa8fed1 (diff)
download: qpid-python-14de0a97b80cc0f63208b61654dd0348fd3da8b1.tar.gz
QPID-5855 - Simplified HA transaction logic.
Removed complex and incorrect HA+TX logic and reverted to the following limitation: You can use transactions in an HA cluster, but there are limitations on the transactional guarantees. Transactions function normally with the *primary* broker, but replication to the backups is not covered by the atomic guarantee. The following situations are all safe: - Client rolls back a transaction. - Client successfully commits a transaction. - Primary fails during a transaction *before* the client sends a commit. - Transaction contains only one message. The problem case is when all of the following occur: - transaction contains multiple actions (enqueues or dequeues) - primary fails between client sending commit and receiving commit-complete. In this case it is possible that only part of the transaction was replicated to the backups, so on fail-over partial transaction results may be visible. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1701109 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py206
2 files changed, 20 insertions, 188 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 82ca808cb1..ace225a509 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -267,6 +267,8 @@ acl allow all all
c = self.connect_admin()
try:
wait_address(c, queue)
+ if not "msg" in kwargs:
+ kwargs["msg"]=str(self)
assert_browse_retry(c.session(), queue, expected, **kwargs)
finally: c.close()
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 2ee2e291e2..0efb8182ec 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1327,28 +1327,25 @@ class TransactionTests(HaBrokerTest):
sb.close()
return tx
- def tx_subscriptions(self, broker):
- """Return list of queue names for tx subscriptions"""
- return [q for q in broker.agent.repsub_queues()
- if q.startswith("qpid.ha-tx")]
-
def test_tx_simple_commit(self):
cluster = HaCluster(self, 2, test_store=True, wait=True)
tx = self.tx_simple_setup(cluster)
tx.sync()
- tx_queues = cluster[0].agent.tx_queues()
-
- # NOTE: backup does not process transactional dequeues until prepare
- cluster[1].assert_browse_backup("a", ["x","y","z"])
- cluster[1].assert_browse_backup("b", ['0', '1', '2'])
-
tx.acknowledge()
+ # Pre transaction - messages are acquired on primary but not yet dequeued
+ # so still there on backup.
+ cluster[0].assert_browse_backup("a", [])
+ cluster[1].assert_browse_backup("a", ['x', 'y', 'z'])
+ for b in cluster:
+ b.assert_browse_backup("b", ['0', '1', '2'])
tx.commit()
tx.sync()
tx.close()
+ # Post transaction: all synced.
for b in cluster:
- self.assert_simple_commit_outcome(b, tx_queues)
+ b.assert_browse_backup("a", [])
+ b.assert_browse_backup("b", ['0', '1', '2', "x","y","z"])
# Verify non-tx dequeue is replicated correctly
c = cluster.connect(0, protocol=self.tx_protocol)
@@ -1360,121 +1357,22 @@ class TransactionTests(HaBrokerTest):
c.close()
tx.connection.close()
-
- def check_enq_deq(self, cluster, queue, expect):
- for b in cluster:
- q = b.agent.getQueue(queue)
- self.assertEqual(
- (b.name,)+expect,
- (b.name, q.msgTotalEnqueues, q.msgTotalDequeues, q.msgTxnEnqueues, q.msgTxnDequeues))
-
- def test_tx_enq_notx_deq(self):
- """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
- cluster = HaCluster(self, 2, test_store=True)
- c = cluster.connect(0, protocol=self.tx_protocol)
-
- tx = c.session(transactional=True)
- c.session().sender("qq;{create:always}").send("m1")
- tx.sender("qq;{create:always}").send("tx")
- tx.commit()
- tx.close()
- c.session().sender("qq;{create:always}").send("m2")
- self.check_enq_deq(cluster, 'qq', (3, 0, 1, 0))
-
- notx = c.session()
- self.assertEqual(['m1', 'tx', 'm2'], [m.content for m in receiver_iter(notx.receiver('qq'))])
- notx.acknowledge()
- self.check_enq_deq(cluster, 'qq', (3, 3, 1, 0))
- for b in cluster: b.assert_browse_backup('qq', [], msg=b)
- for b in cluster: self.assert_tx_clean(b)
-
- def test_tx_enq_notx_deq_qpid_send(self):
- """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
- cluster = HaCluster(self, 2, test_store=True)
-
- self.popen(
- ['qpid-send', '-a', 'qq;{create:always}', '-b', cluster[0].host_port(), '--tx=1',
- '--content-string=foo']
- ).assert_exit_ok()
- for b in cluster: b.assert_browse_backup('qq', ['foo'], msg=b)
- self.check_enq_deq(cluster, 'qq', (1, 0, 1, 0))
-
- self.popen(['qpid-receive', '-a', 'qq', '-b', cluster[0].host_port()]).assert_exit_ok()
- self.check_enq_deq(cluster, 'qq', (1, 1, 1, 0))
- for b in cluster: b.assert_browse_backup('qq', [], msg=b)
- for b in cluster: self.assert_tx_clean(b)
-
- def assert_tx_clean(self, b):
- """Verify that there are no transaction artifacts
- (exchanges, queues, subscriptions) on b."""
- class FunctionCache: # Call a function and cache the result.
- def __init__(self, f): self.f, self.value = f, None
- def __call__(self): self.value = self.f(); return self.value
-
- txq= FunctionCache(b.agent.tx_queues)
- assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value)
- txsub = FunctionCache(lambda: self.tx_subscriptions(b))
- assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value)
- # TODO aconway 2013-10-15: TX exchanges don't show up in management.
-
- def assert_simple_commit_outcome(self, b, tx_queues):
- b.assert_browse_backup("a", [], msg=b)
- b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
- # Check for expected actions on the store
- expect = """<enqueue a x>
-<enqueue a y>
-<enqueue a z>
-<begin tx 1>
-<dequeue a x tx=1>
-<dequeue a y tx=1>
-<dequeue a z tx=1>
-<commit tx=1>
-"""
- self.assertEqual(expect, open_read(b.store_log), msg=b)
- self.assert_tx_clean(b)
-
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster)
tx.sync()
- tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
tx.rollback()
- tx.close() # For clean test.
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+
+ for b in cluster:
+ b.assert_browse_backup("a", ["x","y","z"])
+ b.assert_browse_backup("b", ['0', '1', '2'])
+
+ tx.close()
tx.connection.close()
- def assert_simple_rollback_outcome(self, b, tx_queues):
- b.assert_browse_backup("a", ["x","y","z"], msg=b)
- b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
- # Check for expected actions on the store
- expect = """<enqueue a x>
-<enqueue a y>
-<enqueue a z>
-"""
- self.assertEqual(open_read(b.store_log), expect, msg=b)
- self.assert_tx_clean(b)
def test_tx_simple_failure(self):
- """Verify we throw TransactionAborted if there is a store error during a transaction"""
- cluster = HaCluster(self, 3, test_store=True)
- tx = self.tx_simple_setup(cluster)
- tx.sync()
- tx_queues = cluster[0].agent.tx_queues()
- tx.acknowledge()
- l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
- try:
- cluster.bounce(0) # Should cause roll-back
- tx.connection.session() # Wait for reconnect
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
- self.assertRaises(qm.TransactionAborted, tx.sync)
- self.assertRaises(qm.TransactionAborted, tx.commit)
- try: tx.connection.close()
- except qm.TransactionAborted: pass # Occasionally get exception on close.
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
- finally: l.restore()
-
- def test_tx_simple_failover(self):
"""Verify we throw TransactionAborted if there is a fail-over during a transaction"""
cluster = HaCluster(self, 3, test_store=True)
tx = self.tx_simple_setup(cluster)
@@ -1485,79 +1383,15 @@ class TransactionTests(HaBrokerTest):
try:
cluster.bounce(0) # Should cause roll-back
tx.connection.session() # Wait for reconnect
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
self.assertRaises(qm.TransactionAborted, tx.sync)
self.assertRaises(qm.TransactionAborted, tx.commit)
try: tx.connection.close()
except qm.TransactionAborted: pass # Occasionally get exception on close.
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+ for b in cluster:
+ b.assert_browse_backup("a", ["x","y","z"])
+ b.assert_browse_backup("b", ['0', '1', '2'])
finally: l.restore()
- def test_tx_unknown_failover(self):
- """Verify we throw TransactionUnknown if there is a failure during commit"""
- cluster = HaCluster(self, 3, test_store=True)
- tx = self.tx_simple_setup(cluster)
- tx.sync()
- tx_queues = cluster[0].agent.tx_queues()
- tx.acknowledge()
- l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
- try:
- os.kill(cluster[2].pid, signal.SIGSTOP) # Delay prepare response
- class CommitThread(Thread):
- def run(self):
- try: tx.commit()
- except Exception, e:
- self.error = e
- t = CommitThread()
- t.start() # Commit in progress
- t.join(timeout=0.01)
- self.assertTrue(t.isAlive())
- cluster.bounce(0)
- os.kill(cluster[2].pid, signal.SIGCONT)
- t.join()
- try: raise t.error
- except qm.TransactionUnknown: pass
- for b in cluster: self.assert_tx_clean(b)
- try: tx.connection.close()
- except qm.TransactionUnknown: pass # Occasionally get exception on close.
- finally: l.restore()
-
- def test_tx_no_backups(self):
- """Test the special case of a TX where there are no backups"""
-
- # Test commit
- cluster = HaCluster(self, 1, test_store=True)
- tx = self.tx_simple_setup(cluster)
- tx.acknowledge()
- tx.commit()
- tx.sync()
- tx_queues = cluster[0].agent.tx_queues()
- tx.close()
- self.assert_simple_commit_outcome(cluster[0], tx_queues)
-
- # Test rollback
- cluster = HaCluster(self, 1, test_store=True)
- tx = self.tx_simple_setup(cluster)
- tx.sync()
- tx_queues = cluster[0].agent.tx_queues()
- tx.acknowledge()
- tx.rollback()
- tx.sync()
- tx.close()
- self.assert_simple_rollback_outcome(cluster[0], tx_queues)
-
- def test_tx_backup_fail(self):
- cluster = HaCluster(self, 2, test_store=True, s_args=[[],["--test-store-name=bang"]])
- c = cluster[0].connect(protocol=self.tx_protocol)
- tx = c.session(transactional=True)
- s = tx.sender("q;{create:always,node:{durable:true}}")
- for m in ["foo","TEST_STORE_DO bang: throw","bar"]: s.send(qm.Message(m, durable=True))
- def commit_sync(): tx.commit(); tx.sync()
- self.assertRaises(qm.TransactionAborted, commit_sync)
- for b in cluster: b.assert_browse_backup("q", [])
- self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
- self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<abort tx=1>\n")
-
def test_tx_join_leave(self):
"""Test cluster members joining/leaving cluster.
Also check that tx-queues are cleaned up at end of transaction."""
@@ -1568,13 +1402,11 @@ class TransactionTests(HaBrokerTest):
tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("a", sync=True)
- self.assertEqual([1,1,1], [len(b.agent.tx_queues()) for b in cluster])
cluster[1].kill(final=False)
s.send("b")
tx.commit()
tx.connection.close()
for b in [cluster[0],cluster[2]]:
- self.assert_tx_clean(b)
b.assert_browse_backup("q", ["a","b"], msg=b)
# Joining
tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True)
@@ -1583,7 +1415,6 @@ class TransactionTests(HaBrokerTest):
cluster.restart(1) # Not a part of the current transaction.
tx.commit()
tx.connection.close()
- for b in cluster: self.assert_tx_clean(b)
# The new member is not in the tx but receives the results normal replication.
for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b)
@@ -1596,7 +1427,6 @@ class TransactionTests(HaBrokerTest):
for s in sessions:
sn = s.sender("qq;{create:always,node:{durable:true}}")
sn.send(qm.Message("foo", durable=True))
- self.assertEqual(n, len(cluster[1].agent.tx_queues()))
threads = [ Thread(target=s.commit) for s in sessions]
for t in threads: t.start()
cluster[0].ready(timeout=1) # Check for deadlock