summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/BrokerReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp33
1 files changed, 15 insertions, 18 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 73ab5327fc..14e6e1a5d1 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -302,12 +302,12 @@ void BrokerReplicator::route(Deliverable& msg) {
else if (type == HA_BROKER) doResponseHaBroker(values);
}
if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
- QPID_LOG(debug, logPrefix << "Initial exchange configuration complete.");
+ QPID_LOG(debug, logPrefix << "All exchange responses received.")
cleaner.cleanExchanges(); // Clean up exchanges that no longer exist in the primary
alternates.clear();
}
if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) {
- QPID_LOG(debug, logPrefix << "Initial queue configuration complete.");
+ QPID_LOG(debug, logPrefix << "All queue responses received.")
cleaner.cleanQueues(); // Clean up queues that no longer exist in the primary
}
}
@@ -338,16 +338,9 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
deleteQueue(name);
}
settings.populate(args, settings.storeSettings);
- CreateQueueResult result =
- broker.createQueue(
- name,
- settings,
- 0 /*i.e. no owner regardless of exclusivity on master*/,
- values[ALTEX].asString(),
- userId,
- remoteHost);
- assert(result.second);
- startQueueReplicator(result.first);
+ CreateQueueResult result = createQueue(
+ name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString());
+ assert(result.second); // Should be created since we destroed the previous queue above.
}
}
@@ -494,7 +487,9 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
CreateQueueResult result =
createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
- if (result.second) startQueueReplicator(result.first);
+ // It is normal for the queue to already exist if we are failing over.
+ if (!result.second)
+ QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -596,8 +591,8 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu
}
}
-void BrokerReplicator::deactivateQueue(const std::string& name) {
- boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
+void BrokerReplicator::deactivateQueue(const std::string& queueName) {
+ boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(queueName);
if (qr) {
qr->deactivate();
// QueueReplicator's bridge is now queued for destruction but may not
@@ -646,10 +641,12 @@ BrokerReplicator::CreateQueueResult BrokerReplicator::createQueue(
string(), // Set alternate exchange below
userId,
remoteHost);
-
- if (!alternateExchange.empty()) {
+ boost::shared_ptr<Queue> queue = result.first;
+ if (!findQueueReplicator(queue->getName())) startQueueReplicator(queue);
+ if (result.second && !alternateExchange.empty()) {
alternates.setAlternate(
- alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
+ alternateExchange,
+ boost::bind(&Queue::setAlternateExchange, result.first, _1));
}
return result;
}