diff options
author | Alan Conway <aconway@apache.org> | 2013-08-05 19:33:35 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-08-05 19:33:35 +0000 |
commit | 8b4ce07e63b3ed12b43ae82fc487657c6e18f5e4 (patch) | |
tree | 7961e97040227d8253bb8cdc6bf7de11ec92d8de /qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | |
parent | 866bf57249e41266cd713a5f73e3d18d216fad31 (diff) | |
download | qpid-python-8b4ce07e63b3ed12b43ae82fc487657c6e18f5e4.tar.gz |
QPID-4327: HA Handle brokers joining and leaving during a transaction.
During a transaction:
- A broker leaving aborts the transaction.
- A broker joining does not participate in the transaction
- but does receive the results of the TX via normal replication.
Clean up tx-queues when the transaction completes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1510678 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 40 |
1 files changed, 33 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 96be5e552e..7307f15fbe 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -31,6 +31,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include <boost/lexical_cast.hpp> +#include <algorithm> namespace qpid { namespace framing { @@ -87,9 +88,11 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : // when the tx-queue is deleted. // BrokerInfo::Set infoSet(haBroker.getMembership().otherBackups()); - std::transform(infoSet.begin(), infoSet.end(), inserter(backups, backups.begin()), - boost::bind(&BrokerInfo::getSystemId, _1)); - QPID_LOG(debug, logPrefix << "Started on " << backups); + std::transform(infoSet.begin(), infoSet.end(), inserter(members, members.begin()), + boost::bind(&BrokerInfo::getSystemId, _1)); + + QPID_LOG(debug, logPrefix << "Started TX " << id); + QPID_LOG(debug, logPrefix << "Members: " << members); pair<QueuePtr, bool> result = broker.getQueues().declare( @@ -97,8 +100,15 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : QueueSettings(/*durable*/false, /*autodelete*/true)); assert(result.second); txQueue = result.first; + txQueue->deliver(TxMembersEvent(members).message()); + // Do this last, it will start concurrent callbacks. + haBroker.getMembership().addCallback( + boost::bind(&PrimaryTxObserver::membership, this, _1)); } +PrimaryTxObserver::~PrimaryTxObserver() {} + + void PrimaryTxObserver::initialize() { broker.getExchanges().registerExchange( boost::shared_ptr<Exchange>(new Exchange(shared_from_this()))); @@ -125,7 +135,7 @@ void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) { boost::shared_ptr<Primary> primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole())); assert(primary); // Tell replicating subscriptions to skip IDs in the transaction. - for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b) + for (UuidSet::iterator b = members.begin(); b != members.end(); ++b) for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q) primary->skip(*b, q->first, q->second); } @@ -136,7 +146,8 @@ bool PrimaryTxObserver::prepare() { QPID_LOG(debug, logPrefix << "Prepare"); deduplicate(l); txQueue->deliver(TxPrepareEvent().message()); - while (!isPrepared(l)) lock.wait(); + while (prepared != members && !failed) + lock.wait(); return !failed; } @@ -144,12 +155,20 @@ void PrimaryTxObserver::commit() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Commit"); txQueue->deliver(TxCommitEvent().message()); + destroy(); } void PrimaryTxObserver::rollback() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Rollback"); txQueue->deliver(TxRollbackEvent().message()); + destroy(); +} + +void PrimaryTxObserver::destroy() { + // Destroying the queue will result in destruction of this when + // the queues observer references are cleared. + haBroker.deleteQueue(txQueue->getName()); } void PrimaryTxObserver::txPrepareOkEvent(const string& data) { @@ -169,8 +188,15 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) { lock.notify(); } -bool PrimaryTxObserver::isPrepared(sys::Mutex::ScopedLock&) { - return (prepared == backups || failed); +void PrimaryTxObserver::membership(const BrokerInfo::Map& update) { + sys::Mutex::ScopedLock l(lock); + for (UuidSet::iterator i = members.begin(); i != members.end(); ++i) + if (!update.count(*i)) { // A broker is down + failed = true; + lock.notify(); + return; + } } + }} // namespace qpid::ha |