summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-09-03 20:52:51 +0000
committerAlan Conway <aconway@apache.org>2013-09-03 20:52:51 +0000
commitbec603bc053e95788c8936d9a17253bced049920 (patch)
treeccda8680b0e18d2b690a0cb64b5c5eb226422b8f /qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
parent5d510ffc8f13c649ff9bd77cc5c4f6139aaffee5 (diff)
downloadqpid-python-bec603bc053e95788c8936d9a17253bced049920.tar.gz
QPID:4327: HA support for TX transactions - fix cleanup at transaction end.
Was not removing transaction exchange at end of transaction. Fix to transaction end logic. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1519846 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp16
1 files changed, 10 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 832d10483e..df9fd7cdb1 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -47,7 +47,7 @@ using namespace qpid::framing;
class PrimaryTxObserver::Exchange : public broker::Exchange {
public:
Exchange(const boost::shared_ptr<PrimaryTxObserver>& tx_) :
- broker::Exchange(TRANSACTION_REPLICATOR_PREFIX+tx_->getId().str()),
+ broker::Exchange(tx_->getExchangeName()),
tx(tx_)
{
dispatch[TxPrepareOkEvent::KEY] =
@@ -75,10 +75,13 @@ class PrimaryTxObserver::Exchange : public broker::Exchange {
DispatchMap dispatch;
boost::shared_ptr<PrimaryTxObserver> tx;
};
+
const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer");
PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
- haBroker(hb), broker(hb.getBroker()), id(true), failed(false)
+ haBroker(hb), broker(hb.getBroker()), id(true),
+ exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
+ failed(false), ended(false)
{
logPrefix = "Primary transaction "+shortStr(id)+": ";
@@ -150,6 +153,7 @@ void PrimaryTxObserver::commit() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Commit");
txQueue->deliver(TxCommitEvent().message());
+ ended = true;
end(l);
}
@@ -157,15 +161,15 @@ void PrimaryTxObserver::rollback() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Rollback");
txQueue->deliver(TxRollbackEvent().message());
+ ended = true;
end(l);
}
void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
// Don't destroy the tx-queue if there are connected subscriptions.
- if (unfinished.empty()) {
- // Destroying the queue will result in destruction of this when
- // the queues observer references are cleared.
+ if (ended && unfinished.empty()) {
haBroker.deleteQueue(txQueue->getName());
+ broker.getExchanges().destroy(getExchangeName());
}
}
@@ -190,7 +194,7 @@ void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
types::Uuid backup = rs.getBrokerInfo().getSystemId();
if (unprepared.find(backup) != unprepared.end()) {
- failed = true; // Canceled before prepared.
+ ended = failed = true; // Canceled before prepared.
unprepared.erase(backup); // Consider it prepared-fail
}
unfinished.erase(backup);