summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-05 19:33:35 +0000
committerAlan Conway <aconway@apache.org>2013-08-05 19:33:35 +0000
commit8b4ce07e63b3ed12b43ae82fc487657c6e18f5e4 (patch)
tree7961e97040227d8253bb8cdc6bf7de11ec92d8de /qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
parent866bf57249e41266cd713a5f73e3d18d216fad31 (diff)
downloadqpid-python-8b4ce07e63b3ed12b43ae82fc487657c6e18f5e4.tar.gz
QPID-4327: HA Handle brokers joining and leaving during a transaction.
During a transaction: - A broker leaving aborts the transaction. - A broker joining does not participate in the transaction - but does receive the results of the TX via normal replication. Clean up tx-queues when the transaction completes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1510678 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp40
1 files changed, 33 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 96be5e552e..7307f15fbe 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -31,6 +31,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include <boost/lexical_cast.hpp>
+#include <algorithm>
namespace qpid {
namespace framing {
@@ -87,9 +88,11 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
// when the tx-queue is deleted.
//
BrokerInfo::Set infoSet(haBroker.getMembership().otherBackups());
- std::transform(infoSet.begin(), infoSet.end(), inserter(backups, backups.begin()),
- boost::bind(&BrokerInfo::getSystemId, _1));
- QPID_LOG(debug, logPrefix << "Started on " << backups);
+ std::transform(infoSet.begin(), infoSet.end(), inserter(members, members.begin()),
+ boost::bind(&BrokerInfo::getSystemId, _1));
+
+ QPID_LOG(debug, logPrefix << "Started TX " << id);
+ QPID_LOG(debug, logPrefix << "Members: " << members);
pair<QueuePtr, bool> result =
broker.getQueues().declare(
@@ -97,8 +100,15 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
QueueSettings(/*durable*/false, /*autodelete*/true));
assert(result.second);
txQueue = result.first;
+ txQueue->deliver(TxMembersEvent(members).message());
+ // Do this last, it will start concurrent callbacks.
+ haBroker.getMembership().addCallback(
+ boost::bind(&PrimaryTxObserver::membership, this, _1));
}
+PrimaryTxObserver::~PrimaryTxObserver() {}
+
+
void PrimaryTxObserver::initialize() {
broker.getExchanges().registerExchange(
boost::shared_ptr<Exchange>(new Exchange(shared_from_this())));
@@ -125,7 +135,7 @@ void PrimaryTxObserver::deduplicate(sys::Mutex::ScopedLock&) {
boost::shared_ptr<Primary> primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()));
assert(primary);
// Tell replicating subscriptions to skip IDs in the transaction.
- for (UuidSet::iterator b = backups.begin(); b != backups.end(); ++b)
+ for (UuidSet::iterator b = members.begin(); b != members.end(); ++b)
for (QueueIdsMap::iterator q = enqueues.begin(); q != enqueues.end(); ++q)
primary->skip(*b, q->first, q->second);
}
@@ -136,7 +146,8 @@ bool PrimaryTxObserver::prepare() {
QPID_LOG(debug, logPrefix << "Prepare");
deduplicate(l);
txQueue->deliver(TxPrepareEvent().message());
- while (!isPrepared(l)) lock.wait();
+ while (prepared != members && !failed)
+ lock.wait();
return !failed;
}
@@ -144,12 +155,20 @@ void PrimaryTxObserver::commit() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Commit");
txQueue->deliver(TxCommitEvent().message());
+ destroy();
}
void PrimaryTxObserver::rollback() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Rollback");
txQueue->deliver(TxRollbackEvent().message());
+ destroy();
+}
+
+void PrimaryTxObserver::destroy() {
+ // Destroying the queue will result in destruction of this when
+ // the queues observer references are cleared.
+ haBroker.deleteQueue(txQueue->getName());
}
void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
@@ -169,8 +188,15 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
lock.notify();
}
-bool PrimaryTxObserver::isPrepared(sys::Mutex::ScopedLock&) {
- return (prepared == backups || failed);
+void PrimaryTxObserver::membership(const BrokerInfo::Map& update) {
+ sys::Mutex::ScopedLock l(lock);
+ for (UuidSet::iterator i = members.begin(); i != members.end(); ++i)
+ if (!update.count(*i)) { // A broker is down
+ failed = true;
+ lock.notify();
+ return;
+ }
}
+
}} // namespace qpid::ha