summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-22 19:28:30 +0000
committerAlan Conway <aconway@apache.org>2012-06-22 19:28:30 +0000
commit3637c9d02c28dad758662f7a3c863c57caa4f1f3 (patch)
tree3ed39b8781c670b27ce1c052c1edf2fb9039b1eb
parent5fa99445dc43cb0af8aeed08dd32f803c2329252 (diff)
downloadqpid-python-3637c9d02c28dad758662f7a3c863c57caa4f1f3.tar.gz
NO-JIRA: Cleanup in HA code.
- Better printed representation for queue ranges. - Removed dead code in ha module related to activated/deactivated backups. - Improved log messages in ha/BrokerReplicator.cpp git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1353000 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp72
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp16
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h8
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h10
-rw-r--r--qpid/cpp/src/qpid/ha/QueueRange.h2
5 files changed, 41 insertions, 67 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index bff7a188b1..8f09c5db8f 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -298,36 +298,33 @@ void BrokerReplicator::route(Deliverable& msg) {
}
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
- string name = values[QNAME].asString();
Variant::Map argsMap = asMapVoid(values[ARGS]);
- if (!replicationTest.isReplicated(
- CONFIGURATION,
- values[ARGS].asMap(),
- values[AUTODEL].asBool(),
- values[EXCL].asBool()))
- return;
- if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
+ bool autoDel = values[AUTODEL].asBool();
+ bool excl = values[EXCL].asBool();
+ if (values[DISP] == CREATED &&
+ replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl))
+ {
+ string name = values[QNAME].asString();
+ QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
// If we already have a queue with this name, replace it.
// The queue was definitely created on the primary.
if (broker.getQueues().find(name)) {
+ QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
broker.getQueues().destroy(name);
- QPID_LOG(warning, logPrefix << "Queue declare event, replaced exsiting: "
- << name);
}
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
values[DURABLE].asBool(),
- values[AUTODEL].asBool(),
+ autoDel,
0, // no owner regardless of exclusivity on primary
values[ALTEX].asString(),
args,
values[USER].asString(),
values[RHOST].asString());
assert(result.second); // Should be true since we destroyed existing queue above
- QPID_LOG(debug, logPrefix << "Queue declare event, starting replication: " << name);
startQueueReplicator(result.first);
}
}
@@ -345,21 +342,16 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
- if (!queue) {
- QPID_LOG(warning, logPrefix << "Queue delete event, does not exist: " << name);
- } else if (!replicationTest.replicateLevel(queue->getSettings())) {
- QPID_LOG(warning, logPrefix << "Queue delete event, not replicated: " << name);
- } else {
+ if (queue && replicationTest.replicateLevel(queue->getSettings())) {
+ QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
if (qr) {
qr->deactivate();
- haBroker.deactivatedBackup(name);
// QueueReplicator's bridge is now queued for destruction but may not
// actually be destroyed.
broker.getExchanges().destroy(qr->getName());
}
broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
- QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
}
}
@@ -368,14 +360,15 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange.
if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
string name = values[EXNAME].asString();
+ QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
// If we already have a exchange with this name, replace it.
// The exchange was definitely created on the primary.
if (broker.getExchanges().find(name)) {
broker.getExchanges().destroy(name);
- QPID_LOG(warning, logPrefix << "Exchange declare event, replaced exsiting: " << name)
- }
+ QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name);
+ }
std::pair<boost::shared_ptr<Exchange>, bool> result =
broker.createExchange(
name,
@@ -386,7 +379,6 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
values[USER].asString(),
values[RHOST].asString());
assert(result.second);
- QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
}
}
@@ -419,10 +411,10 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
- exchange->bind(queue, key, &args);
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
+ exchange->bind(queue, key, &args);
}
}
@@ -439,16 +431,17 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
- exchange->unbind(queue, key, &args);
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
+ exchange->unbind(queue, key, &args);
}
}
void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
Variant::List members = values[MEMBERS].asList();
- haBroker.getMembership().assign(members);
+ QPID_LOG(debug, logPrefix << "Membership update event: " << members);
+ haBroker.setMembership(members);
}
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
@@ -459,9 +452,10 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
values[AUTODELETE].asBool(),
values[EXCLUSIVE].asBool()))
return;
+ string name(values[NAME].asString());
+ QPID_LOG(debug, logPrefix << "Queue response: " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- string name(values[NAME].asString());
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
@@ -473,28 +467,28 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/);
// It is normal for the queue to already exist if we are failing over.
- QPID_LOG(debug, logPrefix << "Queue response, "
- << (result.second ? "starting replication: " : "already replicated: ")
- << name);
- if (result.second) startQueueReplicator(result.first);
+ if (result.second)
+ startQueueReplicator(result.first);
+ else
+ QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicationTest.replicateLevel(argsMap)) return;
+ string name = values[NAME].asString();
+ QPID_LOG(debug, logPrefix << "Exchange response: " << name);
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
bool created = broker.createExchange(
- values[NAME].asString(),
+ name,
values[TYPE].asString(),
values[DURABLE].asBool(),
""/*TODO: need to include alternate-exchange*/,
args,
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/).second;
- QPID_LOG(debug, logPrefix << "Exchange response, "
- << (created ? "created replica: " : "already exists: ")
- << values[NAME].asString());
+ QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name);
}
namespace {
@@ -528,13 +522,13 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
queue && replicationTest.replicateLevel(queue->getSettings()))
{
+ string key = values[KEY].asString();
+ QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
+ << " queue:" << qName
+ << " key:" << key);
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
- string key = values[KEY].asString();
exchange->bind(queue, key, &args);
- QPID_LOG(debug, logPrefix << "Bind response: exchange=" << exchange->getName()
- << " queue=" << queue->getName()
- << " key=" << key);
}
}
@@ -545,6 +539,7 @@ const string REPLICATE_DEFAULT="replicateDefault";
// Received the ha-broker configuration object for the primary broker.
void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
try {
+ QPID_LOG(debug, logPrefix << "HA Broker response: " << values);
ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
ReplicateLevel primary = replicationTest.replicateLevel(
values[REPLICATE_DEFAULT].asString());
@@ -566,7 +561,6 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu
if (!broker.getExchanges().registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
- haBroker.activatedBackup(queue->getName());
}
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 3967a45515..4752d51190 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -297,20 +297,4 @@ void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
broker.setLinkClientProperties(linkProperties);
}
-void HaBroker::activatedBackup(const std::string& queue) {
- Mutex::ScopedLock l(lock);
- activeBackups.insert(queue);
-}
-
-void HaBroker::deactivatedBackup(const std::string& queue) {
- Mutex::ScopedLock l(lock);
- activeBackups.erase(queue);
-}
-
-// FIXME aconway 2012-05-31: strip out.
-HaBroker::QueueNames HaBroker::getActiveBackups() const {
- Mutex::ScopedLock l(lock);
- return activeBackups;
-}
-
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index aa15944259..7ba1599c09 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -84,13 +84,6 @@ class HaBroker : public management::Manageable
Backup* getBackup() { return backup.get(); }
ReplicationTest getReplicationTest() const { return replicationTest; }
- // Keep track of the set of actively replicated queues on a backup
- // so that it can be transferred to the Primary on promotion.
- typedef std::set<std::string> QueueNames;
- void activatedBackup(const std::string& queue);
- void deactivatedBackup(const std::string& queue);
- QueueNames getActiveBackups() const;
-
boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
@@ -124,7 +117,6 @@ class HaBroker : public management::Manageable
Url clientUrl, brokerUrl;
std::vector<Url> knownBrokers;
BrokerStatus status;
- QueueNames activeBackups;
boost::shared_ptr<ConnectionObserver> observer;
BrokerInfo brokerInfo;
Membership membership;
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 3187d80438..3de579a88d 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -85,11 +85,15 @@ class Primary
HaBroker& haBroker;
std::string logPrefix;
bool active;
+ /**
+ * Set of expected backups that must be ready before we declare ourselves
+ * active
+ */
BackupSet initialBackups;
/**
- * Backups is a map of all the remote backups we know about: any expected
- * backups plus all actual backups that have connected. We do not remove
- * entries when a backup disconnects. @see Primary::closed()
+ * Map of all the remote backups we know about: any expected backups plus
+ * all actual backups that have connected. We do not remove entries when a
+ * backup disconnects. @see Primary::closed()
*/
BackupMap backups;
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
diff --git a/qpid/cpp/src/qpid/ha/QueueRange.h b/qpid/cpp/src/qpid/ha/QueueRange.h
index 3ca034e411..3e83ff795e 100644
--- a/qpid/cpp/src/qpid/ha/QueueRange.h
+++ b/qpid/cpp/src/qpid/ha/QueueRange.h
@@ -61,7 +61,7 @@ struct QueueRange {
inline std::ostream& operator<<(std::ostream& o, const QueueRange& qr) {
- if (qr.front > qr.back) return o << "[-" << qr.back << "]";
+ if (qr.front > qr.back) return o << "[-," << qr.back << "]";
else return o << "[" << qr.front << "," << qr.back << "]";
}