diff options
Diffstat (limited to 'cpp/src/qpid/ha/BrokerReplicator.cpp')
| -rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 72 |
1 files changed, 33 insertions, 39 deletions
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index bff7a188b1..8f09c5db8f 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -298,36 +298,33 @@ void BrokerReplicator::route(Deliverable& msg) { } void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { - string name = values[QNAME].asString(); Variant::Map argsMap = asMapVoid(values[ARGS]); - if (!replicationTest.isReplicated( - CONFIGURATION, - values[ARGS].asMap(), - values[AUTODEL].asBool(), - values[EXCL].asBool())) - return; - if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) { + bool autoDel = values[AUTODEL].asBool(); + bool excl = values[EXCL].asBool(); + if (values[DISP] == CREATED && + replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) + { + string name = values[QNAME].asString(); + QPID_LOG(debug, logPrefix << "Queue declare event: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); // If we already have a queue with this name, replace it. // The queue was definitely created on the primary. if (broker.getQueues().find(name)) { + QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); broker.getQueues().destroy(name); - QPID_LOG(warning, logPrefix << "Queue declare event, replaced exsiting: " - << name); } std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( name, values[DURABLE].asBool(), - values[AUTODEL].asBool(), + autoDel, 0, // no owner regardless of exclusivity on primary values[ALTEX].asString(), args, values[USER].asString(), values[RHOST].asString()); assert(result.second); // Should be true since we destroyed existing queue above - QPID_LOG(debug, logPrefix << "Queue declare event, starting replication: " << name); startQueueReplicator(result.first); } } @@ -345,21 +342,16 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { // sessions may be closed by a "queue deleted" exception. string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); - if (!queue) { - QPID_LOG(warning, logPrefix << "Queue delete event, does not exist: " << name); - } else if (!replicationTest.replicateLevel(queue->getSettings())) { - QPID_LOG(warning, logPrefix << "Queue delete event, not replicated: " << name); - } else { + if (queue && replicationTest.replicateLevel(queue->getSettings())) { + QPID_LOG(debug, logPrefix << "Queue delete event: " << name); boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); if (qr) { qr->deactivate(); - haBroker.deactivatedBackup(name); // QueueReplicator's bridge is now queued for destruction but may not // actually be destroyed. broker.getExchanges().destroy(qr->getName()); } broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); - QPID_LOG(debug, logPrefix << "Queue delete event: " << name); } } @@ -368,14 +360,15 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange. if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) { string name = values[EXNAME].asString(); + QPID_LOG(debug, logPrefix << "Exchange declare event: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); // If we already have a exchange with this name, replace it. // The exchange was definitely created on the primary. if (broker.getExchanges().find(name)) { broker.getExchanges().destroy(name); - QPID_LOG(warning, logPrefix << "Exchange declare event, replaced exsiting: " << name) - } + QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name); + } std::pair<boost::shared_ptr<Exchange>, bool> result = broker.createExchange( name, @@ -386,7 +379,6 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()); assert(result.second); - QPID_LOG(debug, logPrefix << "Exchange declare event: " << name); } } @@ -419,10 +411,10 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); - exchange->bind(queue, key, &args); QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); + exchange->bind(queue, key, &args); } } @@ -439,16 +431,17 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); - exchange->unbind(queue, key, &args); QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); + exchange->unbind(queue, key, &args); } } void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) { Variant::List members = values[MEMBERS].asList(); - haBroker.getMembership().assign(members); + QPID_LOG(debug, logPrefix << "Membership update event: " << members); + haBroker.setMembership(members); } void BrokerReplicator::doResponseQueue(Variant::Map& values) { @@ -459,9 +452,10 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { values[AUTODELETE].asBool(), values[EXCLUSIVE].asBool())) return; + string name(values[NAME].asString()); + QPID_LOG(debug, logPrefix << "Queue response: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - string name(values[NAME].asString()); std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( name, @@ -473,28 +467,28 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); // It is normal for the queue to already exist if we are failing over. - QPID_LOG(debug, logPrefix << "Queue response, " - << (result.second ? "starting replication: " : "already replicated: ") - << name); - if (result.second) startQueueReplicator(result.first); + if (result.second) + startQueueReplicator(result.first); + else + QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); } void BrokerReplicator::doResponseExchange(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicationTest.replicateLevel(argsMap)) return; + string name = values[NAME].asString(); + QPID_LOG(debug, logPrefix << "Exchange response: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); bool created = broker.createExchange( - values[NAME].asString(), + name, values[TYPE].asString(), values[DURABLE].asBool(), ""/*TODO: need to include alternate-exchange*/, args, ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second; - QPID_LOG(debug, logPrefix << "Exchange response, " - << (created ? "created replica: " : "already exists: ") - << values[NAME].asString()); + QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name); } namespace { @@ -528,13 +522,13 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && queue && replicationTest.replicateLevel(queue->getSettings())) { + string key = values[KEY].asString(); + QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName + << " queue:" << qName + << " key:" << key); framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); - string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, logPrefix << "Bind response: exchange=" << exchange->getName() - << " queue=" << queue->getName() - << " key=" << key); } } @@ -545,6 +539,7 @@ const string REPLICATE_DEFAULT="replicateDefault"; // Received the ha-broker configuration object for the primary broker. void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { try { + QPID_LOG(debug, logPrefix << "HA Broker response: " << values); ReplicateLevel mine = haBroker.getSettings().replicateDefault.get(); ReplicateLevel primary = replicationTest.replicateLevel( values[REPLICATE_DEFAULT].asString()); @@ -566,7 +561,6 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu if (!broker.getExchanges().registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); - haBroker.activatedBackup(queue->getName()); } } |
