summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp21
1 files changed, 15 insertions, 6 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 5f51bb9dad..58ac4b91b3 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -109,6 +109,7 @@
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
#include "qpid/framing/ClusterUpdateRequestBody.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
#include "qpid/management/IdAllocator.h"
@@ -294,23 +295,27 @@ void Cluster::deliver(
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
+ LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish());
deliverEvent(e);
}
+LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");)
+LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");)
+
void Cluster::deliverEvent(const Event& e) {
- LATENCY_START(Event, "enqueue event", e.getData());
+ LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());)
deliverEventQueue.push(e);
}
void Cluster::deliverFrame(const EventFrame& e) {
- LATENCY_START(EventFrame, "enqueue frame", e.frame.getBody());
+ LATENCY_TRACK(frameQueueLatencyTracker.start(e.frame.getBody()));
deliverFrameQueue.push(e);
}
// Handler for deliverEventQueue.
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
- LATENCY_STAGE(Event, "dequeue event", e.getData());
+ LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData()));
QPID_LOG(trace, *this << " DLVR: " << e);
if (e.isCluster()) {
EventFrame ef(e, e.getFrame());
@@ -329,7 +334,6 @@ void Cluster::deliveredEvent(const Event& e) {
}
else // Discard connection events if discarding is set.
QPID_LOG(trace, *this << " DROP: " << e);
- LATENCY_END(Event, "processed event", e.getData());
}
void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
@@ -337,18 +341,22 @@ void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
error.error(connection, type, map.getFrameSeq(), map.getMembers());
}
+LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
+
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& e) {
- LATENCY_STAGE(EventFrame, "dequeued frame", e.frame.getBody());
+ LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
+ LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
Mutex::ScopedLock l(lock);
// Process each frame through the error checker.
error.delivered(e);
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
- LATENCY_END(EventFrame, "processed frame", e.frame.getBody());
}
+LATENCY_TRACK(sys::LatencyStatistic processLatency("Process");)
+
void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
QPID_LOG(trace, *this << " DLVR: " << e);
@@ -357,6 +365,7 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) {
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) {
+ LATENCY_TRACK(LatencyScope ls(processLatency));
map.incrementFrameSeq();
QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
ConnectionPtr connection = getConnection(e.connectionId, l);