diff options
author | Alan Conway <aconway@apache.org> | 2015-09-03 18:59:05 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2015-09-03 18:59:05 +0000 |
commit | 14de0a97b80cc0f63208b61654dd0348fd3da8b1 (patch) | |
tree | a9f3a2bcdda814d6a36ab1981ae528272d460523 /qpid/cpp/src/tests | |
parent | 492f7a78b8fd2ed7dd4e08184f42b7496aa8fed1 (diff) | |
download | qpid-python-14de0a97b80cc0f63208b61654dd0348fd3da8b1.tar.gz |
QPID-5855 - Simplified HA transaction logic.
Removed complex and incorrect HA+TX logic, reverted to the following limitation:
You can use transactions in an HA cluster, but there are limitations on the
transactional guarantees. Transactions function normally with the *primary*
broker but replication to the backups is not covered by the atomic guarantee.
The following situations are all safe:
- Client rolls back a transaction.
- Client successfully commits a transaction.
- Primary fails during a transaction *before* the client sends a commit.
- Transaction contains only one message.
The problem case is when all of the following occur:
- transaction contains multiple actions (enqueues or dequeues)
- primary fails between client sending commit and receiving commit-complete.
In this case it is possible that only part of the transaction was replicated to
the backups, so on fail-over partial transaction results may be visible.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1701109 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 206 |
2 files changed, 20 insertions, 188 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 82ca808cb1..ace225a509 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -267,6 +267,8 @@ acl allow all all c = self.connect_admin() try: wait_address(c, queue) + if not "msg" in kwargs: + kwargs["msg"]=str(self) assert_browse_retry(c.session(), queue, expected, **kwargs) finally: c.close() diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 2ee2e291e2..0efb8182ec 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1327,28 +1327,25 @@ class TransactionTests(HaBrokerTest): sb.close() return tx - def tx_subscriptions(self, broker): - """Return list of queue names for tx subscriptions""" - return [q for q in broker.agent.repsub_queues() - if q.startswith("qpid.ha-tx")] - def test_tx_simple_commit(self): cluster = HaCluster(self, 2, test_store=True, wait=True) tx = self.tx_simple_setup(cluster) tx.sync() - tx_queues = cluster[0].agent.tx_queues() - - # NOTE: backup does not process transactional dequeues until prepare - cluster[1].assert_browse_backup("a", ["x","y","z"]) - cluster[1].assert_browse_backup("b", ['0', '1', '2']) - tx.acknowledge() + # Pre transaction - messages are acquired on primary but not yet dequeued + # so still there on backup. + cluster[0].assert_browse_backup("a", []) + cluster[1].assert_browse_backup("a", ['x', 'y', 'z']) + for b in cluster: + b.assert_browse_backup("b", ['0', '1', '2']) tx.commit() tx.sync() tx.close() + # Post transaction: all synced. 
for b in cluster: - self.assert_simple_commit_outcome(b, tx_queues) + b.assert_browse_backup("a", []) + b.assert_browse_backup("b", ['0', '1', '2', "x","y","z"]) # Verify non-tx dequeue is replicated correctly c = cluster.connect(0, protocol=self.tx_protocol) @@ -1360,121 +1357,22 @@ class TransactionTests(HaBrokerTest): c.close() tx.connection.close() - - def check_enq_deq(self, cluster, queue, expect): - for b in cluster: - q = b.agent.getQueue(queue) - self.assertEqual( - (b.name,)+expect, - (b.name, q.msgTotalEnqueues, q.msgTotalDequeues, q.msgTxnEnqueues, q.msgTxnDequeues)) - - def test_tx_enq_notx_deq(self): - """Verify that a non-tx dequeue of a tx enqueue is replicated correctly""" - cluster = HaCluster(self, 2, test_store=True) - c = cluster.connect(0, protocol=self.tx_protocol) - - tx = c.session(transactional=True) - c.session().sender("qq;{create:always}").send("m1") - tx.sender("qq;{create:always}").send("tx") - tx.commit() - tx.close() - c.session().sender("qq;{create:always}").send("m2") - self.check_enq_deq(cluster, 'qq', (3, 0, 1, 0)) - - notx = c.session() - self.assertEqual(['m1', 'tx', 'm2'], [m.content for m in receiver_iter(notx.receiver('qq'))]) - notx.acknowledge() - self.check_enq_deq(cluster, 'qq', (3, 3, 1, 0)) - for b in cluster: b.assert_browse_backup('qq', [], msg=b) - for b in cluster: self.assert_tx_clean(b) - - def test_tx_enq_notx_deq_qpid_send(self): - """Verify that a non-tx dequeue of a tx enqueue is replicated correctly""" - cluster = HaCluster(self, 2, test_store=True) - - self.popen( - ['qpid-send', '-a', 'qq;{create:always}', '-b', cluster[0].host_port(), '--tx=1', - '--content-string=foo'] - ).assert_exit_ok() - for b in cluster: b.assert_browse_backup('qq', ['foo'], msg=b) - self.check_enq_deq(cluster, 'qq', (1, 0, 1, 0)) - - self.popen(['qpid-receive', '-a', 'qq', '-b', cluster[0].host_port()]).assert_exit_ok() - self.check_enq_deq(cluster, 'qq', (1, 1, 1, 0)) - for b in cluster: b.assert_browse_backup('qq', [], msg=b) - 
for b in cluster: self.assert_tx_clean(b) - - def assert_tx_clean(self, b): - """Verify that there are no transaction artifacts - (exchanges, queues, subscriptions) on b.""" - class FunctionCache: # Call a function and cache the result. - def __init__(self, f): self.f, self.value = f, None - def __call__(self): self.value = self.f(); return self.value - - txq= FunctionCache(b.agent.tx_queues) - assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value) - txsub = FunctionCache(lambda: self.tx_subscriptions(b)) - assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value) - # TODO aconway 2013-10-15: TX exchanges don't show up in management. - - def assert_simple_commit_outcome(self, b, tx_queues): - b.assert_browse_backup("a", [], msg=b) - b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b) - # Check for expected actions on the store - expect = """<enqueue a x> -<enqueue a y> -<enqueue a z> -<begin tx 1> -<dequeue a x tx=1> -<dequeue a y tx=1> -<dequeue a z tx=1> -<commit tx=1> -""" - self.assertEqual(expect, open_read(b.store_log), msg=b) - self.assert_tx_clean(b) - def test_tx_simple_rollback(self): cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster) tx.sync() - tx_queues = cluster[0].agent.tx_queues() tx.acknowledge() tx.rollback() - tx.close() # For clean test. 
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + + for b in cluster: + b.assert_browse_backup("a", ["x","y","z"]) + b.assert_browse_backup("b", ['0', '1', '2']) + + tx.close() tx.connection.close() - def assert_simple_rollback_outcome(self, b, tx_queues): - b.assert_browse_backup("a", ["x","y","z"], msg=b) - b.assert_browse_backup("b", ['0', '1', '2'], msg=b) - # Check for expected actions on the store - expect = """<enqueue a x> -<enqueue a y> -<enqueue a z> -""" - self.assertEqual(open_read(b.store_log), expect, msg=b) - self.assert_tx_clean(b) def test_tx_simple_failure(self): - """Verify we throw TransactionAborted if there is a store error during a transaction""" - cluster = HaCluster(self, 3, test_store=True) - tx = self.tx_simple_setup(cluster) - tx.sync() - tx_queues = cluster[0].agent.tx_queues() - tx.acknowledge() - l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. - try: - cluster.bounce(0) # Should cause roll-back - tx.connection.session() # Wait for reconnect - for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) - self.assertRaises(qm.TransactionAborted, tx.sync) - self.assertRaises(qm.TransactionAborted, tx.commit) - try: tx.connection.close() - except qm.TransactionAborted: pass # Occasionally get exception on close. 
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) - finally: l.restore() - - def test_tx_simple_failover(self): """Verify we throw TransactionAborted if there is a fail-over during a transaction""" cluster = HaCluster(self, 3, test_store=True) tx = self.tx_simple_setup(cluster) @@ -1485,79 +1383,15 @@ class TransactionTests(HaBrokerTest): try: cluster.bounce(0) # Should cause roll-back tx.connection.session() # Wait for reconnect - for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) self.assertRaises(qm.TransactionAborted, tx.sync) self.assertRaises(qm.TransactionAborted, tx.commit) try: tx.connection.close() except qm.TransactionAborted: pass # Occasionally get exception on close. - for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + for b in cluster: + b.assert_browse_backup("a", ["x","y","z"]) + b.assert_browse_backup("b", ['0', '1', '2']) finally: l.restore() - def test_tx_unknown_failover(self): - """Verify we throw TransactionUnknown if there is a failure during commit""" - cluster = HaCluster(self, 3, test_store=True) - tx = self.tx_simple_setup(cluster) - tx.sync() - tx_queues = cluster[0].agent.tx_queues() - tx.acknowledge() - l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. - try: - os.kill(cluster[2].pid, signal.SIGSTOP) # Delay prepare response - class CommitThread(Thread): - def run(self): - try: tx.commit() - except Exception, e: - self.error = e - t = CommitThread() - t.start() # Commit in progress - t.join(timeout=0.01) - self.assertTrue(t.isAlive()) - cluster.bounce(0) - os.kill(cluster[2].pid, signal.SIGCONT) - t.join() - try: raise t.error - except qm.TransactionUnknown: pass - for b in cluster: self.assert_tx_clean(b) - try: tx.connection.close() - except qm.TransactionUnknown: pass # Occasionally get exception on close. 
- finally: l.restore() - - def test_tx_no_backups(self): - """Test the special case of a TX where there are no backups""" - - # Test commit - cluster = HaCluster(self, 1, test_store=True) - tx = self.tx_simple_setup(cluster) - tx.acknowledge() - tx.commit() - tx.sync() - tx_queues = cluster[0].agent.tx_queues() - tx.close() - self.assert_simple_commit_outcome(cluster[0], tx_queues) - - # Test rollback - cluster = HaCluster(self, 1, test_store=True) - tx = self.tx_simple_setup(cluster) - tx.sync() - tx_queues = cluster[0].agent.tx_queues() - tx.acknowledge() - tx.rollback() - tx.sync() - tx.close() - self.assert_simple_rollback_outcome(cluster[0], tx_queues) - - def test_tx_backup_fail(self): - cluster = HaCluster(self, 2, test_store=True, s_args=[[],["--test-store-name=bang"]]) - c = cluster[0].connect(protocol=self.tx_protocol) - tx = c.session(transactional=True) - s = tx.sender("q;{create:always,node:{durable:true}}") - for m in ["foo","TEST_STORE_DO bang: throw","bar"]: s.send(qm.Message(m, durable=True)) - def commit_sync(): tx.commit(); tx.sync() - self.assertRaises(qm.TransactionAborted, commit_sync) - for b in cluster: b.assert_browse_backup("q", []) - self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n") - self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<abort tx=1>\n") - def test_tx_join_leave(self): """Test cluster members joining/leaving cluster. 
Also check that tx-queues are cleaned up at end of transaction.""" @@ -1568,13 +1402,11 @@ class TransactionTests(HaBrokerTest): tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) s = tx.sender("q;{create:always}") s.send("a", sync=True) - self.assertEqual([1,1,1], [len(b.agent.tx_queues()) for b in cluster]) cluster[1].kill(final=False) s.send("b") tx.commit() tx.connection.close() for b in [cluster[0],cluster[2]]: - self.assert_tx_clean(b) b.assert_browse_backup("q", ["a","b"], msg=b) # Joining tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) @@ -1583,7 +1415,6 @@ class TransactionTests(HaBrokerTest): cluster.restart(1) # Not a part of the current transaction. tx.commit() tx.connection.close() - for b in cluster: self.assert_tx_clean(b) # The new member is not in the tx but receives the results normal replication. for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b) @@ -1596,7 +1427,6 @@ class TransactionTests(HaBrokerTest): for s in sessions: sn = s.sender("qq;{create:always,node:{durable:true}}") sn.send(qm.Message("foo", durable=True)) - self.assertEqual(n, len(cluster[1].agent.tx_queues())) threads = [ Thread(target=s.commit) for s in sessions] for t in threads: t.start() cluster[0].ready(timeout=1) # Check for deadlock |