diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 21 |
1 files changed, 15 insertions, 6 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 5f51bb9dad..58ac4b91b3 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -109,6 +109,7 @@ #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" #include "qpid/framing/ClusterUpdateRequestBody.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" #include "qpid/management/IdAllocator.h" @@ -294,23 +295,27 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); + LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish()); deliverEvent(e); } +LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");) +LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");) + void Cluster::deliverEvent(const Event& e) { - LATENCY_START(Event, "enqueue event", e.getData()); + LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());) deliverEventQueue.push(e); } void Cluster::deliverFrame(const EventFrame& e) { - LATENCY_START(EventFrame, "enqueue frame", e.frame.getBody()); + LATENCY_TRACK(frameQueueLatencyTracker.start(e.frame.getBody())); deliverFrameQueue.push(e); } // Handler for deliverEventQueue. // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { - LATENCY_STAGE(Event, "dequeue event", e.getData()); + LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData())); QPID_LOG(trace, *this << " DLVR: " << e); if (e.isCluster()) { EventFrame ef(e, e.getFrame()); @@ -329,7 +334,6 @@ void Cluster::deliveredEvent(const Event& e) { } else // Discard connection events if discarding is set. QPID_LOG(trace, *this << " DROP: " << e); - LATENCY_END(Event, "processed event", e.getData()); } void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { @@ -337,18 +341,22 @@ void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { error.error(connection, type, map.getFrameSeq(), map.getMembers()); } +LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");) + // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& e) { - LATENCY_STAGE(EventFrame, "dequeued frame", e.frame.getBody()); + LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody())); + LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody())); Mutex::ScopedLock l(lock); // Process each frame through the error checker. error.delivered(e); while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); - LATENCY_END(EventFrame, "processed frame", e.frame.getBody()); } +LATENCY_TRACK(sys::LatencyStatistic processLatency("Process");) + void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { QPID_LOG(trace, *this << " DLVR: " << e); @@ -357,6 +365,7 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) { throw Exception(QPID_MSG("Invalid cluster control")); } else if (state >= CATCHUP) { + LATENCY_TRACK(LatencyScope ls(processLatency)); map.incrementFrameSeq(); QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); ConnectionPtr connection = getConnection(e.connectionId, l); |