diff options
-rw-r--r-- | cpp/src/cluster.mk | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/sys/LatencyTracker.h | 157 |
7 files changed, 6 insertions, 201 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index d90a06e1e2..1a8812d169 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -82,8 +82,7 @@ cluster_la_SOURCES = \ qpid/cluster/PollerDispatch.h \ qpid/cluster/ProxyInputHandler.h \ qpid/cluster/Quorum.h \ - qpid/cluster/types.h \ - qpid/sys/LatencyTracker.h + qpid/cluster/types.h cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 60d580d6c7..d440068781 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -120,7 +120,6 @@ #include "qpid/management/ManagementAgent.h" #include "qpid/memory.h" #include "qpid/sys/Thread.h" -#include "qpid/sys/LatencyTracker.h" #include <boost/shared_ptr.hpp> #include <boost/bind.hpp> @@ -328,20 +327,14 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); - LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish()); deliverEvent(e); } -LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");) - LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");) - - void Cluster::deliverEvent(const Event& e) { - LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());) - deliverEventQueue.push(e); +void Cluster::deliverEvent(const Event& e) { + deliverEventQueue.push(e); } void Cluster::deliverFrame(const EventFrame& e) { - LATENCY_TRACK(frameQueueLatencyTracker.start(e.frame.getBody())); deliverFrameQueue.push(e); } @@ -354,7 +347,6 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { // Handler for deliverEventQueue. // This thread decodes frames from events. void Cluster::deliveredEvent(const Event& e) { - LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData())); if (e.isCluster()) { QPID_LOG(trace, *this << " DLVR: " << e); EventFrame ef(e, e.getFrame()); @@ -400,13 +392,9 @@ void Cluster::flagError( error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg); } -LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");) - // Handler for deliverFrameQueue. // This thread executes the main logic. - void Cluster::deliveredFrame(const EventFrame& efConst) { - LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody())); - LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody())); +void Cluster::deliveredFrame(const EventFrame& efConst) { Mutex::ScopedLock l(lock); if (state == LEFT) return; EventFrame e(efConst); @@ -438,7 +426,6 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) { throw Exception(QPID_MSG("Invalid cluster control")); } else if (state >= CATCHUP) { - LATENCY_TRACK(LatencyScope ls(processLatency)); map.incrementFrameSeq(); ConnectionPtr connection = getConnection(e, l); if (connection) { diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 30866d3154..4831e7eabe 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -38,9 +38,6 @@ const size_t EventHeader::HEADER_SIZE = sizeof(uint8_t) + // type sizeof(uint64_t) + // connection pointer only, CPG provides member ID. sizeof(uint32_t) // payload size -#ifdef QPID_LATENCY_METRIC - + sizeof(int64_t) // timestamp -#endif ; EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) @@ -61,9 +58,6 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { throw Exception("Invalid multicast event type"); connectionId = ConnectionId(m, buf.getLongLong()); size = buf.getLong(); -#ifdef QPID_LATENCY_METRIC - latency_metric_timestamp = buf.getLongLong(); -#endif } Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { @@ -97,9 +91,6 @@ void EventHeader::encode(Buffer& b) const { b.putOctet(type); b.putLongLong(connectionId.getNumber()); b.putLong(size); -#ifdef QPID_LATENCY_METRIC - b.putLongLong(latency_metric_timestamp); -#endif } // Encode my header in my buffer. diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 7e97963318..72fc1533f8 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -31,9 +31,6 @@ namespace cluster { Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) : -#if defined (QPID_LATENCY_TRACKER) - cpgLatency("CPG"), -#endif onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), holding(true) @@ -61,7 +58,6 @@ void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& void Multicaster::mcast(const Event& e) { { sys::Mutex::ScopedLock l(lock); - LATENCY_TRACK(cpgLatency.start()); if (e.isConnection() && holding) { holdingQueue.push_back(e); return; diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h index f2ee5099bb..c1a0ddffc6 100644 --- a/cpp/src/qpid/cluster/Multicaster.h +++ b/cpp/src/qpid/cluster/Multicaster.h @@ -26,7 +26,6 @@ #include "qpid/cluster/Event.h" #include "qpid/sys/PollableQueue.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/LatencyTracker.h" #include <boost/shared_ptr.hpp> #include <deque> @@ -58,8 +57,6 @@ class Multicaster /** End holding mode, held events are mcast */ void release(); - LATENCY_TRACK(sys::LatencyCounter cpgLatency;) - private: typedef sys::PollableQueue<Event> PollableEventQueue; typedef std::deque<Event> PlainEventQueue; diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index cb8f01386c..cb75fe5561 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -24,7 +24,6 @@ #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" -#include "qpid/sys/LatencyTracker.h" #include <boost/current_function.hpp> @@ -40,16 +39,9 @@ OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler : parent(p), closing(false), next(&h), sendMax(1), sent(0), sentDoOutput(false) {} -#if defined QPID_LATENCY_TRACKER -extern sys::LatencyTracker<const AMQBody*> doOutputTracker; -#endif - void OutputInterceptor::send(framing::AMQFrame& f) { - LATENCY_TRACK(doOutputTracker.finish(f.getBody())); - { - sys::Mutex::ScopedLock l(lock); - next->send(f); - } + sys::Mutex::ScopedLock l(lock); + next->send(f); } void OutputInterceptor::activateOutput() { diff --git a/cpp/src/qpid/sys/LatencyTracker.h b/cpp/src/qpid/sys/LatencyTracker.h deleted file mode 100644 index 3294528ff6..0000000000 --- a/cpp/src/qpid/sys/LatencyTracker.h +++ /dev/null @@ -1,157 +0,0 @@ -#ifndef QPID_SYS_LATENCYTRACKER_H -#define QPID_SYS_LATENCYTRACKER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Time.h" -#include <string> -#include <limits> -#include <map> - -namespace qpid { -namespace sys { - -/**@file Tools for measuring latency. NOT SUITABLE FOR PROUDCTION BUILDS. - * Uses should be compiled only if QPID_LATENCY_TRACKER is defined. - * See the convenience macros at the end of this file. - */ - -/** Used by LatencyCounter and LatencyTracker below */ -class LatencyStatistic { - public: - LatencyStatistic(std::string name_) : name(name_), count(0), total(0), min(std::numeric_limits<int64_t>::max()), max(0) {} - ~LatencyStatistic() { print(); } - - void record(Duration d) { - total += d; - ++count; - if (d > max) max=d; - if (d < min) min=d; - } - - void print() { - if (count) { - double meanMsec = (double(total)/count)/TIME_MSEC; - printf("\n==== Latency metric %s: samples=%lu mean=%fms (%f-%f)\n", name.c_str(), count, meanMsec, double(min)/TIME_MSEC, double(max)/TIME_MSEC); - } - else - printf("\n==== Latency metric %s: no samples.\n", name.c_str()); - } - - private: - std::string name; - unsigned long count; - int64_t total, min, max; -}; - -/** Measure delay between seeing the same value at start and finish. */ -template <class T> class LatencyTracker { - public: - LatencyTracker(std::string name) : measuring(false), stat(name) {} - - void start(T value) { - sys::Mutex::ScopedLock l(lock); - if (!measuring) { - measureAt = value; - measuring = true; - startTime = AbsTime::now(); - } - } - - void finish(T value) { - sys::Mutex::ScopedLock l(lock); - if(measuring && measureAt == value) { - stat.record(Duration(startTime, AbsTime::now())); - measuring = false; - } - } - - private: - sys::Mutex lock; - bool measuring; - T measureAt; - AbsTime startTime; - LatencyStatistic stat; -}; - - -/** Measures delay between the nth call to start and the nth call to finish. - * E.g. to measure latency between sending & receiving an ordered stream of messages. - */ -class LatencyCounter { - public: - LatencyCounter(std::string name) : measuring(false), startCount(0), finishCount(0), stat(name) {} - - void start() { - sys::Mutex::ScopedLock l(lock); - if (!measuring) { - measureAt = startCount; - measuring = true; - startTime = AbsTime::now(); - } - ++startCount; - } - - void finish() { - sys::Mutex::ScopedLock l(lock); - if (measuring && measureAt == finishCount) { - stat.record(Duration(startTime, AbsTime::now())); - measuring = false; - } - ++finishCount; - } - - private: - sys::Mutex lock; - bool measuring; - uint64_t startCount, finishCount, measureAt; - AbsTime startTime; - LatencyStatistic stat; -}; - -/** Measures time spent in a scope. */ -class LatencyScope { - public: - LatencyScope(LatencyStatistic& s) : stat(s), startTime(AbsTime::now()) {} - - ~LatencyScope() { - sys::Mutex::ScopedLock l(lock); - stat.record(Duration(startTime, AbsTime::now())); - } - - private: - sys::Mutex lock; - LatencyStatistic& stat; - AbsTime startTime; -}; - - -/** Macros to wrap latency tracking so disabled unless QPID_LATENCY_TRACKER is defined */ - -#if defined(QPID_LATENCY_TRACKER) -#define LATENCY_TRACK(X) X -#else -#define LATENCY_TRACK(X) -#endif -}} // namespace qpid::sys - -#endif /*!QPID_SYS_LATENCYTRACKER_H*/ |