diff options
author | Alan Conway <aconway@apache.org> | 2012-04-19 21:44:59 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-04-19 21:44:59 +0000 |
commit | 94d19ea9de7107141423e408c0f59fd61a297844 (patch) | |
tree | 99c12a30f5297eca954f4fb8f075cda421ce8980 | |
parent | 2c52e54d4d95b299949efa7780d2b55ed2e9a662 (diff) | |
download | qpid-python-94d19ea9de7107141423e408c0f59fd61a297844.tar.gz |
QPID-3606: More consistent logging and error handling for HA code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1328124 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | qpid/cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuedMessage.cpp | 34 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuedMessage.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 55 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicateLevel.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 71 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 2 |
14 files changed, 135 insertions, 73 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 0c90a29d52..4b740766da 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1113,6 +1113,7 @@ set (qpidbroker_SOURCES qpid/broker/NameGenerator.cpp qpid/broker/NullMessageStore.cpp qpid/broker/QueueBindings.cpp + qpid/broker/QueuedMessage.cpp qpid/broker/QueueEvents.cpp qpid/broker/QueuePolicy.cpp qpid/broker/QueueRegistry.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 5dcc4cd210..d6d4088622 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -634,6 +634,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueuePolicy.h \ qpid/broker/QueueRegistry.cpp \ qpid/broker/QueueRegistry.h \ + qpid/broker/QueuedMessage.cpp \ qpid/broker/QueuedMessage.h \ qpid/broker/QueueFlowLimit.h \ qpid/broker/QueueFlowLimit.cpp \ diff --git a/qpid/cpp/src/qpid/broker/QueuedMessage.cpp b/qpid/cpp/src/qpid/broker/QueuedMessage.cpp new file mode 100644 index 0000000000..d40cc901ff --- /dev/null +++ b/qpid/cpp/src/qpid/broker/QueuedMessage.cpp @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "QueuedMessage.h" +#include "Queue.h" +#include <iostream> + +namespace qpid { +namespace broker { + +std::ostream& operator<<(std::ostream& o, const QueuedMessage& qm) { + o << (qm.queue ? qm.queue->getName() : std::string()) << "[" << qm.position <<"]"; + return o; +} + + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/QueuedMessage.h b/qpid/cpp/src/qpid/broker/QueuedMessage.h index 806da8e720..a1435aba42 100644 --- a/qpid/cpp/src/qpid/broker/QueuedMessage.h +++ b/qpid/cpp/src/qpid/broker/QueuedMessage.h @@ -22,6 +22,7 @@ #define _QueuedMessage_ #include "qpid/broker/Message.h" +#include <iosfwd> namespace qpid { namespace broker { @@ -47,6 +48,7 @@ inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; } +std::ostream& operator<<(std::ostream&, const QueuedMessage&); }} diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index eabf502f8b..53550cfcd1 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -52,15 +52,14 @@ Backup::Backup(HaBroker& hb, const Settings& s) : } void Backup::initialize(const Url& url) { - assert(!url.empty()); - QPID_LOG(notice, "HA: Backup started: " << url); + if (url.empty()) throw Url::Invalid("HA broker URL is empty"); + QPID_LOG(notice, "HA: Backup initialized: " << url); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; // Declare the link std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( url[0].host, url[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password); - assert(result.second); // FIXME aconway 2011-11-23: error handling link = result.first; link->setUrl(url); @@ -74,7 +73,7 @@ void Backup::setBrokerUrl(const Url& url) { if (url.empty()) return; sys::Mutex::ScopedLock l(lock); if (link) { // URL changed after we initialized. - QPID_LOG(info, "HA: Backup failover URL set to " << url); + QPID_LOG(info, "HA: Backup broker URL set to " << url); link->setUrl(url); } else { diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 0c29a518a2..70afd2bfa7 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -233,7 +233,6 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName); } -// FIXME aconway 2011-12-02: error handling in route. void BrokerReplicator::route(Deliverable& msg) { const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders(); Variant::List list; @@ -266,12 +265,12 @@ void BrokerReplicator::route(Deliverable& msg) { else if (type == EXCHANGE) doResponseExchange(values); else if (type == BINDING) doResponseBind(values); else if (type == HA_BROKER) doResponseHaBroker(values); - else QPID_LOG(error, "HA: Backup received unknown response type=" << type - << " values=" << values); } - } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers); + } } catch (const std::exception& e) { - QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list); + QPID_LOG(critical, "HA: Backup configuration replication failed: " << e.what() + << ": while handling: " << list); + throw; } } @@ -521,7 +520,8 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { if (replicateLevel(queue->getSettings()) == RL_ALL) { boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); - broker.getExchanges().registerExchange(qr); + if (!broker.getExchanges().registerExchange(qr)) + throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); } } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 24798aec6f..7d82fb63bd 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -94,7 +94,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, // NOTE: resetting backup allows client connections, so any // primary state should be set up here before backup.reset() backup.reset(); - QPID_LOG(notice, "HA: Primary promoted from backup"); + QPID_LOG(notice, "HA: Promoted to primary"); mgmtObject->set_status(PRIMARY); } break; @@ -146,7 +146,7 @@ void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) { void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) { Url url = clientUrl.empty() ? brokerUrl : clientUrl; - assert(!url.empty()); + if (url.empty()) throw Url::Invalid("HA client URL is empty"); mgmtObject->set_publicBrokers(url.str()); knownBrokers.clear(); knownBrokers.push_back(url); @@ -154,7 +154,7 @@ void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) { } void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) { - if (url.empty()) throw Exception("Invalid empty URL for HA broker failover"); + if (url.empty()) throw Url::Invalid("HA broker URL is empty"); QPID_LOG(debug, "HA: Setting broker URL to: " << url); brokerUrl = url; mgmtObject->set_brokers(brokerUrl.str()); diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 65ad3de4a0..99b30fd36b 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -57,6 +57,7 @@ class HaBroker : public management::Manageable // Log a critical error message and shut down the broker. void shutdown(const std::string& message); + private: void setClientUrl(const Url&, const sys::Mutex::ScopedLock&); void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 83f3d28b6d..633619be13 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -30,6 +30,7 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" +#include "qpid/Msg.h" #include <boost/shared_ptr.hpp> namespace { @@ -53,8 +54,8 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) { QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) { - logPrefix = "HA: Backup of queue " + queue->getName() + ": "; - QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings()); + logPrefix = "HA: Backup of " + queue->getName() + ": "; + QPID_LOG(info, logPrefix << "Created"); } // This must be separate from the constructor so we can call shared_from_this. @@ -74,7 +75,7 @@ void QueueReplicator::activate() { 0, // sync? // Include shared_ptr to self to ensure we are not deleted // before initializeBridge is called. - boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this()) + boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2) ); } @@ -88,9 +89,7 @@ void QueueReplicator::deactivate() { } // Called in a broker connection thread when the bridge is created. -// shared_ptr to self ensures we are not deleted before initializeBridge is called. -void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler, - boost::shared_ptr<QueueReplicator> /*self*/) { +void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { sys::Mutex::ScopedLock l(lock); bridgeName = bridge.getName(); framing::AMQP_ServerProxy peer(sessionHandler.out); @@ -140,23 +139,33 @@ void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) // Called in connection thread of the queues bridge to primary. void QueueReplicator::route(Deliverable& msg) { - const std::string& key = msg.getMessage().getRoutingKey(); - sys::Mutex::ScopedLock l(lock); - if (key == DEQUEUE_EVENT_KEY) { - SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); - QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); - //TODO: should be able to optimise the following - for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) - dequeue(*i, l); - } else if (key == POSITION_EVENT_KEY) { - SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); - QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() - << " to " << position); - assert(queue->getPosition() <= position); - queue->setPosition(position); - } else { - msg.deliverTo(queue); - QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); + try { + const std::string& key = msg.getMessage().getRoutingKey(); + sys::Mutex::ScopedLock l(lock); + if (key == DEQUEUE_EVENT_KEY) { + SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); + QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); + //TODO: should be able to optimise the following + for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) + dequeue(*i, l); + } else if (key == POSITION_EVENT_KEY) { + SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); + QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() + << " to " << position); + if (queue->getPosition() > position) { + throw Exception( + QPID_MSG(logPrefix << "Invalid position update from " + << queue->getPosition() << " to " << position)); + } + queue->setPosition(position); + } else { + msg.deliverTo(queue); + QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); + } + } + catch (const std::exception& e) { + QPID_LOG(critical, logPrefix << "Replication failed: " << e.what()); + throw; } } diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index a1ebbd788a..bcbac988fa 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -70,8 +70,7 @@ class QueueReplicator : public broker::Exchange, bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); private: - void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler, - boost::shared_ptr<QueueReplicator> self); + void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&); std::string logPrefix; diff --git a/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp b/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp index 2c3614bb63..4981577225 100644 --- a/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp @@ -20,6 +20,7 @@ */ #include "ReplicateLevel.h" #include "qpid/Exception.h" +#include "qpid/Msg.h" #include <iostream> #include <assert.h> @@ -31,7 +32,7 @@ using namespace std; // Note replicateLevel is called during plugin-initialization which // happens in the static construction phase so these constants need // to be POD, they can't be class objects -// +// namespace { const char* S_NONE="none"; const char* S_CONFIGURATION="configuration"; @@ -47,17 +48,15 @@ bool replicateLevel(const string& level, ReplicateLevel& out) { ReplicateLevel replicateLevel(const string& level) { ReplicateLevel rl; - if (!replicateLevel(level, rl)) { - assert(0); + if (!replicateLevel(level, rl)) throw Exception("Invalid value for replication level: "+level); - } return rl; } string str(ReplicateLevel l) { const char* names[] = { S_NONE, S_CONFIGURATION, S_ALL }; - assert(l >= RL_NONE); - assert(l <= RL_ALL); + if (l > RL_ALL) + throw Exception(QPID_MSG("Invalid value for replication level: " << l)); return names[l]; } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index af6180305d..91a4538bc4 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -87,10 +87,13 @@ ReplicatingSubscription::ReplicatingSubscription( events(new Queue(mask(name))), consumer(new DelegatingConsumer(*this)) { + // Separate the remote part from a "local-remote" address. + string address = parent->getSession().getConnection().getUrl(); + size_t i = address.find('-'); + if (i != string::npos) address = address.substr(i+1); + logPrefix = "HA: Primary "; stringstream ss; - ss << "HA: Primary: " << getQueue()->getName() << " at " - << parent->getSession().getConnection().getUrl() << ": "; - logPrefix = ss.str(); + logSuffix = " (" + address + ")"; // FIXME aconway 2011-12-09: Failover optimization removed. // There was code here to re-use messages already on the backup @@ -99,7 +102,7 @@ ReplicatingSubscription::ReplicatingSubscription( // can be re-introduced later. Last revision with the optimization: // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. - QPID_LOG(debug, logPrefix << "Created backup subscription " << getName()); + QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix); // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0 // so we will start consuming from the lowest numbered message. @@ -109,23 +112,36 @@ ReplicatingSubscription::ReplicatingSubscription( // Message is delivered in the subscription's connection thread. bool ReplicatingSubscription::deliver(QueuedMessage& m) { - // Add position events for the subscribed queue, not for the internal event queue. - if (m.queue && m.queue == getQueue().get()) { - sys::Mutex::ScopedLock l(lock); - assert(position == m.position); - // m.position is the position of the newly enqueued m on the local queue. - // backupPosition is latest position on the backup queue (before enqueueing m.) - assert(m.position > backupPosition); - if (m.position - backupPosition > 1) { - // Position has advanced because of messages dequeued ahead of us. - SequenceNumber send(m.position); - --send; // Send the position before m was enqueued. - sendPositionEvent(send, l); + try { + // Add position events for the subscribed queue, not for the internal event queue. + if (m.queue && m.queue == getQueue().get()) { + sys::Mutex::ScopedLock l(lock); + if (position != m.position) + throw Exception( + QPID_MSG("Expected position " << position + << " but got " << m.position)); + // m.position is the position of the newly enqueued m on the local queue. + // backupPosition is latest position on the backup queue (before enqueueing m.) + if (m.position <= backupPosition) + throw Exception( + QPID_MSG("Expected position > " << backupPosition + << " but got " << m.position)); + + if (m.position - backupPosition > 1) { + // Position has advanced because of messages dequeued ahead of us. + SequenceNumber send(m.position); + --send; // Send the position before m was enqueued. + sendPositionEvent(send, l); + } + backupPosition = m.position; + QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix); } - backupPosition = m.position; - QPID_LOG(trace, logPrefix << "Replicating message " << m.position); + return ConsumerImpl::deliver(m); + } catch (const std::exception& e) { + QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName() + << logSuffix << ": " << e.what()); + throw; } - return ConsumerImpl::deliver(m); } ReplicatingSubscription::~ReplicatingSubscription() {} @@ -139,7 +155,7 @@ void ReplicatingSubscription::complete( { // Handle completions for the subscribed queue, not the internal event queue. if (qm.queue && qm.queue == getQueue().get()) { - QPID_LOG(trace, logPrefix << "Completed message " << qm.position); + QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix); Delayed::iterator i= delayed.find(qm.position); // The same message can be completed twice, by acknowledged and // dequeued, remove it from the set so it only gets completed @@ -157,7 +173,7 @@ void ReplicatingSubscription::complete( void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { sys::Mutex::ScopedLock l(lock); // Delay completion - QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position); + QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix); qm.payload->getIngressCompletion().startCompleter(); assert(delayed.find(qm.position) == delayed.end()); delayed[qm.position] = qm; @@ -168,7 +184,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { void ReplicatingSubscription::cancelComplete( const Delayed::value_type& v, const sys::Mutex::ScopedLock&) { - QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position); + QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix); v.second.payload->getIngressCompletion().finishCompleter(); } @@ -179,7 +195,7 @@ void ReplicatingSubscription::cancel() boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); { sys::Mutex::ScopedLock l(lock); - QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName()); + QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix); for_each(delayed.begin(), delayed.end(), boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l))); delayed.clear(); @@ -201,7 +217,8 @@ bool ReplicatingSubscription::hideDeletedError() { return true; } // Called with lock held. Called in subscription's connection thread. void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) { - QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); + QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues + << " from " << getQueue()->getName() << logSuffix); string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); dequeues.encode(buffer); @@ -216,7 +233,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm) { { sys::Mutex::ScopedLock l(lock); - QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position); + QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix); dequeues.add(qm.position); // If we have not yet sent this message to the backup, then // complete it now as it will never be accepted. @@ -229,8 +246,8 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm) void ReplicatingSubscription::sendPositionEvent( SequenceNumber position, const sys::Mutex::ScopedLock&l ) { - QPID_LOG(trace, logPrefix << "Sending position " << position - << ", was " << backupPosition); + QPID_LOG(trace, logPrefix << "sending position " << position + << ", was " << backupPosition << logSuffix); string buf(backupPosition.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); position.encode(buffer); diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index cbcb230bc1..f9176915f6 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -94,7 +94,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, bool doDispatch(); private: typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed; - std::string logPrefix; + std::string logPrefix, logSuffix; boost::shared_ptr<broker::Queue> events; boost::shared_ptr<broker::Consumer> consumer; Delayed delayed; diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 0c8ac569b8..f26f236efa 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -34,7 +34,7 @@ class HaBroker(Broker): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=info+", "--log-enable=debug+:ha::", + "--log-enable=info+", "--log-enable=trace+:ha::", # FIXME aconway 2012-04-18: trace # FIXME aconway 2012-02-13: workaround slow link failover. "--link-maintenace-interval=0.1", "--ha-cluster=%s"%ha_cluster] |