diff options
author | Alan Conway <aconway@apache.org> | 2009-05-19 21:18:52 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-05-19 21:18:52 +0000 |
commit | 285ca60cf814ce4b96813e929ced910d53097aef (patch) | |
tree | 347254ab8c4b2c27d6a095ccdac7ed444ed993b0 /cpp | |
parent | fe0a36ba0edb47757a7bc7331764631ebd20205e (diff) | |
download | qpid-python-285ca60cf814ce4b96813e929ced910d53097aef.tar.gz |
Instrumentation for measuring latencies.
Compiled out of normal builds, enable with -DQPID_LATENCY_TRACKER.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@776463 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/LatencyTracker.h | 159 |
5 files changed, 132 insertions, 59 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 5f51bb9dad..58ac4b91b3 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -109,6 +109,7 @@ #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" #include "qpid/framing/ClusterUpdateRequestBody.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" #include "qpid/management/IdAllocator.h" @@ -294,23 +295,27 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); + LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish()); deliverEvent(e); } +LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");) +LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");) + void Cluster::deliverEvent(const Event& e) { - LATENCY_START(Event, "enqueue event", e.getData()); + LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());) deliverEventQueue.push(e); } void Cluster::deliverFrame(const EventFrame& e) { - LATENCY_START(EventFrame, "enqueue frame", e.frame.getBody()); + LATENCY_TRACK(frameQueueLatencyTracker.start(e.frame.getBody())); deliverFrameQueue.push(e); } // Handler for deliverEventQueue. // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { - LATENCY_STAGE(Event, "dequeue event", e.getData()); + LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData())); QPID_LOG(trace, *this << " DLVR: " << e); if (e.isCluster()) { EventFrame ef(e, e.getFrame()); @@ -329,7 +334,6 @@ void Cluster::deliveredEvent(const Event& e) { } else // Discard connection events if discarding is set. QPID_LOG(trace, *this << " DROP: " << e); - LATENCY_END(Event, "processed event", e.getData()); } void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { @@ -337,18 +341,22 @@ void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { error.error(connection, type, map.getFrameSeq(), map.getMembers()); } +LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");) + // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& e) { - LATENCY_STAGE(EventFrame, "dequeued frame", e.frame.getBody()); + LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody())); + LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody())); Mutex::ScopedLock l(lock); // Process each frame through the error checker. error.delivered(e); while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); - LATENCY_END(EventFrame, "processed frame", e.frame.getBody()); } +LATENCY_TRACK(sys::LatencyStatistic processLatency("Process");) + void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { QPID_LOG(trace, *this << " DLVR: " << e); @@ -357,6 +365,7 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) { throw Exception(QPID_MSG("Invalid cluster control")); } else if (state >= CATCHUP) { + LATENCY_TRACK(LatencyScope ls(processLatency)); map.incrementFrameSeq(); QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); ConnectionPtr connection = getConnection(e.connectionId, l); diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 3b9d3ac990..f72867de4d 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -31,6 +31,9 @@ namespace cluster { Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) : +#if defined (QPID_LATENCY_TRACKER) + cpgLatency("CPG"), +#endif onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), holding(true) @@ -58,6 +61,7 @@ void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& void Multicaster::mcast(const Event& e) { { sys::Mutex::ScopedLock l(lock); + LATENCY_TRACK(cpgLatency.start()); if (e.getType() == DATA && e.isConnection() && holding) { holdingQueue.push_back(e); return; diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h index baa5b87f38..e1014fa499 100644 --- a/cpp/src/qpid/cluster/Multicaster.h +++ b/cpp/src/qpid/cluster/Multicaster.h @@ -26,6 +26,7 @@ #include "Event.h" #include "qpid/sys/PollableQueue.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/LatencyTracker.h" #include <boost/shared_ptr.hpp> namespace qpid { @@ -56,6 +57,8 @@ class Multicaster /** End holding mode, held events are mcast */ void release(); + LATENCY_TRACK(sys::LatencyCounter cpgLatency;) + private: typedef sys::PollableQueue<Event> PollableEventQueue; typedef std::deque<Event> PlainEventQueue; diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index 9062edc846..a7ec82128b 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -24,6 +24,7 @@ #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" +#include "qpid/sys/LatencyTracker.h" #include <boost/current_function.hpp> @@ -41,7 +42,10 @@ OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler moreOutput(), doingOutput() {} +LATENCY_TRACK(extern sys::LatencyTracker<const AMQBody*> doOutputTracker;) + void OutputInterceptor::send(framing::AMQFrame& f) { + LATENCY_TRACK(doOutputTracker.finish(f.getBody())); parent.getCluster().checkQuorum(); { // FIXME aconway 2009-04-28: locking around next-> may be redundant diff --git a/cpp/src/qpid/sys/LatencyTracker.h b/cpp/src/qpid/sys/LatencyTracker.h index 3e36525b90..fe423ec228 100644 --- a/cpp/src/qpid/sys/LatencyTracker.h +++ b/cpp/src/qpid/sys/LatencyTracker.h @@ -23,82 +23,135 @@ */ #include "Time.h" +#include <string> +#include <limits> +#include <map> namespace qpid { namespace sys { -/** - * Record latency between events in the lifecycle of an object. - * For testing/debugging purposes: use the macros to declare - * and #define QPID_LATENCY_TRACKER to enable in a build. +/**@file Tools for measuring latency. NOT SUITABLE FOR PROUDCTION BUILDS. + * Uses should be compiled only if QPID_LATENCY_TRACKER is defined. + * See the convenience macros at the end of this file. */ -template <class T> class LatencyTracker -{ - public: - static void start(const char* name, const void* p) { instance.doStart(name, p); } - static void stage(const char* name, const void* p) { instance.doStage(name, p); } - static void end(const char* name, const void* p) { instance.doEnd(name, p); } - private: - - LatencyTracker() : object(), times(), totals(), count(), names(), index(), maxIndex() { } - ~LatencyTracker() { print(); } - - void doStart(const char* n, const void* p) { if (!object) { name(n); object=p; times[0] = now(); index = 1; } } - void doStage(const char* n, const void* p) { if (p == object) { name(n); times[index++] = now(); } } - void doEnd(const char* n, const void* p) { if (p == object) { name(n); times[index++] = now(); record(); object = 0; } } +/** Used by LatencyCounter and LatencyTracker below */ +class LatencyStatistic { + public: + LatencyStatistic(std::string name_) : name(name_), count(0), total(0), min(std::numeric_limits<int64_t>::max()), max(0) {} + ~LatencyStatistic() { print(); } - void name(const char* n) { - if (names[index] == 0) names[index] = n; - assert(names[index] == n); - } - - void record() { - if (maxIndex == 0) maxIndex = index; - assert(maxIndex == index); - for (int i = 0; i < index-1; ++i) - totals[i] += Duration(times[i], times[i+1]); + void record(Duration d) { + total += d; ++count; + if (d > max) max=d; + if (d < min) min=d; } void print() { - printf("\nLatency from %s (%lu samples, %d stages) :\n", names[0], count, maxIndex-1); - for (int i = 0; i < maxIndex-1; ++i) - printf("to %s:\t%luus\n", names[i+1], (totals[i]/count)/TIME_USEC); + if (count) { + double meanMsec = (double(total)/count)/TIME_MSEC; + printf("\n==== Latency metric %s: samples=%lu mean=%fms (%f-%f)\n", name.c_str(), count, meanMsec, double(min)/TIME_MSEC, double(max)/TIME_MSEC); + } + else + printf("\n==== Latency metric %s: no samples.\n", name.c_str()); } - static const int SIZE = 1024; - const void* object; - AbsTime times[SIZE]; - unsigned long totals[SIZE]; + private: + std::string name; unsigned long count; - const char* names[SIZE]; - int index, maxIndex; + int64_t total, min, max; +}; + +/** Measure delay between seeing the same value at start and finish. */ +template <class T> class LatencyTracker { + public: + LatencyTracker(std::string name) : measuring(false), stat(name) {} + + void start(T value) { + sys::Mutex::ScopedLock l(lock); + if (!measuring) { + measureAt = value; + measuring = true; + startTime = AbsTime::now(); + } + } + + void finish(T value) { + sys::Mutex::ScopedLock l(lock); + if(measuring && measureAt == value) { + stat.record(Duration(startTime, AbsTime::now())); + measuring = false; + } + } - static LatencyTracker instance; + private: + sys::Mutex lock; + bool measuring; + T measureAt; + AbsTime startTime; + LatencyStatistic stat; }; -template <class T> struct LatencyEndOnExit { - const char* name; - const void* ptr; - LatencyEndOnExit(const char* n, const void* p) : name(n), ptr(p) {} - ~LatencyEndOnExit() { LatencyTracker<T>::end(name, ptr); } + +/** Measures delay between the nth call to start and the nth call to finish. + * E.g. to measure latency between sending & receiving an ordered stream of messages. + */ +class LatencyCounter { + public: + LatencyCounter(std::string name) : measuring(false), startCount(0), finishCount(0), stat(name) {} + + void start() { + sys::Mutex::ScopedLock l(lock); + if (!measuring) { + measureAt = startCount; + measuring = true; + startTime = AbsTime::now(); + } + ++startCount; + } + + void finish() { + sys::Mutex::ScopedLock l(lock); + if (measuring && measureAt == finishCount) { + stat.record(Duration(startTime, AbsTime::now())); + measuring = false; + } + ++finishCount; + } + + private: + sys::Mutex lock; + bool measuring; + uint64_t startCount, finishCount, measureAt; + AbsTime startTime; + LatencyStatistic stat; +}; + +/** Measures time spent in a scope. */ +class LatencyScope { + public: + LatencyScope(LatencyStatistic& s) : stat(s), startTime(AbsTime::now()) {} + + ~LatencyScope() { + sys::Mutex::ScopedLock l(lock); + stat.record(Duration(startTime, AbsTime::now())); + } + + private: + sys::Mutex lock; + LatencyStatistic& stat; + AbsTime startTime; }; -template <class T> LatencyTracker<T> LatencyTracker<T>::instance; + +/** Macros to wrap latency tracking so disabled unless QPID_LATENCY_TRACKER is defined */ #if defined(QPID_LATENCY_TRACKER) -#define LATENCY_START(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::start(NAME, PTR) -#define LATENCY_STAGE(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::stage(NAME, PTR) -#define LATENCY_END(TAG, NAME, PTR) ::qpid::sys::LatencyTracker<TAG>::end(NAME, PTR) -#define LATENCY_END_ON_EXIT(TAG, NAME, PTR) ::qpid::sys::LatencyEndOnExit<TAG>(NAME, PTR) +#define LATENCY_TRACK(X) X #else -#define LATENCY_START(TAG, NAME, PTR) void(0) -#define LATENCY_STAGE(TAG, NAME, PTR) void(0) -#define LATENCY_END(TAG, NAME, PTR) void(0) -#define LATENCY_END_ON_EXIT(TAG, NAME, PTR) void(0) +#define LATENCY_TRACK(X) #endif - }} // namespace qpid::sys #endif /*!QPID_SYS_LATENCYTRACKER_H*/ |