summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/PrimaryTxObserver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/PrimaryTxObserver.cpp')
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.cpp46
1 files changed, 24 insertions, 22 deletions
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 7307f15fbe..f13edfb31e 100644
--- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -87,12 +87,13 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
// Latecomers that have replicated the transaction will be rolled back
// when the tx-queue is deleted.
//
- BrokerInfo::Set infoSet(haBroker.getMembership().otherBackups());
- std::transform(infoSet.begin(), infoSet.end(), inserter(members, members.begin()),
+ BrokerInfo::Set backups(haBroker.getMembership().otherBackups());
+ std::transform(backups.begin(), backups.end(), inserter(members, members.begin()),
boost::bind(&BrokerInfo::getSystemId, _1));
QPID_LOG(debug, logPrefix << "Started TX " << id);
QPID_LOG(debug, logPrefix << "Members: " << members);
+ unprepared = unfinished = members;
pair<QueuePtr, bool> result =
broker.getQueues().declare(
@@ -102,8 +103,6 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
txQueue = result.first;
txQueue->deliver(TxMembersEvent(members).message());
// Do this last, it will start concurrent callbacks.
- haBroker.getMembership().addCallback(
- boost::bind(&PrimaryTxObserver::membership, this, _1));
}
PrimaryTxObserver::~PrimaryTxObserver() {}
@@ -146,8 +145,7 @@ bool PrimaryTxObserver::prepare() {
QPID_LOG(debug, logPrefix << "Prepare");
deduplicate(l);
txQueue->deliver(TxPrepareEvent().message());
- while (prepared != members && !failed)
- lock.wait();
+ while (!unprepared.empty() && !failed) lock.wait();
return !failed;
}
@@ -155,27 +153,30 @@ void PrimaryTxObserver::commit() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Commit");
txQueue->deliver(TxCommitEvent().message());
- destroy();
+ end(l);
}
void PrimaryTxObserver::rollback() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Rollback");
txQueue->deliver(TxRollbackEvent().message());
- destroy();
+ end(l);
}
-void PrimaryTxObserver::destroy() {
- // Destroying the queue will result in destruction of this when
- // the queues observer references are cleared.
- haBroker.deleteQueue(txQueue->getName());
+void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
+ // Don't destroy the tx-queue if there are connected subscriptions.
+ if (unfinished.empty()) {
+ // Destroying the queue will result in destruction of this when
+ // the queues observer references are cleared.
+ haBroker.deleteQueue(txQueue->getName());
+ }
}
void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
sys::Mutex::ScopedLock l(lock);
types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker;
QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup);
- prepared.insert(backup);
+ unprepared.erase(backup);
lock.notify();
}
@@ -183,20 +184,21 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
sys::Mutex::ScopedLock l(lock);
types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker;
QPID_LOG(error, logPrefix << "Backup prepare failed: " << backup);
- prepared.insert(backup);
+ unprepared.erase(backup);
failed = true;
lock.notify();
}
-void PrimaryTxObserver::membership(const BrokerInfo::Map& update) {
+void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
- for (UuidSet::iterator i = members.begin(); i != members.end(); ++i)
- if (!update.count(*i)) { // A broker is down
- failed = true;
- lock.notify();
- return;
- }
+ types::Uuid backup = rs.getBrokerInfo().getSystemId();
+ if (unprepared.find(backup) != unprepared.end()) {
+ failed = true; // Canceled before prepared.
+ unprepared.erase(backup); // Consider it prepared-fail
+ }
+ unfinished.erase(backup);
+ lock.notify();
+ end(l);
}
-
}} // namespace qpid::ha