diff options
Diffstat (limited to 'cpp/src/qpid/ha/BrokerReplicator.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 51 |
1 files changed, 19 insertions, 32 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 36bf89fb81..e7a0218dd8 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -878,54 +878,41 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } -void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) { +void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) { boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); - if (!qr) return; - assert(qr); - if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) { - if (qr->getQueue()->getSettings().autoDeleteDelay) { - // Start the auto-delete timer - qr->getQueue()->releaseFromUse(); - qr->getQueue()->scheduleAutoDelete(); + if (qr) { + qr->disconnect(); + if (TxReplicator::isTxQueue(qr->getQueue()->getName())) { + // Transactions are aborted on failover so clean up tx-queues + deleteQueue(qr->getQueue()->getName()); } - else { - // Delete immediately. Don't purge, the primary is gone so we need - // to reroute the deleted messages. - deleteQueue(qr->getQueue()->getName(), false); + else if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) { + if (qr->getQueue()->getSettings().autoDeleteDelay) { + // Start the auto-delete timer + qr->getQueue()->releaseFromUse(); + qr->getQueue()->scheduleAutoDelete(); + } + else { + // Delete immediately. Don't purge, the primary is gone so we need + // to reroute the deleted messages. + deleteQueue(qr->getQueue()->getName(), false); + } } } } typedef vector<boost::shared_ptr<Exchange> > ExchangeVector; -typedef vector<boost::shared_ptr<Queue> > QueueVector; // Called by ConnectionObserver::disconnected, disconnected from the network side. void BrokerReplicator::disconnected() { QPID_LOG(info, logPrefix << "Disconnected from primary " << primary); connection = 0; - // Make copys of queues & exchanges so we can work outside the registry lock. - + // Make copy of exchanges so we can work outside the registry lock. ExchangeVector exs; exchanges.eachExchange(boost::bind(&ExchangeVector::push_back, &exs, _1)); for_each(exs.begin(), exs.end(), - boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1)); - - QueueVector qs; - queues.eachQueue(boost::bind(&QueueVector::push_back, &qs, _1)); - for_each(qs.begin(), qs.end(), - boost::bind(&BrokerReplicator::disconnectedQueue, this, _1)); -} - -// Called for queues existing when the backup is disconnected. -void BrokerReplicator::disconnectedQueue(const boost::shared_ptr<Queue>& q) { - QPID_LOG(critical, "BrokerReplicator::disconnectedQueue" << q->getName()); - boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(q->getName()); - if (qr) { - qr->disconnect(); - if (TxReplicator::isTxQueue(q->getName())) - deleteQueue(q->getName()); - } + boost::bind(&BrokerReplicator::disconnectedExchange, this, _1)); } void BrokerReplicator::setMembership(const Variant::List& brokers) { |
