diff options
Diffstat (limited to 'cpp/src/qpid/ha/PrimaryTxObserver.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/PrimaryTxObserver.cpp | 46 |
1 files changed, 24 insertions, 22 deletions
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 7307f15fbe..f13edfb31e 100644 --- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -87,12 +87,13 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : // Latecomers that have replicated the transaction will be rolled back // when the tx-queue is deleted. // - BrokerInfo::Set infoSet(haBroker.getMembership().otherBackups()); - std::transform(infoSet.begin(), infoSet.end(), inserter(members, members.begin()), + BrokerInfo::Set backups(haBroker.getMembership().otherBackups()); + std::transform(backups.begin(), backups.end(), inserter(members, members.begin()), boost::bind(&BrokerInfo::getSystemId, _1)); QPID_LOG(debug, logPrefix << "Started TX " << id); QPID_LOG(debug, logPrefix << "Members: " << members); + unprepared = unfinished = members; pair<QueuePtr, bool> result = broker.getQueues().declare( @@ -102,8 +103,6 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : txQueue = result.first; txQueue->deliver(TxMembersEvent(members).message()); // Do this last, it will start concurrent callbacks. - haBroker.getMembership().addCallback( - boost::bind(&PrimaryTxObserver::membership, this, _1)); } PrimaryTxObserver::~PrimaryTxObserver() {} @@ -146,8 +145,7 @@ bool PrimaryTxObserver::prepare() { QPID_LOG(debug, logPrefix << "Prepare"); deduplicate(l); txQueue->deliver(TxPrepareEvent().message()); - while (prepared != members && !failed) - lock.wait(); + while (!unprepared.empty() && !failed) lock.wait(); return !failed; } @@ -155,27 +153,30 @@ void PrimaryTxObserver::commit() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Commit"); txQueue->deliver(TxCommitEvent().message()); - destroy(); + end(l); } void PrimaryTxObserver::rollback() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Rollback"); txQueue->deliver(TxRollbackEvent().message()); - destroy(); + end(l); } -void PrimaryTxObserver::destroy() { - // Destroying the queue will result in destruction of this when - // the queues observer references are cleared. - haBroker.deleteQueue(txQueue->getName()); +void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) { + // Don't destroy the tx-queue if there are connected subscriptions. + if (unfinished.empty()) { + // Destroying the queue will result in destruction of this when + // the queues observer references are cleared. + haBroker.deleteQueue(txQueue->getName()); + } } void PrimaryTxObserver::txPrepareOkEvent(const string& data) { sys::Mutex::ScopedLock l(lock); types::Uuid backup = decodeStr<TxPrepareOkEvent>(data).broker; QPID_LOG(debug, logPrefix << "Backup prepared ok: " << backup); - prepared.insert(backup); + unprepared.erase(backup); lock.notify(); } @@ -183,20 +184,21 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) { sys::Mutex::ScopedLock l(lock); types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker; QPID_LOG(error, logPrefix << "Backup prepare failed: " << backup); - prepared.insert(backup); + unprepared.erase(backup); failed = true; lock.notify(); } -void PrimaryTxObserver::membership(const BrokerInfo::Map& update) { +void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) { sys::Mutex::ScopedLock l(lock); - for (UuidSet::iterator i = members.begin(); i != members.end(); ++i) - if (!update.count(*i)) { // A broker is down - failed = true; - lock.notify(); - return; - } + types::Uuid backup = rs.getBrokerInfo().getSystemId(); + if (unprepared.find(backup) != unprepared.end()) { + failed = true; // Canceled before prepared. + unprepared.erase(backup); // Consider it prepared-fail + } + unfinished.erase(backup); + lock.notify(); + end(l); } - }} // namespace qpid::ha |
