summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-05-04 18:38:18 +0000
committerAlan Conway <aconway@apache.org>2009-05-04 18:38:18 +0000
commit7536c5e15901106f0e33a949e9945713a4b5a1ff (patch)
treede31698a8f812b3c2857fb862a974fb314e26df2 /qpid/cpp
parent1187f8b963d7a2cb6db0bcfe3855e85f8c4ff641 (diff)
downloadqpid-python-7536c5e15901106f0e33a949e9945713a4b5a1ff.tar.gz
LatenchTracker: a tool for measuring latencies.
Added measurement points to cluster code. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@771392 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/cluster.mk3
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp21
-rw-r--r--qpid/cpp/src/qpid/sys/LatencyTracker.h104
3 files changed, 120 insertions, 8 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk
index b7ee61e180..965c335306 100644
--- a/qpid/cpp/src/cluster.mk
+++ b/qpid/cpp/src/cluster.mk
@@ -78,7 +78,8 @@ cluster_la_SOURCES = \
qpid/cluster/PollerDispatch.h \
qpid/cluster/ProxyInputHandler.h \
qpid/cluster/Quorum.h \
- qpid/cluster/types.h
+ qpid/cluster/types.h \
+ qpid/sys/LatencyTracker.h
cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la
cluster_la_LDFLAGS = $(PLUGINLDFLAGS)
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 221f12990e..09e053dc28 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -48,6 +48,7 @@
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
#include "qpid/sys/Thread.h"
+#include "qpid/sys/LatencyTracker.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -196,8 +197,8 @@ void Cluster::leave() {
leave(l);
}
-#define LEAVE_TRY(STMT) try { STMT; } \
- catch (const std::exception& e) { \
+#define LEAVE_TRY(STMT) try { STMT; } \
+ catch (const std::exception& e) { \
QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
} do {} while(0)
@@ -228,17 +229,20 @@ void Cluster::deliver(
}
void Cluster::deliverEvent(const Event& e) {
+ LATENCY_START(Event, "enqueue event", e.getData());
deliverEventQueue.push(e);
}
void Cluster::deliverFrame(const EventFrame& e) {
+ LATENCY_START(EventFrame, "enqueue frame", e.frame.getBody());
deliverFrameQueue.push(e);
}
// Handler for deliverEventQueue.
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
- QPID_LOG(trace, *this << " DLVR: " << e);
+ LATENCY_STAGE(Event, "dequeue event", e.getData());
+ QPID_LOG(trace, *this << " DLVR: " << e);
if (e.isCluster()) {
EventFrame ef(e, e.getFrame());
// Stop the deliverEventQueue on update offers.
@@ -253,9 +257,10 @@ void Cluster::deliveredEvent(const Event& e) {
deliverFrame(EventFrame(e, e.getFrame()));
else
decoder.decode(e, e.getData());
-}
+ }
else // Discard connection events if discarding is set.
QPID_LOG(trace, *this << " DROP: " << e);
+ LATENCY_END(Event, "processed event", e.getData());
}
void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
@@ -266,11 +271,13 @@ void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& e) {
+ LATENCY_STAGE(EventFrame, "dequeued frame", e.frame.getBody());
Mutex::ScopedLock l(lock);
// Process each frame through the error checker.
error.delivered(e);
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
+ LATENCY_END(EventFrame, "processed frame", e.frame.getBody());
}
void Cluster::processFrame(const EventFrame& e, Lock& l) {
@@ -543,7 +550,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s
Lock l(lock);
QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]");
switch (methodId) {
- case _qmf::Cluster::METHOD_STOPCLUSTERNODE :
+ case _qmf::Cluster::METHOD_STOPCLUSTERNODE :
{
_qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
stringstream stream;
@@ -552,10 +559,10 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s
stopClusterNode(l);
}
break;
- case _qmf::Cluster::METHOD_STOPFULLCLUSTER :
+ case _qmf::Cluster::METHOD_STOPFULLCLUSTER :
stopFullCluster(l);
break;
- default:
+ default:
return Manageable::STATUS_UNKNOWN_METHOD;
}
return Manageable::STATUS_OK;
diff --git a/qpid/cpp/src/qpid/sys/LatencyTracker.h b/qpid/cpp/src/qpid/sys/LatencyTracker.h
new file mode 100644
index 0000000000..3e36525b90
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/LatencyTracker.h
@@ -0,0 +1,104 @@
+#ifndef QPID_SYS_LATENCYTRACKER_H
+#define QPID_SYS_LATENCYTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Time.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Record latency between events in the lifecycle of an object.
+ * For testing/debugging purposes: use the macros to declare
+ * and #define QPID_LATENCY_TRACKER to enable in a build.
+ */
+template <class T> class LatencyTracker
+{
+ public:
+ static void start(const char* name, const void* p) { instance.doStart(name, p); }
+ static void stage(const char* name, const void* p) { instance.doStage(name, p); }
+ static void end(const char* name, const void* p) { instance.doEnd(name, p); }
+
+ private:
+
+ LatencyTracker() : object(), times(), totals(), count(), names(), index(), maxIndex() { }
+ ~LatencyTracker() { print(); }
+
+ void doStart(const char* n, const void* p) { if (!object) { name(n); object=p; times[0] = now(); index = 1; } }
+ void doStage(const char* n, const void* p) { if (p == object) { name(n); times[index++] = now(); } }
+ void doEnd(const char* n, const void* p) { if (p == object) { name(n); times[index++] = now(); record(); object = 0; } }
+
+ void name(const char* n) {
+ if (names[index] == 0) names[index] = n;
+ assert(names[index] == n);
+ }
+
+ void record() {
+ if (maxIndex == 0) maxIndex = index;
+ assert(maxIndex == index);
+ for (int i = 0; i < index-1; ++i)
+ totals[i] += Duration(times[i], times[i+1]);
+ ++count;
+ }
+
+ void print() {
+ printf("\nLatency from %s (%lu samples, %d stages) :\n", names[0], count, maxIndex-1);
+ for (int i = 0; i < maxIndex-1; ++i)
+ printf("to %s:\t%luus\n", names[i+1], (totals[i]/count)/TIME_USEC);
+ }
+
+ static const int SIZE = 1024;
+ const void* object;
+ AbsTime times[SIZE];
+ unsigned long totals[SIZE];
+ unsigned long count;
+ const char* names[SIZE];
+ int index, maxIndex;
+
+ static LatencyTracker instance;
+};
+
+template <class T> struct LatencyEndOnExit {
+ const char* name;
+ const void* ptr;
+ LatencyEndOnExit(const char* n, const void* p) : name(n), ptr(p) {}
+ ~LatencyEndOnExit() { LatencyTracker<T>::end(name, ptr); }
+};
+
+template <class T> LatencyTracker<T> LatencyTracker<T>::instance;
+
+#if defined(QPID_LATENCY_TRACKER)
+#define LATENCY_START(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::start(NAME, PTR)
+#define LATENCY_STAGE(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::stage(NAME, PTR)
+#define LATENCY_END(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::end(NAME, PTR)
+#define LATENCY_END_ON_EXIT(TAG, NAME, PTR) ::qpid::sys::LatencyEndOnExit<TAG>(NAME, PTR)
+#else
+#define LATENCY_START(TAG, NAME, PTR) void(0)
+#define LATENCY_STAGE(TAG, NAME, PTR) void(0)
+#define LATENCY_END(TAG, NAME, PTR) void(0)
+#define LATENCY_END_ON_EXIT(TAG, NAME, PTR) void(0)
+#endif
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_LATENCYTRACKER_H*/