summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha/BrokerReplicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha/BrokerReplicator.cpp')
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp72
1 files changed, 33 insertions, 39 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index bff7a188b1..8f09c5db8f 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -298,36 +298,33 @@ void BrokerReplicator::route(Deliverable& msg) {
}
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
- string name = values[QNAME].asString();
Variant::Map argsMap = asMapVoid(values[ARGS]);
- if (!replicationTest.isReplicated(
- CONFIGURATION,
- values[ARGS].asMap(),
- values[AUTODEL].asBool(),
- values[EXCL].asBool()))
- return;
- if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
+ bool autoDel = values[AUTODEL].asBool();
+ bool excl = values[EXCL].asBool();
+ if (values[DISP] == CREATED &&
+ replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl))
+ {
+ string name = values[QNAME].asString();
+ QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
// If we already have a queue with this name, replace it.
// The queue was definitely created on the primary.
if (broker.getQueues().find(name)) {
+ QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
broker.getQueues().destroy(name);
- QPID_LOG(warning, logPrefix << "Queue declare event, replaced exsiting: "
- << name);
}
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
values[DURABLE].asBool(),
- values[AUTODEL].asBool(),
+ autoDel,
0, // no owner regardless of exclusivity on primary
values[ALTEX].asString(),
args,
values[USER].asString(),
values[RHOST].asString());
assert(result.second); // Should be true since we destroyed existing queue above
- QPID_LOG(debug, logPrefix << "Queue declare event, starting replication: " << name);
startQueueReplicator(result.first);
}
}
@@ -345,21 +342,16 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
- if (!queue) {
- QPID_LOG(warning, logPrefix << "Queue delete event, does not exist: " << name);
- } else if (!replicationTest.replicateLevel(queue->getSettings())) {
- QPID_LOG(warning, logPrefix << "Queue delete event, not replicated: " << name);
- } else {
+ if (queue && replicationTest.replicateLevel(queue->getSettings())) {
+ QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
if (qr) {
qr->deactivate();
- haBroker.deactivatedBackup(name);
// QueueReplicator's bridge is now queued for destruction but may not
// actually be destroyed.
broker.getExchanges().destroy(qr->getName());
}
broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
- QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
}
}
@@ -368,14 +360,15 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange.
if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
string name = values[EXNAME].asString();
+ QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
// If we already have a exchange with this name, replace it.
// The exchange was definitely created on the primary.
if (broker.getExchanges().find(name)) {
broker.getExchanges().destroy(name);
- QPID_LOG(warning, logPrefix << "Exchange declare event, replaced exsiting: " << name)
- }
+ QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name);
+ }
std::pair<boost::shared_ptr<Exchange>, bool> result =
broker.createExchange(
name,
@@ -386,7 +379,6 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
values[USER].asString(),
values[RHOST].asString());
assert(result.second);
- QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
}
}
@@ -419,10 +411,10 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
- exchange->bind(queue, key, &args);
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
+ exchange->bind(queue, key, &args);
}
}
@@ -439,16 +431,17 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
- exchange->unbind(queue, key, &args);
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
+ exchange->unbind(queue, key, &args);
}
}
void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
Variant::List members = values[MEMBERS].asList();
- haBroker.getMembership().assign(members);
+ QPID_LOG(debug, logPrefix << "Membership update event: " << members);
+ haBroker.setMembership(members);
}
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
@@ -459,9 +452,10 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
values[AUTODELETE].asBool(),
values[EXCLUSIVE].asBool()))
return;
+ string name(values[NAME].asString());
+ QPID_LOG(debug, logPrefix << "Queue response: " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- string name(values[NAME].asString());
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
@@ -473,28 +467,28 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/);
// It is normal for the queue to already exist if we are failing over.
- QPID_LOG(debug, logPrefix << "Queue response, "
- << (result.second ? "starting replication: " : "already replicated: ")
- << name);
- if (result.second) startQueueReplicator(result.first);
+ if (result.second)
+ startQueueReplicator(result.first);
+ else
+ QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicationTest.replicateLevel(argsMap)) return;
+ string name = values[NAME].asString();
+ QPID_LOG(debug, logPrefix << "Exchange response: " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
bool created = broker.createExchange(
- values[NAME].asString(),
+ name,
values[TYPE].asString(),
values[DURABLE].asBool(),
""/*TODO: need to include alternate-exchange*/,
args,
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/).second;
- QPID_LOG(debug, logPrefix << "Exchange response, "
- << (created ? "created replica: " : "already exists: ")
- << values[NAME].asString());
+ QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name);
}
namespace {
@@ -528,13 +522,13 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
queue && replicationTest.replicateLevel(queue->getSettings()))
{
+ string key = values[KEY].asString();
+ QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
+ << " queue:" << qName
+ << " key:" << key);
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
- string key = values[KEY].asString();
exchange->bind(queue, key, &args);
- QPID_LOG(debug, logPrefix << "Bind response: exchange=" << exchange->getName()
- << " queue=" << queue->getName()
- << " key=" << key);
}
}
@@ -545,6 +539,7 @@ const string REPLICATE_DEFAULT="replicateDefault";
// Received the ha-broker configuration object for the primary broker.
void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
try {
+ QPID_LOG(debug, logPrefix << "HA Broker response: " << values);
ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
ReplicateLevel primary = replicationTest.replicateLevel(
values[REPLICATE_DEFAULT].asString());
@@ -566,7 +561,6 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu
if (!broker.getExchanges().registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
- haBroker.activatedBackup(queue->getName());
}
}