summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/PrimaryTxObserver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/PrimaryTxObserver.cpp')
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.cpp16
1 files changed, 10 insertions, 6 deletions
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 832d10483e..df9fd7cdb1 100644
--- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -47,7 +47,7 @@ using namespace qpid::framing;
class PrimaryTxObserver::Exchange : public broker::Exchange {
public:
Exchange(const boost::shared_ptr<PrimaryTxObserver>& tx_) :
- broker::Exchange(TRANSACTION_REPLICATOR_PREFIX+tx_->getId().str()),
+ broker::Exchange(tx_->getExchangeName()),
tx(tx_)
{
dispatch[TxPrepareOkEvent::KEY] =
@@ -75,10 +75,13 @@ class PrimaryTxObserver::Exchange : public broker::Exchange {
DispatchMap dispatch;
boost::shared_ptr<PrimaryTxObserver> tx;
};
+
const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer");
PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
- haBroker(hb), broker(hb.getBroker()), id(true), failed(false)
+ haBroker(hb), broker(hb.getBroker()), id(true),
+ exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
+ failed(false), ended(false)
{
logPrefix = "Primary transaction "+shortStr(id)+": ";
@@ -150,6 +153,7 @@ void PrimaryTxObserver::commit() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Commit");
txQueue->deliver(TxCommitEvent().message());
+ ended = true;
end(l);
}
@@ -157,15 +161,15 @@ void PrimaryTxObserver::rollback() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Rollback");
txQueue->deliver(TxRollbackEvent().message());
+ ended = true;
end(l);
}
void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
// Don't destroy the tx-queue if there are connected subscriptions.
- if (unfinished.empty()) {
- // Destroying the queue will result in destruction of this when
- // the queues observer references are cleared.
+ if (ended && unfinished.empty()) {
haBroker.deleteQueue(txQueue->getName());
+ broker.getExchanges().destroy(getExchangeName());
}
}
@@ -190,7 +194,7 @@ void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
types::Uuid backup = rs.getBrokerInfo().getSystemId();
if (unprepared.find(backup) != unprepared.end()) {
- failed = true; // Canceled before prepared.
+ ended = failed = true; // Canceled before prepared.
unprepared.erase(backup); // Consider it prepared-fail
}
unfinished.erase(backup);