diff options
Diffstat (limited to 'cpp/src/qpid/ha/PrimaryTxObserver.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/PrimaryTxObserver.cpp | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 832d10483e..df9fd7cdb1 100644 --- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -47,7 +47,7 @@ using namespace qpid::framing; class PrimaryTxObserver::Exchange : public broker::Exchange { public: Exchange(const boost::shared_ptr<PrimaryTxObserver>& tx_) : - broker::Exchange(TRANSACTION_REPLICATOR_PREFIX+tx_->getId().str()), + broker::Exchange(tx_->getExchangeName()), tx(tx_) { dispatch[TxPrepareOkEvent::KEY] = @@ -75,10 +75,13 @@ class PrimaryTxObserver::Exchange : public broker::Exchange { DispatchMap dispatch; boost::shared_ptr<PrimaryTxObserver> tx; }; + const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer"); PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : - haBroker(hb), broker(hb.getBroker()), id(true), failed(false) + haBroker(hb), broker(hb.getBroker()), id(true), + exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()), + failed(false), ended(false) { logPrefix = "Primary transaction "+shortStr(id)+": "; @@ -150,6 +153,7 @@ void PrimaryTxObserver::commit() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Commit"); txQueue->deliver(TxCommitEvent().message()); + ended = true; end(l); } @@ -157,15 +161,15 @@ void PrimaryTxObserver::rollback() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Rollback"); txQueue->deliver(TxRollbackEvent().message()); + ended = true; end(l); } void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) { // Don't destroy the tx-queue if there are connected subscriptions. - if (unfinished.empty()) { - // Destroying the queue will result in destruction of this when - // the queues observer references are cleared. + if (ended && unfinished.empty()) { haBroker.deleteQueue(txQueue->getName()); + broker.getExchanges().destroy(getExchangeName()); } } @@ -190,7 +194,7 @@ void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) { sys::Mutex::ScopedLock l(lock); types::Uuid backup = rs.getBrokerInfo().getSystemId(); if (unprepared.find(backup) != unprepared.end()) { - failed = true; // Canceled before prepared. + ended = failed = true; // Canceled before prepared. unprepared.erase(backup); // Consider it prepared-fail } unfinished.erase(backup); |
