diff options
author | Alan Conway <aconway@apache.org> | 2013-08-30 14:43:06 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-08-30 14:43:06 +0000 |
commit | 54cdb4dcada8cfeb23d756e4980e701ebb382c13 (patch) | |
tree | f9ce23279cffe298d1a3953489355214b827e530 /qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | |
parent | 27d31ba355acfef3ec66c23e48864e88a358014b (diff) | |
download | qpid-python-54cdb4dcada8cfeb23d756e4980e701ebb382c13.tar.gz |
QPID-4327: HA clean up transaction artifacts at end of TX.
- Backups delete transactions on failover.
- TxReplicator cancel subscriptions when transaction is finished.
- TxReplicator rollback if destroyed prematurely.
- Handle special case of no backups for a tx.
- ha_tests.py: new and modified tests to cover the new functionality.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518982 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 80 |
1 files changed, 49 insertions, 31 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 684f408c4b..36bf89fb81 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -229,8 +229,8 @@ class BrokerReplicator::UpdateTracker { typedef boost::function<void (const std::string&)> CleanFn; UpdateTracker(const std::string& type_, // "queue" or "exchange" - CleanFn f, const ReplicationTest& rt) - : type(type_), cleanFn(f), repTest(rt) {} + CleanFn f) + : type(type_), cleanFn(f) {} /** Destructor cleans up remaining initial queues. */ ~UpdateTracker() { @@ -245,16 +245,10 @@ class BrokerReplicator::UpdateTracker { } /** Add an exchange name */ - void addExchange(Exchange::shared_ptr ex) { - if (repTest.getLevel(*ex)) - initial.insert(ex->getName()); - } + void addExchange(Exchange::shared_ptr ex) { initial.insert(ex->getName()); } /** Add a queue name. */ - void addQueue(Queue::shared_ptr q) { - if (repTest.getLevel(*q)) - initial.insert(q->getName()); - } + void addQueue(Queue::shared_ptr q) { initial.insert(q->getName()); } /** Received an event for name */ void event(const std::string& name) { @@ -281,7 +275,6 @@ class BrokerReplicator::UpdateTracker { std::string type; Names initial, events; CleanFn cleanFn; - ReplicationTest repTest; }; namespace { @@ -349,7 +342,8 @@ BrokerReplicator::~BrokerReplicator() { shutdown(); } namespace { void collectQueueReplicators( - const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect) + const boost::shared_ptr<Exchange>& ex, + set<boost::shared_ptr<QueueReplicator> >& collect) { boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); if (qr) collect.insert(qr); @@ -390,16 +384,13 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) exchangeTracker.reset( new UpdateTracker("exchange", - boost::bind(&BrokerReplicator::deleteExchange, this, _1), - replicationTest)); - exchanges.eachExchange( - boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1)); + boost::bind(&BrokerReplicator::deleteExchange, this, _1))); + exchanges.eachExchange(boost::bind(&BrokerReplicator::existingExchange, this, _1)); queueTracker.reset( new UpdateTracker("queue", - boost::bind(&BrokerReplicator::deleteQueue, this, _1, true), - replicationTest)); - queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1)); + boost::bind(&BrokerReplicator::deleteQueue, this, _1, true))); + queues.eachQueue(boost::bind(&BrokerReplicator::existingQueue, this, _1)); framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); @@ -428,6 +419,21 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); } +// Called for each queue in existence when the backup connects to a primary. +void BrokerReplicator::existingQueue(const boost::shared_ptr<Queue>& q) { + if (replicationTest.getLevel(*q)) { + QPID_LOG(debug, "Existing queue: " << q->getName()); + queueTracker->addQueue(q); + } +} + +void BrokerReplicator::existingExchange(const boost::shared_ptr<Exchange>& ex) { + if (replicationTest.getLevel(*ex)) { + QPID_LOG(debug, "Existing exchange: " << ex->getName()); + exchangeTracker->addExchange(ex); + } +} + void BrokerReplicator::route(Deliverable& msg) { // We transition from JOINING->CATCHUP on the first message received from the primary. // Until now we couldn't be sure if we had a good connection to the primary. @@ -890,24 +896,36 @@ void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) { } } -// Callback function for accumulating exchange candidates -namespace { - void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) { - c.push_back(i); - } -} +typedef vector<boost::shared_ptr<Exchange> > ExchangeVector; +typedef vector<boost::shared_ptr<Queue> > QueueVector; // Called by ConnectionObserver::disconnected, disconnected from the network side. void BrokerReplicator::disconnected() { QPID_LOG(info, logPrefix << "Disconnected from primary " << primary); connection = 0; - // Clean up auto-delete queues - vector<boost::shared_ptr<Exchange> > collect; - // Make a copy so we can work outside the ExchangeRegistry lock - exchanges.eachExchange( - boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1)); - for_each(collect.begin(), collect.end(), + + // Make copys of queues & exchanges so we can work outside the registry lock. + + ExchangeVector exs; + exchanges.eachExchange(boost::bind(&ExchangeVector::push_back, &exs, _1)); + for_each(exs.begin(), exs.end(), boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1)); + + QueueVector qs; + queues.eachQueue(boost::bind(&QueueVector::push_back, &qs, _1)); + for_each(qs.begin(), qs.end(), + boost::bind(&BrokerReplicator::disconnectedQueue, this, _1)); +} + +// Called for queues existing when the backup is disconnected. +void BrokerReplicator::disconnectedQueue(const boost::shared_ptr<Queue>& q) { + QPID_LOG(critical, "BrokerReplicator::disconnectedQueue" << q->getName()); + boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(q->getName()); + if (qr) { + qr->disconnect(); + if (TxReplicator::isTxQueue(q->getName())) + deleteQueue(q->getName()); + } } void BrokerReplicator::setMembership(const Variant::List& brokers) { |