diff options
Diffstat (limited to 'cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 33 |
1 files changed, 15 insertions, 18 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 73ab5327fc..14e6e1a5d1 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -302,12 +302,12 @@ void BrokerReplicator::route(Deliverable& msg) { else if (type == HA_BROKER) doResponseHaBroker(values); } if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) { - QPID_LOG(debug, logPrefix << "Initial exchange configuration complete."); + QPID_LOG(debug, logPrefix << "All exchange responses received.") cleaner.cleanExchanges(); // Clean up exchanges that no longer exist in the primary alternates.clear(); } if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) { - QPID_LOG(debug, logPrefix << "Initial queue configuration complete."); + QPID_LOG(debug, logPrefix << "All queue responses received.") cleaner.cleanQueues(); // Clean up queues that no longer exist in the primary } } @@ -338,16 +338,9 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { deleteQueue(name); } settings.populate(args, settings.storeSettings); - CreateQueueResult result = - broker.createQueue( - name, - settings, - 0 /*i.e. no owner regardless of exclusivity on master*/, - values[ALTEX].asString(), - userId, - remoteHost); - assert(result.second); - startQueueReplicator(result.first); + CreateQueueResult result = createQueue( + name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString()); + assert(result.second); // Should be created since we destroed the previous queue above. } } @@ -494,7 +487,9 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { CreateQueueResult result = createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); - if (result.second) startQueueReplicator(result.first); + // It is normal for the queue to already exist if we are failing over. + if (!result.second) + QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); } void BrokerReplicator::doResponseExchange(Variant::Map& values) { @@ -596,8 +591,8 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu } } -void BrokerReplicator::deactivateQueue(const std::string& name) { - boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); +void BrokerReplicator::deactivateQueue(const std::string& queueName) { + boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(queueName); if (qr) { qr->deactivate(); // QueueReplicator's bridge is now queued for destruction but may not @@ -646,10 +641,12 @@ BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue( string(), // Set alternate exchange below userId, remoteHost); - - if (!alternateExchange.empty()) { + boost::shared_ptr<Queue> queue = result.first; + if (!findQueueReplicator(queue->getName())) startQueueReplicator(queue); + if (result.second && !alternateExchange.empty()) { alternates.setAlternate( - alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); + alternateExchange, + boost::bind(&Queue::setAlternateExchange, result.first, _1)); } return result; } |