summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-10-13 15:39:01 +0000
committerAlan Conway <aconway@apache.org>2011-10-13 15:39:01 +0000
commit574665b57db5d55a33c8f89d8b2d612e3f146d29 (patch)
tree0a2cf633a72abf5cfdb440df50463d339f7a6393
parentc5bb7d3a8a0b3eab29c936cef71952c5927f4f94 (diff)
downloadqpid-python-574665b57db5d55a33c8f89d8b2d612e3f146d29.tar.gz
QPID-2920: Clean up cluster log messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1182914 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp14
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp14
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp2
5 files changed, 27 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
index f570a8fe2b..ed02d35d73 100644
--- a/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
@@ -65,14 +65,14 @@ void EventHandler::deliver(
// FIXME aconway 2011-09-29: don't decode own frame bodies. Ignore based on channel.
while (buf.available()) {
frame.decode(buf);
- QPID_LOG(trace, "cluster deliver on " << cpg.getName() << " from "<< PrettyId(sender, self) << ": " << frame);
+ QPID_LOG(trace, "cluster: deliver on " << cpg.getName()
+ << " from "<< PrettyId(sender, self) << ": " << frame);
try {
handle(frame);
} catch (const std::exception& e) {
- // Note: exceptions are assumed to be survivable,
- // fatal errors should log a message and call Core::fatal.
- QPID_LOG(error, e.what());
- // FIXME aconway 2011-09-29: error handling
+ // Non-fatal error. Our state isn't compromized by receiving bad frames.
+ QPID_LOG(error, "cluster: error in deliver on " << cpg.getName()
+ << " from " << PrettyId(sender, self) << ": " << frame);
}
}
}
@@ -105,8 +105,8 @@ void EventHandler::configChange (
const cpg_address *joined, int nJoined)
{
QPID_LOG(notice, "cluster: new membership: " << PrintAddrs(members, nMembers));
- QPID_LOG_IF(notice, nLeft, "cluster: members left: " << PrintAddrs(left, nLeft));
- QPID_LOG_IF(notice, nJoined, "cluster: members joined: " << PrintAddrs(joined, nJoined));
+ QPID_LOG_IF(notice, nLeft, "cluster: left: " << PrintAddrs(left, nLeft));
+ QPID_LOG_IF(notice, nJoined, "cluster: joined: " << PrintAddrs(joined, nJoined));
for (Handlers::iterator i = handlers.begin(); i != handlers.end(); ++i) {
for (int l = 0; l < nLeft; ++l) (*i)->left(left[l]);
for (int j = 0; j < nJoined; ++j) (*i)->joined(joined[j]);
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index 770b5c0ee3..bef8cb74ed 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -88,7 +88,7 @@ void MessageHandler::enqueue(const std::string& q, uint16_t channel) {
// We only need to build message from other brokers, our own messages
// are held by the MessageHolder.
if (sender() != self()) {
- boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue");
+ boost::shared_ptr<Queue> queue = findQueue(q, "cluster: enqueue");
messageBuilders.announce(sender(), channel, queue);
}
}
@@ -97,16 +97,16 @@ void MessageHandler::enqueue(const std::string& q, uint16_t channel) {
// and scan queue once.
void MessageHandler::acquire(const std::string& q, uint32_t position) {
// FIXME aconway 2011-09-15: systematic logging across cluster module.
- QPID_LOG(trace, "cluster message " << q << "[" << position
+ QPID_LOG(trace, "cluster: message " << q << "[" << position
<< "] acquired by " << PrettyId(sender(), self()));
// Note acquires from other members. My own acquires were executed in
// the connection thread
if (sender() != self()) {
- boost::shared_ptr<Queue> queue = findQueue(q, "Cluster acquire");
+ boost::shared_ptr<Queue> queue = findQueue(q, "cluster: acquire");
QueuedMessage qm;
BrokerContext::ScopedSuppressReplication ssr;
if (!queue->acquireMessageAt(position, qm))
- throw Exception(QPID_MSG("Cluster acquire: message not found: "
+ throw Exception(QPID_MSG("cluster: acquire: message not found: "
<< q << "[" << position << "]"));
assert(qm.position.getValue() == position);
assert(qm.payload);
@@ -119,7 +119,7 @@ void MessageHandler::acquire(const std::string& q, uint32_t position) {
void MessageHandler::dequeue(const std::string& q, uint32_t position) {
// FIXME aconway 2011-09-15: systematic logging across cluster module.
- QPID_LOG(trace, "cluster message " << q << "[" << position
+ QPID_LOG(trace, "cluster: message " << q << "[" << position
<< "] dequeued by " << PrettyId(sender(), self()));
// FIXME aconway 2010-10-28: for local dequeues, we should
@@ -128,7 +128,7 @@ void MessageHandler::dequeue(const std::string& q, uint32_t position) {
// My own dequeues were processed in the connection thread before multicasting.
if (sender() != self()) {
- boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue");
+ boost::shared_ptr<Queue> queue = findQueue(q, "cluster: dequeue");
QueuedMessage qm = QueueContext::get(*queue)->dequeue(position);
BrokerContext::ScopedSuppressReplication ssr;
if (qm.queue) queue->dequeue(0, qm);
@@ -137,7 +137,7 @@ void MessageHandler::dequeue(const std::string& q, uint32_t position) {
void MessageHandler::requeue(const std::string& q, uint32_t position, bool redelivered) {
if (sender() != self()) {
- boost::shared_ptr<Queue> queue = findQueue(q, "Cluster requeue");
+ boost::shared_ptr<Queue> queue = findQueue(q, "cluster: requeue");
QueueContext::get(*queue)->requeue(position, redelivered);
}
}
diff --git a/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
index 84b2145691..3a56ab7aff 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
@@ -51,11 +51,11 @@ Multicaster::Multicaster(Cpg& cpg_,
queue.start();
}
-void Multicaster::mcast(const framing::AMQFrame& data) {
- QPID_LOG(trace, "cluster multicast on " << cpg.getName() << ": " << data);
- BufferRef bufRef = buffers.get(data.encodedSize());
+void Multicaster::mcast(const framing::AMQFrame& frame) {
+ QPID_LOG(trace, "cluster: multicast on " << cpg.getName() << ": " << frame);
+ BufferRef bufRef = buffers.get(frame.encodedSize());
framing::Buffer buf(bufRef.begin(), bufRef.size());
- data.encode(buf);
+ frame.encode(buf);
queue.push(bufRef);
}
@@ -82,7 +82,8 @@ Multicaster::sendMcast(const PollableEventQueue::Batch& buffers) {
return i;
}
catch (const std::exception& e) {
- QPID_LOG(critical, "Multicast error: " << e.what());
+ QPID_LOG(critical, "cluster: multicast error on " << cpg.getName()
+ << ": " << e.what());
queue.stop();
onError();
return buffers.end();
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index 65a0711a84..ece3e0f2c1 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -67,7 +67,7 @@ void QueueContext::replicaState(
// Interested in state changes and my own events which lead to
// ownership.
if ((before != after || selfDelivered) && isOwner(after)) {
- QPID_LOG(trace, "cluster start consumers on " << queue.getName() << ", timer "
+ QPID_LOG(trace, "cluster: start consumers on " << queue.getName() << ", timer "
<< (after==SHARED_OWNER? "start" : "stop"));
queue.startConsumers();
if (after == SHARED_OWNER) timer.start();
@@ -94,7 +94,7 @@ void QueueContext::cancel(size_t n) {
consumers = n;
// When consuming threads are stopped, this->stopped will be called.
if (n == 0) {
- QPID_LOG(trace, "cluster stop consumers and timer on " << queue.getName());
+ QPID_LOG(trace, "cluster: all consumers canceled on " << queue.getName());
timer.stop();
queue.stopConsumers();
}
@@ -103,7 +103,7 @@ void QueueContext::cancel(size_t n) {
// Called in timer thread.
void QueueContext::timeout() {
// When all threads have stopped, queue will call stopped()
- QPID_LOG(trace, "cluster timeout, stopping consumers on " << queue.getName());
+ QPID_LOG(trace, "cluster: lock timeout on " << queue.getName());
queue.stopConsumers();
}
@@ -111,8 +111,9 @@ void QueueContext::timeout() {
// Called when no threads are dispatching from the queue.
void QueueContext::stopped() {
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(trace, "cluster timeout, stopped consumers on " << queue.getName()
- << (consumers == 0 ? " unsubscribed" : " resubscribe"));
+ QPID_LOG(trace, "cluster: stopped consumers, "
+ << (consumers == 0 ? "unsubscribe" : "resubscribe")
+ << " to " << queue.getName());
if (consumers == 0)
mcast.mcast(framing::ClusterQueueUnsubscribeBody(
framing::ProtocolVersion(), queue.getName()));
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
index 41b76f23a3..5a3c16c00c 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
@@ -74,7 +74,7 @@ void QueueReplica::resubscribe(const MemberId& member) {
void QueueReplica::update(QueueOwnership before, MemberId member) {
QueueOwnership after = getState();
- QPID_LOG(trace, "cluster queue replica: " << queue->getName() << ": "
+ QPID_LOG(trace, "cluster: queue replica: " << queue->getName() << ": "
<< before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
context->replicaState(before, after, member == self);
}