summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-04-19 21:44:59 +0000
committerAlan Conway <aconway@apache.org>2012-04-19 21:44:59 +0000
commit94d19ea9de7107141423e408c0f59fd61a297844 (patch)
tree99c12a30f5297eca954f4fb8f075cda421ce8980
parent2c52e54d4d95b299949efa7780d2b55ed2e9a662 (diff)
downloadqpid-python-94d19ea9de7107141423e408c0f59fd61a297844.tar.gz
QPID-3606: More consistent logging and error handling for HA code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1328124 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/Makefile.am1
-rw-r--r--qpid/cpp/src/qpid/broker/QueuedMessage.cpp34
-rw-r--r--qpid/cpp/src/qpid/broker/QueuedMessage.h2
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp7
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp12
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp6
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h1
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp55
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h3
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicateLevel.cpp11
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp71
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py2
14 files changed, 135 insertions, 73 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 0c90a29d52..4b740766da 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -1113,6 +1113,7 @@ set (qpidbroker_SOURCES
qpid/broker/NameGenerator.cpp
qpid/broker/NullMessageStore.cpp
qpid/broker/QueueBindings.cpp
+ qpid/broker/QueuedMessage.cpp
qpid/broker/QueueEvents.cpp
qpid/broker/QueuePolicy.cpp
qpid/broker/QueueRegistry.cpp
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 5dcc4cd210..d6d4088622 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -634,6 +634,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueuePolicy.h \
qpid/broker/QueueRegistry.cpp \
qpid/broker/QueueRegistry.h \
+ qpid/broker/QueuedMessage.cpp \
qpid/broker/QueuedMessage.h \
qpid/broker/QueueFlowLimit.h \
qpid/broker/QueueFlowLimit.cpp \
diff --git a/qpid/cpp/src/qpid/broker/QueuedMessage.cpp b/qpid/cpp/src/qpid/broker/QueuedMessage.cpp
new file mode 100644
index 0000000000..d40cc901ff
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/QueuedMessage.cpp
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueuedMessage.h"
+#include "Queue.h"
+#include <iostream>
+
+namespace qpid {
+namespace broker {
+
+std::ostream& operator<<(std::ostream& o, const QueuedMessage& qm) {
+ o << (qm.queue ? qm.queue->getName() : std::string()) << "[" << qm.position <<"]";
+ return o;
+}
+
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/QueuedMessage.h b/qpid/cpp/src/qpid/broker/QueuedMessage.h
index 806da8e720..a1435aba42 100644
--- a/qpid/cpp/src/qpid/broker/QueuedMessage.h
+++ b/qpid/cpp/src/qpid/broker/QueuedMessage.h
@@ -22,6 +22,7 @@
#define _QueuedMessage_
#include "qpid/broker/Message.h"
+#include <iosfwd>
namespace qpid {
namespace broker {
@@ -47,6 +48,7 @@ inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) {
return a.position < b.position;
}
+std::ostream& operator<<(std::ostream&, const QueuedMessage&);
}}
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index eabf502f8b..53550cfcd1 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -52,15 +52,14 @@ Backup::Backup(HaBroker& hb, const Settings& s) :
}
void Backup::initialize(const Url& url) {
- assert(!url.empty());
- QPID_LOG(notice, "HA: Backup started: " << url);
+ if (url.empty()) throw Url::Invalid("HA broker URL is empty");
+ QPID_LOG(notice, "HA: Backup initialized: " << url);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
// Declare the link
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
url[0].host, url[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password);
- assert(result.second); // FIXME aconway 2011-11-23: error handling
link = result.first;
link->setUrl(url);
@@ -74,7 +73,7 @@ void Backup::setBrokerUrl(const Url& url) {
if (url.empty()) return;
sys::Mutex::ScopedLock l(lock);
if (link) { // URL changed after we initialized.
- QPID_LOG(info, "HA: Backup failover URL set to " << url);
+ QPID_LOG(info, "HA: Backup broker URL set to " << url);
link->setUrl(url);
}
else {
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 0c29a518a2..70afd2bfa7 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -233,7 +233,6 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
QPID_LOG(debug, "HA: Backup activated configuration bridge: " << queueName);
}
-// FIXME aconway 2011-12-02: error handling in route.
void BrokerReplicator::route(Deliverable& msg) {
const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
Variant::List list;
@@ -266,12 +265,12 @@ void BrokerReplicator::route(Deliverable& msg) {
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
else if (type == HA_BROKER) doResponseHaBroker(values);
- else QPID_LOG(error, "HA: Backup received unknown response type=" << type
- << " values=" << values);
}
- } else QPID_LOG(error, "HA: Backup received unexpected message: " << *headers);
+ }
} catch (const std::exception& e) {
- QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list);
+ QPID_LOG(critical, "HA: Backup configuration replication failed: " << e.what()
+ << ": while handling: " << list);
+ throw;
}
}
@@ -521,7 +520,8 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
if (replicateLevel(queue->getSettings()) == RL_ALL) {
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
- broker.getExchanges().registerExchange(qr);
+ if (!broker.getExchanges().registerExchange(qr))
+ throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
}
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 24798aec6f..7d82fb63bd 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -94,7 +94,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
// NOTE: resetting backup allows client connections, so any
// primary state should be set up here before backup.reset()
backup.reset();
- QPID_LOG(notice, "HA: Primary promoted from backup");
+ QPID_LOG(notice, "HA: Promoted to primary");
mgmtObject->set_status(PRIMARY);
}
break;
@@ -146,7 +146,7 @@ void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
Url url = clientUrl.empty() ? brokerUrl : clientUrl;
- assert(!url.empty());
+ if (url.empty()) throw Url::Invalid("HA client URL is empty");
mgmtObject->set_publicBrokers(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
@@ -154,7 +154,7 @@ void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
}
void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
- if (url.empty()) throw Exception("Invalid empty URL for HA broker failover");
+ if (url.empty()) throw Url::Invalid("HA broker URL is empty");
QPID_LOG(debug, "HA: Setting broker URL to: " << url);
brokerUrl = url;
mgmtObject->set_brokers(brokerUrl.str());
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 65ad3de4a0..99b30fd36b 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -57,6 +57,7 @@ class HaBroker : public management::Manageable
// Log a critical error message and shut down the broker.
void shutdown(const std::string& message);
+
private:
void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 83f3d28b6d..633619be13 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -30,6 +30,7 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
+#include "qpid/Msg.h"
#include <boost/shared_ptr.hpp>
namespace {
@@ -53,8 +54,8 @@ std::string QueueReplicator::replicatorName(const std::string& queueName) {
QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
{
- logPrefix = "HA: Backup of queue " + queue->getName() + ": ";
- QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
+ logPrefix = "HA: Backup of " + queue->getName() + ": ";
+ QPID_LOG(info, logPrefix << "Created");
}
// This must be separate from the constructor so we can call shared_from_this.
@@ -74,7 +75,7 @@ void QueueReplicator::activate() {
0, // sync?
// Include shared_ptr to self to ensure we are not deleted
// before initializeBridge is called.
- boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this())
+ boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2)
);
}
@@ -88,9 +89,7 @@ void QueueReplicator::deactivate() {
}
// Called in a broker connection thread when the bridge is created.
-// shared_ptr to self ensures we are not deleted before initializeBridge is called.
-void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler,
- boost::shared_ptr<QueueReplicator> /*self*/) {
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
sys::Mutex::ScopedLock l(lock);
bridgeName = bridge.getName();
framing::AMQP_ServerProxy peer(sessionHandler.out);
@@ -140,23 +139,33 @@ void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&)
// Called in connection thread of the queues bridge to primary.
void QueueReplicator::route(Deliverable& msg)
{
- const std::string& key = msg.getMessage().getRoutingKey();
- sys::Mutex::ScopedLock l(lock);
- if (key == DEQUEUE_EVENT_KEY) {
- SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
- //TODO: should be able to optimise the following
- for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
- dequeue(*i, l);
- } else if (key == POSITION_EVENT_KEY) {
- SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
- QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
- << " to " << position);
- assert(queue->getPosition() <= position);
- queue->setPosition(position);
- } else {
- msg.deliverTo(queue);
- QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+ try {
+ const std::string& key = msg.getMessage().getRoutingKey();
+ sys::Mutex::ScopedLock l(lock);
+ if (key == DEQUEUE_EVENT_KEY) {
+ SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
+ QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
+ //TODO: should be able to optimise the following
+ for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
+ dequeue(*i, l);
+ } else if (key == POSITION_EVENT_KEY) {
+ SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
+ QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
+ << " to " << position);
+ if (queue->getPosition() > position) {
+ throw Exception(
+ QPID_MSG(logPrefix << "Invalid position update from "
+ << queue->getPosition() << " to " << position));
+ }
+ queue->setPosition(position);
+ } else {
+ msg.deliverTo(queue);
+ QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+ }
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(critical, logPrefix << "Replication failed: " << e.what());
+ throw;
}
}
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index a1ebbd788a..bcbac988fa 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -70,8 +70,7 @@ class QueueReplicator : public broker::Exchange,
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
private:
- void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler,
- boost::shared_ptr<QueueReplicator> self);
+ void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
std::string logPrefix;
diff --git a/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp b/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp
index 2c3614bb63..4981577225 100644
--- a/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp
@@ -20,6 +20,7 @@
*/
#include "ReplicateLevel.h"
#include "qpid/Exception.h"
+#include "qpid/Msg.h"
#include <iostream>
#include <assert.h>
@@ -31,7 +32,7 @@ using namespace std;
// Note replicateLevel is called during plugin-initialization which
// happens in the static construction phase so these constants need
// to be POD, they can't be class objects
-//
+//
namespace {
const char* S_NONE="none";
const char* S_CONFIGURATION="configuration";
@@ -47,17 +48,15 @@ bool replicateLevel(const string& level, ReplicateLevel& out) {
ReplicateLevel replicateLevel(const string& level) {
ReplicateLevel rl;
- if (!replicateLevel(level, rl)) {
- assert(0);
+ if (!replicateLevel(level, rl))
throw Exception("Invalid value for replication level: "+level);
- }
return rl;
}
string str(ReplicateLevel l) {
const char* names[] = { S_NONE, S_CONFIGURATION, S_ALL };
- assert(l >= RL_NONE);
- assert(l <= RL_ALL);
+ if (l > RL_ALL)
+ throw Exception(QPID_MSG("Invalid value for replication level: " << l));
return names[l];
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index af6180305d..91a4538bc4 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -87,10 +87,13 @@ ReplicatingSubscription::ReplicatingSubscription(
events(new Queue(mask(name))),
consumer(new DelegatingConsumer(*this))
{
+ // Separate the remote part from a "local-remote" address.
+ string address = parent->getSession().getConnection().getUrl();
+ size_t i = address.find('-');
+ if (i != string::npos) address = address.substr(i+1);
+ logPrefix = "HA: Primary ";
stringstream ss;
- ss << "HA: Primary: " << getQueue()->getName() << " at "
- << parent->getSession().getConnection().getUrl() << ": ";
- logPrefix = ss.str();
+ logSuffix = " (" + address + ")";
// FIXME aconway 2011-12-09: Failover optimization removed.
// There was code here to re-use messages already on the backup
@@ -99,7 +102,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// can be re-introduced later. Last revision with the optimization:
// r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
- QPID_LOG(debug, logPrefix << "Created backup subscription " << getName());
+ QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix);
// FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
// so we will start consuming from the lowest numbered message.
@@ -109,23 +112,36 @@ ReplicatingSubscription::ReplicatingSubscription(
// Message is delivered in the subscription's connection thread.
bool ReplicatingSubscription::deliver(QueuedMessage& m) {
- // Add position events for the subscribed queue, not for the internal event queue.
- if (m.queue && m.queue == getQueue().get()) {
- sys::Mutex::ScopedLock l(lock);
- assert(position == m.position);
- // m.position is the position of the newly enqueued m on the local queue.
- // backupPosition is latest position on the backup queue (before enqueueing m.)
- assert(m.position > backupPosition);
- if (m.position - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead of us.
- SequenceNumber send(m.position);
- --send; // Send the position before m was enqueued.
- sendPositionEvent(send, l);
+ try {
+ // Add position events for the subscribed queue, not for the internal event queue.
+ if (m.queue && m.queue == getQueue().get()) {
+ sys::Mutex::ScopedLock l(lock);
+ if (position != m.position)
+ throw Exception(
+ QPID_MSG("Expected position " << position
+ << " but got " << m.position));
+ // m.position is the position of the newly enqueued m on the local queue.
+ // backupPosition is latest position on the backup queue (before enqueueing m.)
+ if (m.position <= backupPosition)
+ throw Exception(
+ QPID_MSG("Expected position > " << backupPosition
+ << " but got " << m.position));
+
+ if (m.position - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ SequenceNumber send(m.position);
+ --send; // Send the position before m was enqueued.
+ sendPositionEvent(send, l);
+ }
+ backupPosition = m.position;
+ QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix);
}
- backupPosition = m.position;
- QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
+ return ConsumerImpl::deliver(m);
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName()
+ << logSuffix << ": " << e.what());
+ throw;
}
- return ConsumerImpl::deliver(m);
}
ReplicatingSubscription::~ReplicatingSubscription() {}
@@ -139,7 +155,7 @@ void ReplicatingSubscription::complete(
{
// Handle completions for the subscribed queue, not the internal event queue.
if (qm.queue && qm.queue == getQueue().get()) {
- QPID_LOG(trace, logPrefix << "Completed message " << qm.position);
+ QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix);
Delayed::iterator i= delayed.find(qm.position);
// The same message can be completed twice, by acknowledged and
// dequeued, remove it from the set so it only gets completed
@@ -157,7 +173,7 @@ void ReplicatingSubscription::complete(
void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
sys::Mutex::ScopedLock l(lock);
// Delay completion
- QPID_LOG(trace, logPrefix << "Delaying completion of message " << qm.position);
+ QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix);
qm.payload->getIngressCompletion().startCompleter();
assert(delayed.find(qm.position) == delayed.end());
delayed[qm.position] = qm;
@@ -168,7 +184,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
void ReplicatingSubscription::cancelComplete(
const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
{
- QPID_LOG(trace, logPrefix << "Cancel completed message " << v.second.position);
+ QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix);
v.second.payload->getIngressCompletion().finishCompleter();
}
@@ -179,7 +195,7 @@ void ReplicatingSubscription::cancel()
boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
{
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
+ QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix);
for_each(delayed.begin(), delayed.end(),
boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
delayed.clear();
@@ -201,7 +217,8 @@ bool ReplicatingSubscription::hideDeletedError() { return true; }
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
{
- QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
+ QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues
+ << " from " << getQueue()->getName() << logSuffix);
string buf(dequeues.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
dequeues.encode(buffer);
@@ -216,7 +233,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
{
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(trace, logPrefix << "Dequeued message " << qm.position);
+ QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix);
dequeues.add(qm.position);
// If we have not yet sent this message to the backup, then
// complete it now as it will never be accepted.
@@ -229,8 +246,8 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
void ReplicatingSubscription::sendPositionEvent(
SequenceNumber position, const sys::Mutex::ScopedLock&l )
{
- QPID_LOG(trace, logPrefix << "Sending position " << position
- << ", was " << backupPosition);
+ QPID_LOG(trace, logPrefix << "sending position " << position
+ << ", was " << backupPosition << logSuffix);
string buf(backupPosition.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
position.encode(buffer);
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index cbcb230bc1..f9176915f6 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -94,7 +94,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
bool doDispatch();
private:
typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
- std::string logPrefix;
+ std::string logPrefix, logSuffix;
boost::shared_ptr<broker::Queue> events;
boost::shared_ptr<broker::Consumer> consumer;
Delayed delayed;
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 0c8ac569b8..f26f236efa 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -34,7 +34,7 @@ class HaBroker(Broker):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=info+", "--log-enable=debug+:ha::",
+ "--log-enable=info+", "--log-enable=trace+:ha::", # FIXME aconway 2012-04-18: trace
# FIXME aconway 2012-02-13: workaround slow link failover.
"--link-maintenace-interval=0.1",
"--ha-cluster=%s"%ha_cluster]