diff options
author | Alan Conway <aconway@apache.org> | 2013-10-29 15:23:49 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-10-29 15:23:49 +0000 |
commit | 1731c3ba99577fa515985609a675afd89e5c91e4 (patch) | |
tree | 8432209a8e11f28fca72d8d7972016572da0f3c6 /qpid/cpp/src/qpid/ha/Primary.cpp | |
parent | 7033bf67ab672fafc57d374f8a727cc8e4b7c54e (diff) | |
download | qpid-python-1731c3ba99577fa515985609a675afd89e5c91e4.tar.gz |
QPID-5139: HA transactions block a thread, can deadlock the broker
PrimaryTxObserver::prepare used to block pending responses from each backup. With
concurrent transactions this can deadlock the broker: once all worker threads
are blocked in prepare, responses from backups cannot be received.
This commit generalizes the async completion mechanism for messages to allow
async completion of arbitrary commands. It leaves the special-case code for
messages undisturbed but adds a second path (starting from
SessionState::handleCommand) for async completion of other commands.
In particular it implements tx.commit to allow async completion.
TxBuffer is now an AsyncCompletion and commitLocal() is split into
- startCommit() called by SemanticState::commit()
- endCommit() called when the commit command completes
TxAccept no longer holds pre-computed ranges, compute fresh each time.
- Avoid range iterators going out of date during a delayed commit.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1536754 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 30 |
1 files changed, 17 insertions, 13 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 3a7ab3b0fc..0c1858ceb1 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -232,15 +232,16 @@ void Primary::skip( if (i != replicas.end()) i->second->addSkip(ids); } +// Called from ReplicatingSubscription::cancel void Primary::removeReplica(const ReplicatingSubscription& rs) { - sys::Mutex::ScopedLock l(lock); - replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())); - - TxMap::const_iterator i = txMap.find(rs.getQueue()->getName()); - if (i != txMap.end()) { - boost::shared_ptr<PrimaryTxObserver> tx = i->second.lock(); - if (tx) tx->cancel(rs); + boost::shared_ptr<PrimaryTxObserver> tx; + { + sys::Mutex::ScopedLock l(lock); + replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())); + TxMap::const_iterator i = txMap.find(rs.getQueue()->getName()); + if (i != txMap.end()) tx = i->second.lock(); } + if (tx) tx->cancel(rs); // Outside of lock. } // NOTE: Called with queue registry lock held. @@ -401,19 +402,22 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) backup->startCatchup(); } -shared_ptr<PrimaryTxObserver> Primary::makeTxObserver() { - shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker)); +shared_ptr<PrimaryTxObserver> Primary::makeTxObserver( + const boost::intrusive_ptr<broker::TxBuffer>& txBuffer) +{ + shared_ptr<PrimaryTxObserver> observer( + new PrimaryTxObserver(*this, haBroker, txBuffer)); observer->initialize(); txMap[observer->getTxQueue()->getName()] = observer; return observer; } -void Primary::startTx(const boost::intrusive_ptr<broker::TxBuffer>& tx) { - tx->setObserver(makeTxObserver()); +void Primary::startTx(const boost::intrusive_ptr<broker::TxBuffer>& txBuffer) { + txBuffer->setObserver(makeTxObserver(txBuffer)); } -void Primary::startDtx(const boost::intrusive_ptr<broker::DtxBuffer>& dtx) { - dtx->setObserver(makeTxObserver()); +void Primary::startDtx(const boost::intrusive_ptr<broker::DtxBuffer>& ) { + QPID_LOG(notice, "DTX transactions in a HA cluster are not yet atomic"); } }} // namespace qpid::ha |