diff options
author | Alan Conway <aconway@apache.org> | 2012-06-22 19:28:30 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-06-22 19:28:30 +0000 |
commit | 3637c9d02c28dad758662f7a3c863c57caa4f1f3 (patch) | |
tree | 3ed39b8781c670b27ce1c052c1edf2fb9039b1eb | |
parent | 5fa99445dc43cb0af8aeed08dd32f803c2329252 (diff) | |
download | qpid-python-3637c9d02c28dad758662f7a3c863c57caa4f1f3.tar.gz |
NO-JIRA: Cleanup in HA code.
- Better printed representation for queue ranges.
- Removed dead code in ha module related to activated/deactivated backups.
- Improved log messages in ha/BrokerReplicator.cpp
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1353000 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 72 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueRange.h | 2 |
5 files changed, 41 insertions, 67 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index bff7a188b1..8f09c5db8f 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -298,36 +298,33 @@ void BrokerReplicator::route(Deliverable& msg) { } void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { - string name = values[QNAME].asString(); Variant::Map argsMap = asMapVoid(values[ARGS]); - if (!replicationTest.isReplicated( - CONFIGURATION, - values[ARGS].asMap(), - values[AUTODEL].asBool(), - values[EXCL].asBool())) - return; - if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) { + bool autoDel = values[AUTODEL].asBool(); + bool excl = values[EXCL].asBool(); + if (values[DISP] == CREATED && + replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) + { + string name = values[QNAME].asString(); + QPID_LOG(debug, logPrefix << "Queue declare event: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); // If we already have a queue with this name, replace it. // The queue was definitely created on the primary. if (broker.getQueues().find(name)) { + QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); broker.getQueues().destroy(name); - QPID_LOG(warning, logPrefix << "Queue declare event, replaced exsiting: " - << name); } std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( name, values[DURABLE].asBool(), - values[AUTODEL].asBool(), + autoDel, 0, // no owner regardless of exclusivity on primary values[ALTEX].asString(), args, values[USER].asString(), values[RHOST].asString()); assert(result.second); // Should be true since we destroyed existing queue above - QPID_LOG(debug, logPrefix << "Queue declare event, starting replication: " << name); startQueueReplicator(result.first); } } @@ -345,21 +342,16 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { // sessions may be closed by a "queue deleted" exception. string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); - if (!queue) { - QPID_LOG(warning, logPrefix << "Queue delete event, does not exist: " << name); - } else if (!replicationTest.replicateLevel(queue->getSettings())) { - QPID_LOG(warning, logPrefix << "Queue delete event, not replicated: " << name); - } else { + if (queue && replicationTest.replicateLevel(queue->getSettings())) { + QPID_LOG(debug, logPrefix << "Queue delete event: " << name); boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); if (qr) { qr->deactivate(); - haBroker.deactivatedBackup(name); // QueueReplicator's bridge is now queued for destruction but may not // actually be destroyed. broker.getExchanges().destroy(qr->getName()); } broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); - QPID_LOG(debug, logPrefix << "Queue delete event: " << name); } } @@ -368,14 +360,15 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange. if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) { string name = values[EXNAME].asString(); + QPID_LOG(debug, logPrefix << "Exchange declare event: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); // If we already have a exchange with this name, replace it. // The exchange was definitely created on the primary. if (broker.getExchanges().find(name)) { broker.getExchanges().destroy(name); - QPID_LOG(warning, logPrefix << "Exchange declare event, replaced exsiting: " << name) - } + QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name); + } std::pair<boost::shared_ptr<Exchange>, bool> result = broker.createExchange( name, @@ -386,7 +379,6 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()); assert(result.second); - QPID_LOG(debug, logPrefix << "Exchange declare event: " << name); } } @@ -419,10 +411,10 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); - exchange->bind(queue, key, &args); QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); + exchange->bind(queue, key, &args); } } @@ -439,16 +431,17 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); - exchange->unbind(queue, key, &args); QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); + exchange->unbind(queue, key, &args); } } void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) { Variant::List members = values[MEMBERS].asList(); - haBroker.getMembership().assign(members); + QPID_LOG(debug, logPrefix << "Membership update event: " << members); + haBroker.setMembership(members); } void BrokerReplicator::doResponseQueue(Variant::Map& values) { @@ -459,9 +452,10 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { values[AUTODELETE].asBool(), values[EXCLUSIVE].asBool())) return; + string name(values[NAME].asString()); + QPID_LOG(debug, logPrefix << "Queue response: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - string name(values[NAME].asString()); std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( name, @@ -473,28 +467,28 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); // It is normal for the queue to already exist if we are failing over. - QPID_LOG(debug, logPrefix << "Queue response, " - << (result.second ? "starting replication: " : "already replicated: ") - << name); - if (result.second) startQueueReplicator(result.first); + if (result.second) + startQueueReplicator(result.first); + else + QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); } void BrokerReplicator::doResponseExchange(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!replicationTest.replicateLevel(argsMap)) return; + string name = values[NAME].asString(); + QPID_LOG(debug, logPrefix << "Exchange response: " << name); framing::FieldTable args; amqp_0_10::translate(argsMap, args); bool created = broker.createExchange( - values[NAME].asString(), + name, values[TYPE].asString(), values[DURABLE].asBool(), ""/*TODO: need to include alternate-exchange*/, args, ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second; - QPID_LOG(debug, logPrefix << "Exchange response, " - << (created ? "created replica: " : "already exists: ") - << values[NAME].asString()); + QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name); } namespace { @@ -528,13 +522,13 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && queue && replicationTest.replicateLevel(queue->getSettings())) { + string key = values[KEY].asString(); + QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName + << " queue:" << qName + << " key:" << key); framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); - string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, logPrefix << "Bind response: exchange=" << exchange->getName() - << " queue=" << queue->getName() - << " key=" << key); } } @@ -545,6 +539,7 @@ const string REPLICATE_DEFAULT="replicateDefault"; // Received the ha-broker configuration object for the primary broker. void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { try { + QPID_LOG(debug, logPrefix << "HA Broker response: " << values); ReplicateLevel mine = haBroker.getSettings().replicateDefault.get(); ReplicateLevel primary = replicationTest.replicateLevel( values[REPLICATE_DEFAULT].asString()); @@ -566,7 +561,6 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu if (!broker.getExchanges().registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); - haBroker.activatedBackup(queue->getName()); } } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 3967a45515..4752d51190 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -297,20 +297,4 @@ void HaBroker::setLinkProperties(Mutex::ScopedLock&) { broker.setLinkClientProperties(linkProperties); } -void HaBroker::activatedBackup(const std::string& queue) { - Mutex::ScopedLock l(lock); - activeBackups.insert(queue); -} - -void HaBroker::deactivatedBackup(const std::string& queue) { - Mutex::ScopedLock l(lock); - activeBackups.erase(queue); -} - -// FIXME aconway 2012-05-31: strip out. -HaBroker::QueueNames HaBroker::getActiveBackups() const { - Mutex::ScopedLock l(lock); - return activeBackups; -} - }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index aa15944259..7ba1599c09 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -84,13 +84,6 @@ class HaBroker : public management::Manageable Backup* getBackup() { return backup.get(); } ReplicationTest getReplicationTest() const { return replicationTest; } - // Keep track of the set of actively replicated queues on a backup - // so that it can be transferred to the Primary on promotion. - typedef std::set<std::string> QueueNames; - void activatedBackup(const std::string& queue); - void deactivatedBackup(const std::string& queue); - QueueNames getActiveBackups() const; - boost::shared_ptr<ConnectionObserver> getObserver() { return observer; } const BrokerInfo& getBrokerInfo() const { return brokerInfo; } @@ -124,7 +117,6 @@ class HaBroker : public management::Manageable Url clientUrl, brokerUrl; std::vector<Url> knownBrokers; BrokerStatus status; - QueueNames activeBackups; boost::shared_ptr<ConnectionObserver> observer; BrokerInfo brokerInfo; Membership membership; diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 3187d80438..3de579a88d 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -85,11 +85,15 @@ class Primary HaBroker& haBroker; std::string logPrefix; bool active; + /** + * Set of expected backups that must be ready before we declare ourselves + * active + */ BackupSet initialBackups; /** - * Backups is a map of all the remote backups we know about: any expected - * backups plus all actual backups that have connected. We do not remove - * entries when a backup disconnects. @see Primary::closed() + * Map of all the remote backups we know about: any expected backups plus + * all actual backups that have connected. We do not remove entries when a + * backup disconnects. @see Primary::closed() */ BackupMap backups; boost::shared_ptr<broker::ConnectionObserver> connectionObserver; diff --git a/qpid/cpp/src/qpid/ha/QueueRange.h b/qpid/cpp/src/qpid/ha/QueueRange.h index 3ca034e411..3e83ff795e 100644 --- a/qpid/cpp/src/qpid/ha/QueueRange.h +++ b/qpid/cpp/src/qpid/ha/QueueRange.h @@ -61,7 +61,7 @@ struct QueueRange { inline std::ostream& operator<<(std::ostream& o, const QueueRange& qr) { - if (qr.front > qr.back) return o << "[-" << qr.back << "]"; + if (qr.front > qr.back) return o << "[-," << qr.back << "]"; else return o << "[" << qr.front << "," << qr.back << "]"; } |