From 1731c3ba99577fa515985609a675afd89e5c91e4 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 29 Oct 2013 15:23:49 +0000 Subject: QPID-5139: HA transactions block a thread, can deadlock the broker PrimaryTxObserver::prepare used to block pending responses from each backup. With concurrent transactions this can deadlock the broker: once all worker threads are blocked in prepare, responses from backups cannot be received. This commit generalizes the async completion mechanism for messages to allow async completion of arbitrary commands. It leaves the special-case code for messages undisturbed but adds a second path (starting from SessionState::handleCommand) for async completion of other commands. In particular it implements tx.commit to allow async completion. TxBuffer is now an AsyncCompletion and commitLocal() is split into - startCommit() called by SemanticState::commit() - endCommit() called when the commit command completes TxAccept no longer holds pre-computed ranges, compute fresh each time. - Avoid range iterators going out of date during a delayed commit. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1536754 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/TransactionObserverTest.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'qpid/cpp/src/tests/TransactionObserverTest.cpp') diff --git a/qpid/cpp/src/tests/TransactionObserverTest.cpp b/qpid/cpp/src/tests/TransactionObserverTest.cpp index 2a7d94b1ae..80ef494c21 100644 --- a/qpid/cpp/src/tests/TransactionObserverTest.cpp +++ b/qpid/cpp/src/tests/TransactionObserverTest.cpp @@ -79,8 +79,10 @@ struct MockBrokerObserver : public BrokerObserver { MockBrokerObserver(bool prep_=true) : prep(prep_) {} void startTx(const intrusive_ptr& buffer) { - tx.reset(new MockTransactionObserver(prep)); - buffer->setObserver(tx); + if (!tx) { // Don't overwrite first tx with automatically started second tx. + tx.reset(new MockTransactionObserver(prep)); + buffer->setObserver(tx); + } } }; @@ -94,7 +96,7 @@ Session simpleTxTransaction(MessagingFixture& fix) { return txSession; } -QPID_AUTO_TEST_CASE(tesTxtCommit) { +QPID_AUTO_TEST_CASE(testTxCommit) { MessagingFixture fix; shared_ptr brokerObserver(new MockBrokerObserver); fix.broker->getBrokerObservers().add(brokerObserver); @@ -114,6 +116,7 @@ QPID_AUTO_TEST_CASE(testTxFail) { fix.broker->getBrokerObservers().add(brokerObserver); Session txSession = simpleTxTransaction(fix); try { + ScopedSuppressLogging sl; // Suppress messages for expected error. txSession.commit(); BOOST_FAIL("Expected exception"); } catch(...) {} -- cgit v1.2.1