diff options
author | Alan Conway <aconway@apache.org> | 2009-05-04 18:38:18 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-05-04 18:38:18 +0000 |
commit | 7536c5e15901106f0e33a949e9945713a4b5a1ff (patch) | |
tree | de31698a8f812b3c2857fb862a974fb314e26df2 /qpid/cpp | |
parent | 1187f8b963d7a2cb6db0bcfe3855e85f8c4ff641 (diff) | |
download | qpid-python-7536c5e15901106f0e33a949e9945713a4b5a1ff.tar.gz |
LatenchTracker: a tool for measuring latencies.
Added measurement points to cluster code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@771392 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/cluster.mk | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/LatencyTracker.h | 104 |
3 files changed, 120 insertions, 8 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index b7ee61e180..965c335306 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -78,7 +78,8 @@ cluster_la_SOURCES = \ qpid/cluster/PollerDispatch.h \ qpid/cluster/ProxyInputHandler.h \ qpid/cluster/Quorum.h \ - qpid/cluster/types.h + qpid/cluster/types.h \ + qpid/sys/LatencyTracker.h cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la cluster_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 221f12990e..09e053dc28 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -48,6 +48,7 @@ #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/LatencyTracker.h" #include <boost/bind.hpp> #include <boost/cast.hpp> @@ -196,8 +197,8 @@ void Cluster::leave() { leave(l); } -#define LEAVE_TRY(STMT) try { STMT; } \ - catch (const std::exception& e) { \ +#define LEAVE_TRY(STMT) try { STMT; } \ + catch (const std::exception& e) { \ QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ } do {} while(0) @@ -228,17 +229,20 @@ void Cluster::deliver( } void Cluster::deliverEvent(const Event& e) { + LATENCY_START(Event, "enqueue event", e.getData()); deliverEventQueue.push(e); } void Cluster::deliverFrame(const EventFrame& e) { + LATENCY_START(EventFrame, "enqueue frame", e.frame.getBody()); deliverFrameQueue.push(e); } // Handler for deliverEventQueue. // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { - QPID_LOG(trace, *this << " DLVR: " << e); + LATENCY_STAGE(Event, "dequeue event", e.getData()); + QPID_LOG(trace, *this << " DLVR: " << e); if (e.isCluster()) { EventFrame ef(e, e.getFrame()); // Stop the deliverEventQueue on update offers. @@ -253,9 +257,10 @@ void Cluster::deliveredEvent(const Event& e) { deliverFrame(EventFrame(e, e.getFrame())); else decoder.decode(e, e.getData()); -} + } else // Discard connection events if discarding is set. QPID_LOG(trace, *this << " DROP: " << e); + LATENCY_END(Event, "processed event", e.getData()); } void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { @@ -266,11 +271,13 @@ void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& e) { + LATENCY_STAGE(EventFrame, "dequeued frame", e.frame.getBody()); Mutex::ScopedLock l(lock); // Process each frame through the error checker. error.delivered(e); while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); + LATENCY_END(EventFrame, "processed frame", e.frame.getBody()); } void Cluster::processFrame(const EventFrame& e, Lock& l) { @@ -543,7 +550,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s Lock l(lock); QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]"); switch (methodId) { - case _qmf::Cluster::METHOD_STOPCLUSTERNODE : + case _qmf::Cluster::METHOD_STOPCLUSTERNODE : { _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; stringstream stream; @@ -552,10 +559,10 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s stopClusterNode(l); } break; - case _qmf::Cluster::METHOD_STOPFULLCLUSTER : + case _qmf::Cluster::METHOD_STOPFULLCLUSTER : stopFullCluster(l); break; - default: + default: return Manageable::STATUS_UNKNOWN_METHOD; } return Manageable::STATUS_OK; diff --git a/qpid/cpp/src/qpid/sys/LatencyTracker.h b/qpid/cpp/src/qpid/sys/LatencyTracker.h new file mode 100644 index 0000000000..3e36525b90 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/LatencyTracker.h @@ -0,0 +1,104 @@ +#ifndef QPID_SYS_LATENCYTRACKER_H +#define QPID_SYS_LATENCYTRACKER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Time.h" + +namespace qpid { +namespace sys { + +/** + * Record latency between events in the lifecycle of an object. + * For testing/debugging purposes: use the macros to declare + * and #define QPID_LATENCY_TRACKER to enable in a build. + */ +template <class T> class LatencyTracker +{ + public: + static void start(const char* name, const void* p) { instance.doStart(name, p); } + static void stage(const char* name, const void* p) { instance.doStage(name, p); } + static void end(const char* name, const void* p) { instance.doEnd(name, p); } + + private: + + LatencyTracker() : object(), times(), totals(), count(), names(), index(), maxIndex() { } + ~LatencyTracker() { print(); } + + void doStart(const char* n, const void* p) { if (!object) { name(n); object=p; times[0] = now(); index = 1; } } + void doStage(const char* n, const void* p) { if (p == object) { name(n); times[index++] = now(); } } + void doEnd(const char* n, const void* p) { if (p == object) { name(n); times[index++] = now(); record(); object = 0; } } + + void name(const char* n) { + if (names[index] == 0) names[index] = n; + assert(names[index] == n); + } + + void record() { + if (maxIndex == 0) maxIndex = index; + assert(maxIndex == index); + for (int i = 0; i < index-1; ++i) + totals[i] += Duration(times[i], times[i+1]); + ++count; + } + + void print() { + printf("\nLatency from %s (%lu samples, %d stages) :\n", names[0], count, maxIndex-1); + for (int i = 0; i < maxIndex-1; ++i) + printf("to %s:\t%luus\n", names[i+1], (totals[i]/count)/TIME_USEC); + } + + static const int SIZE = 1024; + const void* object; + AbsTime times[SIZE]; + unsigned long totals[SIZE]; + unsigned long count; + const char* names[SIZE]; + int index, maxIndex; + + static LatencyTracker instance; +}; + +template <class T> struct LatencyEndOnExit { + const char* name; + const void* ptr; + LatencyEndOnExit(const char* n, const void* p) : name(n), ptr(p) {} + ~LatencyEndOnExit() { LatencyTracker<T>::end(name, ptr); } +}; + +template <class T> LatencyTracker<T> LatencyTracker<T>::instance; + +#if defined(QPID_LATENCY_TRACKER) +#define LATENCY_START(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::start(NAME, PTR) +#define LATENCY_STAGE(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::stage(NAME, PTR) +#define LATENCY_END(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::end(NAME, PTR) +#define LATENCY_END_ON_EXIT(TAG, NAME, PTR) ::qpid::sys::LatencyEndOnExit<TAG>(NAME, PTR) +#else +#define LATENCY_START(TAG, NAME, PTR) void(0) +#define LATENCY_STAGE(TAG, NAME, PTR) void(0) +#define LATENCY_END(TAG, NAME, PTR) void(0) +#define LATENCY_END_ON_EXIT(TAG, NAME, PTR) void(0) +#endif + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_LATENCYTRACKER_H*/ |