summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-01-19 23:05:08 +0000
committerAlan Conway <aconway@apache.org>2012-01-19 23:05:08 +0000
commit6b8cae88b5eafc03773bacfa97b07eda7e1973cc (patch)
treed67965fa666902fb398c75155ab08233662044d0
parent0010e075f124b72498419b9431bfac5c83c5eb82 (diff)
downloadqpid-python-6b8cae88b5eafc03773bacfa97b07eda7e1973cc.tar.gz
QPID-3603: Cleaned up HA log messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233655 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp29
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp18
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp65
6 files changed, 61 insertions, 61 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 17da13ed1e..48ece928b6 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -43,23 +43,20 @@ using types::Variant;
using std::string;
Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
- // FIXME aconway 2011-11-24: identifying the primary.
- if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary hack to identify primary.
- Url url(s.brokerUrl);
- QPID_LOG(info, "HA: Acting as backup");
- string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ Url url(s.brokerUrl);
+ string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
- // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
- // Declare the link
- std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
- url[0].host, url[0].port, protocol,
- false, // durable
- s.mechanism, s.username, s.password);
- assert(result.second); // FIXME aconway 2011-11-23: error handling
- link = result.first;
- boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
- broker.getExchanges().registerExchange(wr);
- }
+ // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
+ // Declare the link
+ std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
+ url[0].host, url[0].port, protocol,
+ false, // durable
+ s.mechanism, s.username, s.password);
+ assert(result.second); // FIXME aconway 2011-11-23: error handling
+ link = result.first;
+ boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
+ broker.getExchanges().registerExchange(wr);
}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 43940897cc..92c431ea61 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -61,7 +61,9 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
}
QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl
<< " broker-url=" << brokerUrl);
- backup.reset(new Backup(broker, s));
+ // FIXME aconway 2011-11-22: temporary hack to identify primary.
+ if (s.brokerUrl != "primary")
+ backup.reset(new Backup(broker, s));
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
boost::shared_ptr<ReplicatingSubscription::Factory>(
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index 80f21e4320..fc9e48411d 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -58,7 +58,7 @@ struct HaPlugin : public Plugin {
if (broker && settings.enabled) {
haBroker.reset(new ha::HaBroker(*broker, settings));
} else
- QPID_LOG(info, "HA: Disabled");
+ QPID_LOG(notice, "HA: Disabled");
}
};
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 1f582adda7..d3eefe7369 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -42,7 +42,6 @@ namespace qpid {
namespace ha {
using namespace broker;
-// FIXME aconway 2011-12-02: separate file for string constantS?
const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
@@ -85,7 +84,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
- QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, "HA: Backup activated bridge from queue " << args.i_src << " to " << args.i_dest);
}
void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)
@@ -102,18 +101,15 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid
if (current < *i) {
//haven't got that far yet, record the dequeue
dequeued.add(*i);
- QPID_LOG(trace, "HA: Recording dequeue of message at " <<
- QueuePos(queue.get(), *i));
+ QPID_LOG(trace, "HA: Recording dequeue of " << QueuePos(queue.get(), *i));
} else {
QueuedMessage message;
if (queue->acquireMessageAt(*i, message)) {
queue->dequeue(0, message);
- QPID_LOG(info, "HA: Dequeued message "<< QueuePos(message));
+ QPID_LOG(trace, "HA: Backup dequeued: "<< QueuePos(message));
} else {
- // FIXME aconway 2011-11-29: error handling
- // Is this an error? Will happen if queue has initial dequeues.
- QPID_LOG(error, "HA: Unable to dequeue message at "
- << QueuePos(queue.get(), *i));
+ // This can happen if we're replicating a queue that has initial dequeues.
+ QPID_LOG(trace, "HA: Backup message already dequeued: "<< QueuePos(queue.get(), *i));
}
}
}
@@ -122,10 +118,10 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid
//dequeued before our subscription reached them
while (dequeued.contains(++current)) {
dequeued.remove(current);
- QPID_LOG(debug, "HA: Skipping dequeued message at " << current << " from " << queue->getName());
+ QPID_LOG(trace, "HA: Backup skipping dequeued message: " << QueuePos(queue.get(), current));
queue->setPosition(current);
}
- QPID_LOG(info, "HA: Enqueued message on " << queue->getName() << "; currently at " << current);
+ QPID_LOG(trace, "HA: Backup enqueued message: " << QueuePos(queue.get(), current));
msg.deliverTo(queue);
}
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 65b1ee65e8..620d27f8ae 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -33,8 +33,6 @@ using namespace framing;
using namespace broker;
using namespace std;
-// FIXME aconway 2011-11-28: review all arugment names, prefixes etc.
-// Do we want a common HA prefix?
const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number");
const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number");
@@ -211,7 +209,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& m)
{
sys::Mutex::ScopedLock l(lock);
range.add(m.position);
- // FIXME aconway 2011-11-29: q[pos]
+ // FIXME aconway 2011-11-29: q[pos] logging
QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) << "; subscription is at " << position);
}
notify();
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
index 73effd9a7a..62d456b21f 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -115,7 +115,6 @@ const string S_WIRING="wiring";
const string S_ALL="all";
ReplicateLevel replicateLevel(const string& str) {
- // FIXME aconway 2011-11-24: case insenstive comparison.
ReplicateLevel rl = RL_NONE;
if (str == S_WIRING) rl = RL_WIRING;
else if (str == S_ALL) rl = RL_ALL;
@@ -176,7 +175,7 @@ WiringReplicator::~WiringReplicator() {}
WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l)
: Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
{
- QPID_LOG(debug, "HA: Starting replication from " <<
+ QPID_LOG(info, "HA: Backup replicating from " <<
link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
broker.getLinks().declare(
link->getHost(), link->getPort(),
@@ -212,10 +211,10 @@ void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
sendQuery(QUEUE, queueName, sessionHandler);
sendQuery(EXCHANGE, queueName, sessionHandler);
sendQuery(BINDING, queueName, sessionHandler);
- QPID_LOG(debug, "HA: Activated wiring replicator")
+ QPID_LOG(debug, "HA: Backup activated wiring bridge: " << queueName);
}
-// FIXME aconway 2011-12-02: error handling in route. Be forging but log warnings?
+// FIXME aconway 2011-12-02: error handling in route.
void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
Variant::List list;
try {
@@ -230,7 +229,8 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram
Variant::Map& map = i->asMap();
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
- QPID_LOG(trace, "HA: Configuration event: schema=" << schema << " values=" << values);
+ QPID_LOG(debug, "HA: Backup received event: schema=" << schema
+ << " values=" << values);
if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
@@ -238,7 +238,9 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram
else if (match<EventBind>(schema)) doEventBind(values);
// FIXME aconway 2011-11-21: handle unbind & all other events.
else if (match<EventSubscribe>(schema)) {} // Deliberately ignored.
- else throw(Exception(QPID_MSG("WiringReplicator received unexpected event, schema=" << schema)));
+ // FIXME aconway 2011-12-02: error handling
+ else throw(Exception(QPID_MSG("Backup received unexpected event, schema="
+ << schema)));
}
} else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
@@ -246,18 +248,20 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram
Variant::Map& values = i->asMap()[VALUES].asMap();
framing::FieldTable args;
amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
- QPID_LOG(trace, "HA: Configuration response type=" << type << " values=" << values);
+ QPID_LOG(trace, "HA: Backup received response type=" << type
+ << " values=" << values);
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
else throw Exception(QPID_MSG("HA: Unexpected response type: " << type));
}
} else {
- QPID_LOG(warning, QPID_MSG("HA: Expecting remote configuration message, got: " << *headers));
+ QPID_LOG(error, QPID_MSG("HA: Backup received unexpected message: "
+ << *headers));
}
} catch (const std::exception& e) {
- QPID_LOG(warning, "HA: Error replicating configuration: " << e.what());
- QPID_LOG(debug, "HA: Error processing configuration message: " << list);
+ QPID_LOG(error, "HA: Backup replication error: " << e.what());
+ QPID_LOG(error, "HA: Backup replication error while processing: " << list);
}
}
@@ -267,9 +271,7 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
if (values[DISP] == CREATED && replicateLevel(argsMap)) {
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
-
- QPID_LOG(debug, "HA: Creating queue from event " << name);
- std::pair<boost::shared_ptr<Queue>, bool> result =
+ std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
values[DURABLE].asBool(),
@@ -284,10 +286,11 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
// re-create from event.
// Events are always up to date, whereas responses may be
// out of date.
- QPID_LOG(debug, "HA: New queue replica " << name);
+ QPID_LOG(debug, "HA: Created backup queue from event: " << name);
startQueueReplicator(result.first);
} else {
- QPID_LOG(warning, "HA: Replicated queue " << name << " already exists");
+ // FIXME aconway 2011-12-02: what's the right way to handle this?
+ QPID_LOG(warning, "HA: Queue already exists on backup: " << name);
}
}
}
@@ -296,7 +299,7 @@ void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (queue && replicateLevel(queue->getSettings())) {
- QPID_LOG(debug, "HA: Deleting queue from event: " << name);
+ QPID_LOG(debug, "HA: Deleting backup queue from event: " << name);
broker.deleteQueue(
name,
values[USER].asString(),
@@ -310,18 +313,20 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
string name = values[EXNAME].asString();
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- QPID_LOG(debug, "HA: New exchange replica " << name);
- if (!broker.createExchange(
+ if (broker.createExchange(
name,
values[EXTYPE].asString(),
values[DURABLE].asBool(),
values[ALTEX].asString(),
args,
values[USER].asString(),
- values[RHOST].asString()).second) {
+ values[RHOST].asString()).second)
+ {
+ QPID_LOG(debug, "HA: created backup exchange from event: " << name);
+ } else {
// FIXME aconway 2011-11-22: should delete pre-exisitng exchange
// and re-create from event. See comment in doEventQueueDeclare.
- QPID_LOG(warning, "HA: Replicated exchange " << name << " already exists");
+ QPID_LOG(warning, "HA: Exchange already exists on backup: " << name);
}
}
}
@@ -331,7 +336,7 @@ void WiringReplicator::doEventExchangeDelete(Variant::Map& values) {
try {
boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
if (exchange && replicateLevel(exchange->getArgs())) {
- QPID_LOG(debug, "HA: Deleting exchange:" << name);
+ QPID_LOG(debug, "HA: Deleting backup exchange:" << name);
broker.deleteExchange(
name,
values[USER].asString(),
@@ -378,12 +383,12 @@ void WiringReplicator::doResponseQueue(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/);
if (result.second) {
- QPID_LOG(debug, "HA: New queue replica: " << values[NAME] << " (in catch-up)");
+ QPID_LOG(debug, "HA: Created backup queue from response: " << values[NAME]);
startQueueReplicator(result.first);
} else {
// FIXME aconway 2011-11-22: Normal to find queue already
// exists if we're failing over.
- QPID_LOG(warning, "HA: Replicated queue " << values[NAME] << " already exists (in catch-up)");
+ QPID_LOG(warning, "HA: Queue already exists on backup: " << name);
}
}
@@ -392,16 +397,18 @@ void WiringReplicator::doResponseExchange(Variant::Map& values) {
if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- QPID_LOG(debug, "HA: New exchange replica " << values[NAME] << " (in catch-up)");
- if (!broker.createExchange(
+ if (broker.createExchange(
values[NAME].asString(),
values[TYPE].asString(),
values[DURABLE].asBool(),
""/*TODO: need to include alternate-exchange*/,
args,
""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/).second) {
- QPID_LOG(warning, "HA: Replicated exchange " << values[QNAME] << " already exists (in catch-up)");
+ ""/*TODO: what should we use as connection id?*/).second)
+ {
+ QPID_LOG(debug, "HA: Created backup exchange from response: " << values[NAME]);
+ } else {
+ QPID_LOG(warning, "HA: Exchange already exists on backup: " << values[QNAME]);
}
}
@@ -440,10 +447,10 @@ void WiringReplicator::doResponseBind(Variant::Map& values) {
framing::FieldTable args;
amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
string key = values[KEY].asString();
- QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+ exchange->bind(queue, key, &args);
+ QPID_LOG(debug, "HA: Created backup binding from response: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->bind(queue, key, &args);
}
}