diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 56 |
1 files changed, 26 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 7928b6ab71..3957ef5a0c 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -270,7 +270,8 @@ template <class EventType> std::string key() { } boost::shared_ptr<BrokerReplicator> BrokerReplicator::create( - HaBroker& hb, const boost::shared_ptr<broker::Link>& l) { + HaBroker& hb, const boost::shared_ptr<broker::Link>& l) +{ boost::shared_ptr<BrokerReplicator> br(new BrokerReplicator(hb, l)); br->initialize(); return br; @@ -330,13 +331,21 @@ void BrokerReplicator::initialize() { BrokerReplicator::~BrokerReplicator() {} namespace { -void collectQueueReplicators( - const boost::shared_ptr<Exchange>& ex, - set<boost::shared_ptr<QueueReplicator> >& collect) -{ - boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); - if (qr) collect.insert(qr); -} +struct QueueReplicators : public std::deque<boost::shared_ptr<QueueReplicator> > { + QueueReplicators(const ExchangeRegistry& er) { addAll(er); } + + /** Add the exchange if it is a QueueReplicator. */ + void add(const boost::shared_ptr<Exchange>& ex) { + boost::shared_ptr<QueueReplicator> qr = + boost::dynamic_pointer_cast<QueueReplicator>(ex); + if (qr) push_back(qr); + } + /** Add all QueueReplicator in the ExchangeRegistry. */ + void addAll(const ExchangeRegistry& er) { + // Make copy of exchanges so we can work outside the registry lock. + er.eachExchange(boost::bind(&QueueReplicators::add, this, _1)); + } +}; } // namespace void BrokerReplicator::shutdown() { @@ -877,35 +886,22 @@ void BrokerReplicator::forced(broker::Connection& c, const std::string& message) string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } -void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) { - boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); - if (qr) { - qr->disconnect(); - if (TxReplicator::isTxQueue(qr->getQueue()->getName())) { - // Transactions are aborted on failover so clean up tx-queues - deleteQueue(qr->getQueue()->getName()); - } +void BrokerReplicator::disconnectedQueueReplicator( + const boost::shared_ptr<QueueReplicator>& qr) +{ + qr->disconnect(); + if (TxReplicator::isTxQueue(qr->getQueue()->getName())) { + // Transactions are aborted on failover so clean up tx-queues + deleteQueue(qr->getQueue()->getName()); } } -typedef vector<boost::shared_ptr<Exchange> > ExchangeVector; - -// Callback function for accumulating exchange candidates -namespace { -void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) { - ev.push_back(i); -} -} - // Called by ConnectionObserver::disconnected, disconnected from the network side. void BrokerReplicator::disconnected() { QPID_LOG(info, logPrefix << "Disconnected from primary " << primary); connect = 0; - - // Make copy of exchanges so we can work outside the registry lock. - ExchangeVector exs; - exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1)); - for_each(exs.begin(), exs.end(), + QueueReplicators qrs(broker.getExchanges()); + for_each(qrs.begin(), qrs.end(), boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1)); } |