diff options
author | Alan Conway <aconway@apache.org> | 2013-08-30 14:43:06 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-08-30 14:43:06 +0000 |
commit | 54cdb4dcada8cfeb23d756e4980e701ebb382c13 (patch) | |
tree | f9ce23279cffe298d1a3953489355214b827e530 /qpid/cpp/src/qpid/ha/Primary.cpp | |
parent | 27d31ba355acfef3ec66c23e48864e88a358014b (diff) | |
download | qpid-python-54cdb4dcada8cfeb23d756e4980e701ebb382c13.tar.gz |
QPID-4327: HA clean up transaction artifacts at end of TX.
- Backups delete transactions on failover.
- TxReplicator cancel subscriptions when transaction is finished.
- TxReplicator rollback if destroyed prematurely.
- Handle special case of no backups for a tx.
- ha_tests.py: new and modified tests to cover the new functionality.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518982 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/Primary.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 21 |
1 files changed, 16 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index c71342cbc6..606e6452d3 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -214,6 +214,8 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) { } void Primary::addReplica(ReplicatingSubscription& rs) { + // Note this is called before the ReplicatingSubscription has been activated + // on the queue. sys::Mutex::ScopedLock l(lock); replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs; } @@ -231,6 +233,12 @@ void Primary::skip( void Primary::removeReplica(const ReplicatingSubscription& rs) { sys::Mutex::ScopedLock l(lock); replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())); + + TxMap::const_iterator i = txMap.find(rs.getQueue()->getName()); + if (i != txMap.end()) { + boost::shared_ptr<PrimaryTxObserver> tx = i->second.lock(); + if (tx) tx->cancel(rs); + } } // NOTE: Called with queue registry lock held. @@ -387,16 +395,19 @@ void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) backup->startCatchup(); } -void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) { +shared_ptr<PrimaryTxObserver> Primary::makeTxObserver() { shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker)); observer->initialize(); - tx->setObserver(observer); + txMap[observer->getTxQueue()->getName()] = observer; + return observer; +} + +void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) { + tx->setObserver(makeTxObserver()); } void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) { - shared_ptr<PrimaryTxObserver> observer(new PrimaryTxObserver(haBroker)); - observer->initialize(); - dtx->setObserver(observer); + dtx->setObserver(makeTxObserver()); } }} // namespace qpid::ha |