diff options
author | Alan Conway <aconway@apache.org> | 2012-01-19 23:05:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-01-19 23:05:08 +0000 |
commit | 6b8cae88b5eafc03773bacfa97b07eda7e1973cc (patch) | |
tree | d67965fa666902fb398c75155ab08233662044d0 | |
parent | 0010e075f124b72498419b9431bfac5c83c5eb82 (diff) | |
download | qpid-python-6b8cae88b5eafc03773bacfa97b07eda7e1973cc.tar.gz |
QPID-3603: Cleaned up HA log messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1233655 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 29 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaPlugin.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 65 |
6 files changed, 61 insertions, 61 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 17da13ed1e..48ece928b6 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -43,23 +43,20 @@ using types::Variant; using std::string; Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { - // FIXME aconway 2011-11-24: identifying the primary. - if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary hack to identify primary. - Url url(s.brokerUrl); - QPID_LOG(info, "HA: Acting as backup"); - string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + Url url(s.brokerUrl); + string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; - // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over. - // Declare the link - std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( - url[0].host, url[0].port, protocol, - false, // durable - s.mechanism, s.username, s.password); - assert(result.second); // FIXME aconway 2011-11-23: error handling - link = result.first; - boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link)); - broker.getExchanges().registerExchange(wr); - } + // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over. + // Declare the link + std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( + url[0].host, url[0].port, protocol, + false, // durable + s.mechanism, s.username, s.password); + assert(result.second); // FIXME aconway 2011-11-23: error handling + link = result.first; + boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link)); + broker.getExchanges().registerExchange(wr); } + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 43940897cc..92c431ea61 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -61,7 +61,9 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) } QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl << " broker-url=" << brokerUrl); - backup.reset(new Backup(broker, s)); + // FIXME aconway 2011-11-22: temporary hack to identify primary. + if (s.brokerUrl != "primary") + backup.reset(new Backup(broker, s)); // Register a factory for replicating subscriptions. broker.getConsumerFactories().add( boost::shared_ptr<ReplicatingSubscription::Factory>( diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index 80f21e4320..fc9e48411d 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -58,7 +58,7 @@ struct HaPlugin : public Plugin { if (broker && settings.enabled) { haBroker.reset(new ha::HaBroker(*broker, settings)); } else - QPID_LOG(info, "HA: Disabled"); + QPID_LOG(notice, "HA: Disabled"); } }; diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 1f582adda7..d3eefe7369 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -42,7 +42,6 @@ namespace qpid { namespace ha { using namespace broker; -// FIXME aconway 2011-12-02: separate file for string constantS? const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event"); QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) @@ -85,7 +84,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings); peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, "HA: Backup activated bridge from queue " << args.i_src << " to " << args.i_dest); } void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/) @@ -102,18 +101,15 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid if (current < *i) { //haven't got that far yet, record the dequeue dequeued.add(*i); - QPID_LOG(trace, "HA: Recording dequeue of message at " << - QueuePos(queue.get(), *i)); + QPID_LOG(trace, "HA: Recording dequeue of " << QueuePos(queue.get(), *i)); } else { QueuedMessage message; if (queue->acquireMessageAt(*i, message)) { queue->dequeue(0, message); - QPID_LOG(info, "HA: Dequeued message "<< QueuePos(message)); + QPID_LOG(trace, "HA: Backup dequeued: "<< QueuePos(message)); } else { - // FIXME aconway 2011-11-29: error handling - // Is this an error? Will happen if queue has initial dequeues. - QPID_LOG(error, "HA: Unable to dequeue message at " - << QueuePos(queue.get(), *i)); + // This can happen if we're replicating a queue that has initial dequeues. + QPID_LOG(trace, "HA: Backup message already dequeued: "<< QueuePos(queue.get(), *i)); } } } @@ -122,10 +118,10 @@ void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid //dequeued before our subscription reached them while (dequeued.contains(++current)) { dequeued.remove(current); - QPID_LOG(debug, "HA: Skipping dequeued message at " << current << " from " << queue->getName()); + QPID_LOG(trace, "HA: Backup skipping dequeued message: " << QueuePos(queue.get(), current)); queue->setPosition(current); } - QPID_LOG(info, "HA: Enqueued message on " << queue->getName() << "; currently at " << current); + QPID_LOG(trace, "HA: Backup enqueued message: " << QueuePos(queue.get(), current)); msg.deliverTo(queue); } } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 65b1ee65e8..620d27f8ae 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -33,8 +33,6 @@ using namespace framing; using namespace broker; using namespace std; -// FIXME aconway 2011-11-28: review all arugment names, prefixes etc. -// Do we want a common HA prefix? const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number"); const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number"); @@ -211,7 +209,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& m) { sys::Mutex::ScopedLock l(lock); range.add(m.position); - // FIXME aconway 2011-11-29: q[pos] + // FIXME aconway 2011-11-29: q[pos] logging QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) << "; subscription is at " << position); } notify(); diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp index 73effd9a7a..62d456b21f 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp @@ -115,7 +115,6 @@ const string S_WIRING="wiring"; const string S_ALL="all"; ReplicateLevel replicateLevel(const string& str) { - // FIXME aconway 2011-11-24: case insenstive comparison. ReplicateLevel rl = RL_NONE; if (str == S_WIRING) rl = RL_WIRING; else if (str == S_ALL) rl = RL_ALL; @@ -176,7 +175,7 @@ WiringReplicator::~WiringReplicator() {} WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l) : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l) { - QPID_LOG(debug, "HA: Starting replication from " << + QPID_LOG(info, "HA: Backup replicating from " << link->getTransport() << ":" << link->getHost() << ":" << link->getPort()); broker.getLinks().declare( link->getHost(), link->getPort(), @@ -212,10 +211,10 @@ void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH sendQuery(QUEUE, queueName, sessionHandler); sendQuery(EXCHANGE, queueName, sessionHandler); sendQuery(BINDING, queueName, sessionHandler); - QPID_LOG(debug, "HA: Activated wiring replicator") + QPID_LOG(debug, "HA: Backup activated wiring bridge: " << queueName); } -// FIXME aconway 2011-12-02: error handling in route. Be forging but log warnings? +// FIXME aconway 2011-12-02: error handling in route. void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { Variant::List list; try { @@ -230,7 +229,8 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram Variant::Map& map = i->asMap(); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); - QPID_LOG(trace, "HA: Configuration event: schema=" << schema << " values=" << values); + QPID_LOG(debug, "HA: Backup received event: schema=" << schema + << " values=" << values); if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values); else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values); @@ -238,7 +238,9 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram else if (match<EventBind>(schema)) doEventBind(values); // FIXME aconway 2011-11-21: handle unbind & all other events. else if (match<EventSubscribe>(schema)) {} // Deliberately ignored. - else throw(Exception(QPID_MSG("WiringReplicator received unexpected event, schema=" << schema))); + // FIXME aconway 2011-12-02: error handling + else throw(Exception(QPID_MSG("Backup received unexpected event, schema=" + << schema))); } } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { @@ -246,18 +248,20 @@ void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const fram Variant::Map& values = i->asMap()[VALUES].asMap(); framing::FieldTable args; amqp_0_10::translate(values[ARGUMENTS].asMap(), args); - QPID_LOG(trace, "HA: Configuration response type=" << type << " values=" << values); + QPID_LOG(trace, "HA: Backup received response type=" << type + << " values=" << values); if (type == QUEUE) doResponseQueue(values); else if (type == EXCHANGE) doResponseExchange(values); else if (type == BINDING) doResponseBind(values); else throw Exception(QPID_MSG("HA: Unexpected response type: " << type)); } } else { - QPID_LOG(warning, QPID_MSG("HA: Expecting remote configuration message, got: " << *headers)); + QPID_LOG(error, QPID_MSG("HA: Backup received unexpected message: " + << *headers)); } } catch (const std::exception& e) { - QPID_LOG(warning, "HA: Error replicating configuration: " << e.what()); - QPID_LOG(debug, "HA: Error processing configuration message: " << list); + QPID_LOG(error, "HA: Backup replication error: " << e.what()); + QPID_LOG(error, "HA: Backup replication error while processing: " << list); } } @@ -267,9 +271,7 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { if (values[DISP] == CREATED && replicateLevel(argsMap)) { framing::FieldTable args; amqp_0_10::translate(argsMap, args); - - QPID_LOG(debug, "HA: Creating queue from event " << name); - std::pair<boost::shared_ptr<Queue>, bool> result = + std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( name, values[DURABLE].asBool(), @@ -284,10 +286,11 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { // re-create from event. // Events are always up to date, whereas responses may be // out of date. - QPID_LOG(debug, "HA: New queue replica " << name); + QPID_LOG(debug, "HA: Created backup queue from event: " << name); startQueueReplicator(result.first); } else { - QPID_LOG(warning, "HA: Replicated queue " << name << " already exists"); + // FIXME aconway 2011-12-02: what's the right way to handle this? + QPID_LOG(warning, "HA: Queue already exists on backup: " << name); } } } @@ -296,7 +299,7 @@ void WiringReplicator::doEventQueueDelete(Variant::Map& values) { string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); if (queue && replicateLevel(queue->getSettings())) { - QPID_LOG(debug, "HA: Deleting queue from event: " << name); + QPID_LOG(debug, "HA: Deleting backup queue from event: " << name); broker.deleteQueue( name, values[USER].asString(), @@ -310,18 +313,20 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) { string name = values[EXNAME].asString(); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - QPID_LOG(debug, "HA: New exchange replica " << name); - if (!broker.createExchange( + if (broker.createExchange( name, values[EXTYPE].asString(), values[DURABLE].asBool(), values[ALTEX].asString(), args, values[USER].asString(), - values[RHOST].asString()).second) { + values[RHOST].asString()).second) + { + QPID_LOG(debug, "HA: created backup exchange from event: " << name); + } else { // FIXME aconway 2011-11-22: should delete pre-exisitng exchange // and re-create from event. See comment in doEventQueueDeclare. - QPID_LOG(warning, "HA: Replicated exchange " << name << " already exists"); + QPID_LOG(warning, "HA: Exchange already exists on backup: " << name); } } } @@ -331,7 +336,7 @@ void WiringReplicator::doEventExchangeDelete(Variant::Map& values) { try { boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); if (exchange && replicateLevel(exchange->getArgs())) { - QPID_LOG(debug, "HA: Deleting exchange:" << name); + QPID_LOG(debug, "HA: Deleting backup exchange:" << name); broker.deleteExchange( name, values[USER].asString(), @@ -378,12 +383,12 @@ void WiringReplicator::doResponseQueue(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); if (result.second) { - QPID_LOG(debug, "HA: New queue replica: " << values[NAME] << " (in catch-up)"); + QPID_LOG(debug, "HA: Created backup queue from response: " << values[NAME]); startQueueReplicator(result.first); } else { // FIXME aconway 2011-11-22: Normal to find queue already // exists if we're failing over. - QPID_LOG(warning, "HA: Replicated queue " << values[NAME] << " already exists (in catch-up)"); + QPID_LOG(warning, "HA: Queue already exists on backup: " << name); } } @@ -392,16 +397,18 @@ void WiringReplicator::doResponseExchange(Variant::Map& values) { if (!replicateLevel(argsMap)) return; framing::FieldTable args; amqp_0_10::translate(argsMap, args); - QPID_LOG(debug, "HA: New exchange replica " << values[NAME] << " (in catch-up)"); - if (!broker.createExchange( + if (broker.createExchange( values[NAME].asString(), values[TYPE].asString(), values[DURABLE].asBool(), ""/*TODO: need to include alternate-exchange*/, args, ""/*TODO: who is the user?*/, - ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(warning, "HA: Replicated exchange " << values[QNAME] << " already exists (in catch-up)"); + ""/*TODO: what should we use as connection id?*/).second) + { + QPID_LOG(debug, "HA: Created backup exchange from response: " << values[NAME]); + } else { + QPID_LOG(warning, "HA: Exchange already exists on backup: " << values[QNAME]); } } @@ -440,10 +447,10 @@ void WiringReplicator::doResponseBind(Variant::Map& values) { framing::FieldTable args; amqp_0_10::translate(values[ARGUMENTS].asMap(), args); string key = values[KEY].asString(); - QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName() + exchange->bind(queue, key, &args); + QPID_LOG(debug, "HA: Created backup binding from response: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->bind(queue, key, &args); } } |