summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/Primary.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-30 14:43:06 +0000
committerAlan Conway <aconway@apache.org>2013-08-30 14:43:06 +0000
commit54cdb4dcada8cfeb23d756e4980e701ebb382c13 (patch)
treef9ce23279cffe298d1a3953489355214b827e530 /qpid/cpp/src/qpid/ha/Primary.cpp
parent27d31ba355acfef3ec66c23e48864e88a358014b (diff)
downloadqpid-python-54cdb4dcada8cfeb23d756e4980e701ebb382c13.tar.gz
QPID-4327: HA clean up transaction artifacts at end of TX.
- Backups delete transactions on failover. - TxReplicator cancel subscriptions when transaction is finished. - TxReplicator rollback if destroyed prematurely. - Handle special case of no backups for a tx. - ha_tests.py: new and modified tests to cover the new functionality. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518982 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp21
1 files changed, 16 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index c71342cbc6..606e6452d3 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -214,6 +214,8 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) {
}
void Primary::addReplica(ReplicatingSubscription& rs) {
+ // Note this is called before the ReplicatingSubscription has been activated
+ // on the queue.
sys::Mutex::ScopedLock l(lock);
replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
}
@@ -231,6 +233,12 @@ void Primary::skip(
void Primary::removeReplica(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
+
+ TxMap::const_iterator i = txMap.find(rs.getQueue()->getName());
+ if (i != txMap.end()) {
+ boost::shared_ptr<PrimaryTxObserver> tx = i->second.lock();
+ if (tx) tx->cancel(rs);
+ }
}
// NOTE: Called with queue registry lock held.
@@ -387,16 +395,19 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards)
backup->startCatchup();
}
-void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
+shared_ptr<PrimaryTxObserver> Primary::makeTxObserver() {
shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
observer->initialize();
- tx->setObserver(observer);
+ txMap[observer->getTxQueue()->getName()] = observer;
+ return observer;
+}
+
+void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
+ tx->setObserver(makeTxObserver());
}
void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) {
- shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker));
- observer->initialize();
- dtx->setObserver(observer);
+ dtx->setObserver(makeTxObserver());
}
}} // namespace qpid::ha