summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-05-19 21:18:52 +0000
committerAlan Conway <aconway@apache.org>2009-05-19 21:18:52 +0000
commit285ca60cf814ce4b96813e929ced910d53097aef (patch)
tree347254ab8c4b2c27d6a095ccdac7ed444ed993b0 /cpp/src/qpid/cluster/Cluster.cpp
parentfe0a36ba0edb47757a7bc7331764631ebd20205e (diff)
downloadqpid-python-285ca60cf814ce4b96813e929ced910d53097aef.tar.gz
Instrumentation for measuring latencies.
Compiled out of normal builds, enable with -DQPID_LATENCY_TRACKER. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@776463 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp21
1 files changed, 15 insertions, 6 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 5f51bb9dad..58ac4b91b3 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -109,6 +109,7 @@
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
#include "qpid/framing/ClusterUpdateRequestBody.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
#include "qpid/management/IdAllocator.h"
@@ -294,23 +295,27 @@ void Cluster::deliver(
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
+ LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish());
deliverEvent(e);
}
+LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");)
+LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");)
+
void Cluster::deliverEvent(const Event& e) {
- LATENCY_START(Event, "enqueue event", e.getData());
+ LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());)
deliverEventQueue.push(e);
}
void Cluster::deliverFrame(const EventFrame& e) {
- LATENCY_START(EventFrame, "enqueue frame", e.frame.getBody());
+ LATENCY_TRACK(frameQueueLatencyTracker.start(e.frame.getBody()));
deliverFrameQueue.push(e);
}
// Handler for deliverEventQueue.
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
- LATENCY_STAGE(Event, "dequeue event", e.getData());
+ LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData()));
QPID_LOG(trace, *this << " DLVR: " << e);
if (e.isCluster()) {
EventFrame ef(e, e.getFrame());
@@ -329,7 +334,6 @@ void Cluster::deliveredEvent(const Event& e) {
}
else // Discard connection events if discarding is set.
QPID_LOG(trace, *this << " DROP: " << e);
- LATENCY_END(Event, "processed event", e.getData());
}
void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
@@ -337,18 +341,22 @@ void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
error.error(connection, type, map.getFrameSeq(), map.getMembers());
}
+LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
+
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& e) {
- LATENCY_STAGE(EventFrame, "dequeued frame", e.frame.getBody());
+ LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
+ LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
Mutex::ScopedLock l(lock);
// Process each frame through the error checker.
error.delivered(e);
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
- LATENCY_END(EventFrame, "processed frame", e.frame.getBody());
}
+LATENCY_TRACK(sys::LatencyStatistic processLatency("Process");)
+
void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
QPID_LOG(trace, *this << " DLVR: " << e);
@@ -357,6 +365,7 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) {
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) {
+ LATENCY_TRACK(LatencyScope ls(processLatency));
map.incrementFrameSeq();
QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
ConnectionPtr connection = getConnection(e.connectionId, l);