summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/PrimaryTxObserver.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-09-03 20:52:51 +0000
committerAlan Conway <aconway@apache.org>2013-09-03 20:52:51 +0000
commit9850ccbcf9e29e62fb9df3f87f361aa9c57169a2 (patch)
treefca87ad1027287b6bf91f6a1f792a96c40ea0719 /cpp/src/qpid/ha/PrimaryTxObserver.cpp
parent5346479b7bba01bc2803df7e34ed1e0b3c7b8b4d (diff)
downloadqpid-python-9850ccbcf9e29e62fb9df3f87f361aa9c57169a2.tar.gz
QPID:4327: HA support for TX transactions - fix cleanup at transaction end.
Was not removing transaction exchange at end of transaction. Fix to transaction end logic. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1519846 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/PrimaryTxObserver.cpp')
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.cpp16
1 files changed, 10 insertions, 6 deletions
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 832d10483e..df9fd7cdb1 100644
--- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -47,7 +47,7 @@ using namespace qpid::framing;
class PrimaryTxObserver::Exchange : public broker::Exchange {
public:
Exchange(const boost::shared_ptr<PrimaryTxObserver>& tx_) :
- broker::Exchange(TRANSACTION_REPLICATOR_PREFIX+tx_->getId().str()),
+ broker::Exchange(tx_->getExchangeName()),
tx(tx_)
{
dispatch[TxPrepareOkEvent::KEY] =
@@ -75,10 +75,13 @@ class PrimaryTxObserver::Exchange : public broker::Exchange {
DispatchMap dispatch;
boost::shared_ptr<PrimaryTxObserver> tx;
};
+
const string PrimaryTxObserver::Exchange::TYPE_NAME(string(QPID_HA_PREFIX)+"primary-tx-observer");
PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
- haBroker(hb), broker(hb.getBroker()), id(true), failed(false)
+ haBroker(hb), broker(hb.getBroker()), id(true),
+ exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
+ failed(false), ended(false)
{
logPrefix = "Primary transaction "+shortStr(id)+": ";
@@ -150,6 +153,7 @@ void PrimaryTxObserver::commit() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Commit");
txQueue->deliver(TxCommitEvent().message());
+ ended = true;
end(l);
}
@@ -157,15 +161,15 @@ void PrimaryTxObserver::rollback() {
sys::Mutex::ScopedLock l(lock);
QPID_LOG(debug, logPrefix << "Rollback");
txQueue->deliver(TxRollbackEvent().message());
+ ended = true;
end(l);
}
void PrimaryTxObserver::end(sys::Mutex::ScopedLock&) {
// Don't destroy the tx-queue if there are connected subscriptions.
- if (unfinished.empty()) {
- // Destroying the queue will result in destruction of this when
- // the queues observer references are cleared.
+ if (ended && unfinished.empty()) {
haBroker.deleteQueue(txQueue->getName());
+ broker.getExchanges().destroy(getExchangeName());
}
}
@@ -190,7 +194,7 @@ void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
types::Uuid backup = rs.getBrokerInfo().getSystemId();
if (unprepared.find(backup) != unprepared.end()) {
- failed = true; // Canceled before prepared.
+ ended = failed = true; // Canceled before prepared.
unprepared.erase(backup); // Consider it prepared-fail
}
unfinished.erase(backup);