summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp56
1 files changed, 26 insertions, 30 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 7928b6ab71..3957ef5a0c 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -270,7 +270,8 @@ template <class EventType> std::string key() {
}
boost::shared_ptr<BrokerReplicator> BrokerReplicator::create(
- HaBroker& hb, const boost::shared_ptr<broker::Link>& l) {
+ HaBroker& hb, const boost::shared_ptr<broker::Link>& l)
+{
boost::shared_ptr<BrokerReplicator> br(new BrokerReplicator(hb, l));
br->initialize();
return br;
@@ -330,13 +331,21 @@ void BrokerReplicator::initialize() {
BrokerReplicator::~BrokerReplicator() {}
namespace {
-void collectQueueReplicators(
- const boost::shared_ptr<Exchange>& ex,
- set<boost::shared_ptr<QueueReplicator> >& collect)
-{
- boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
- if (qr) collect.insert(qr);
-}
+struct QueueReplicators : public std::deque<boost::shared_ptr<QueueReplicator> > {
+ QueueReplicators(const ExchangeRegistry& er) { addAll(er); }
+
+ /** Add the exchange if it is a QueueReplicator. */
+ void add(const boost::shared_ptr<Exchange>& ex) {
+ boost::shared_ptr<QueueReplicator> qr =
+ boost::dynamic_pointer_cast<QueueReplicator>(ex);
+ if (qr) push_back(qr);
+ }
+ /** Add all QueueReplicator in the ExchangeRegistry. */
+ void addAll(const ExchangeRegistry& er) {
+ // Make copy of exchanges so we can work outside the registry lock.
+ er.eachExchange(boost::bind(&QueueReplicators::add, this, _1));
+ }
+};
} // namespace
void BrokerReplicator::shutdown() {
@@ -877,35 +886,22 @@ void BrokerReplicator::forced(broker::Connection& c, const std::string& message)
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
-void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) {
- boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
- if (qr) {
- qr->disconnect();
- if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
- // Transactions are aborted on failover so clean up tx-queues
- deleteQueue(qr->getQueue()->getName());
- }
+void BrokerReplicator::disconnectedQueueReplicator(
+ const boost::shared_ptr<QueueReplicator>& qr)
+{
+ qr->disconnect();
+ if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
+ // Transactions are aborted on failover so clean up tx-queues
+ deleteQueue(qr->getQueue()->getName());
}
}
-typedef vector<boost::shared_ptr<Exchange> > ExchangeVector;
-
-// Callback function for accumulating exchange candidates
-namespace {
-void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) {
- ev.push_back(i);
-}
-}
-
// Called by ConnectionObserver::disconnected, disconnected from the network side.
void BrokerReplicator::disconnected() {
QPID_LOG(info, logPrefix << "Disconnected from primary " << primary);
connect = 0;
-
- // Make copy of exchanges so we can work outside the registry lock.
- ExchangeVector exs;
- exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1));
- for_each(exs.begin(), exs.end(),
+ QueueReplicators qrs(broker.getExchanges());
+ for_each(qrs.begin(), qrs.end(),
boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1));
}