diff options
author | Alan Conway <aconway@apache.org> | 2013-09-03 20:52:51 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-09-03 20:52:51 +0000 |
commit | bec603bc053e95788c8936d9a17253bced049920 (patch) | |
tree | ccda8680b0e18d2b690a0cb64b5c5eb226422b8f /qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | |
parent | 5d510ffc8f13c649ff9bd77cc5c4f6139aaffee5 (diff) | |
download | qpid-python-bec603bc053e95788c8936d9a17253bced049920.tar.gz |
QPID:4327: HA support for TX transactions - fix cleanup at transaction end.
Was not removing transaction exchange at end of transaction.
Fix to transaction end logic.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1519846 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index 832d10483e..df9fd7cdb1 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -47,7 +47,7 @@ using namespace qpid::framing; class PrimaryTxObserver::Exchange : public broker::Exchange { public: Exchange(const boost::shared_ptr<PrimaryTxObserver>& tx_) : - broker::Exchange(TRANSACTION_REPLICATOR_PREFIX+tx_->getId().str()), + broker::Exchange(tx_->getExchangeName()), tx(tx_) { dispatch[TxPrepareOkEvent::KEY] = @@ -75,10 +75,13 @@ class PrimaryTxObserver::Exchange : public broker::Exchange { DispatchMap dispatch; boost::shared_ptr<PrimaryTxObserver> tx; }; + const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer"); PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : - haBroker(hb), broker(hb.getBroker()), id(true), failed(false) + haBroker(hb), broker(hb.getBroker()), id(true), + exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()), + failed(false), ended(false) { logPrefix = "Primary transaction "+shortStr(id)+": "; @@ -150,6 +153,7 @@ void PrimaryTxObserver::commit() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Commit"); txQueue->deliver(TxCommitEvent().message()); + ended = true; end(l); } @@ -157,15 +161,15 @@ void PrimaryTxObserver::rollback() { sys::Mutex::ScopedLock l(lock); QPID_LOG(debug, logPrefix << "Rollback"); txQueue->deliver(TxRollbackEvent().message()); + ended = true; end(l); } void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) { // Don't destroy the tx-queue if there are connected subscriptions. - if (unfinished.empty()) { - // Destroying the queue will result in destruction of this when - // the queues observer references are cleared. + if (ended && unfinished.empty()) { haBroker.deleteQueue(txQueue->getName()); + broker.getExchanges().destroy(getExchangeName()); } } @@ -190,7 +194,7 @@ void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) { sys::Mutex::ScopedLock l(lock); types::Uuid backup = rs.getBrokerInfo().getSystemId(); if (unprepared.find(backup) != unprepared.end()) { - failed = true; // Canceled before prepared. + ended = failed = true; // Canceled before prepared. unprepared.erase(backup); // Consider it prepared-fail } unfinished.erase(backup); |