summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/cluster.mk3
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp19
-rw-r--r--cpp/src/qpid/cluster/Event.cpp9
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp4
-rw-r--r--cpp/src/qpid/cluster/Multicaster.h3
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp12
-rw-r--r--cpp/src/qpid/sys/LatencyTracker.h157
7 files changed, 6 insertions, 201 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index d90a06e1e2..1a8812d169 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -82,8 +82,7 @@ cluster_la_SOURCES = \
qpid/cluster/PollerDispatch.h \
qpid/cluster/ProxyInputHandler.h \
qpid/cluster/Quorum.h \
- qpid/cluster/types.h \
- qpid/sys/LatencyTracker.h
+ qpid/cluster/types.h
cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la
cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 60d580d6c7..d440068781 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -120,7 +120,6 @@
#include "qpid/management/ManagementAgent.h"
#include "qpid/memory.h"
#include "qpid/sys/Thread.h"
-#include "qpid/sys/LatencyTracker.h"
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
@@ -328,20 +327,14 @@ void Cluster::deliver(
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
- LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish());
deliverEvent(e);
}
-LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");)
- LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> frameQueueLatencyTracker("FrameQueue");)
-
- void Cluster::deliverEvent(const Event& e) {
- LATENCY_TRACK(eventQueueLatencyTracker.start(e.getData());)
- deliverEventQueue.push(e);
+void Cluster::deliverEvent(const Event& e) {
+ deliverEventQueue.push(e);
}
void Cluster::deliverFrame(const EventFrame& e) {
- LATENCY_TRACK(frameQueueLatencyTracker.start(e.frame.getBody()));
deliverFrameQueue.push(e);
}
@@ -354,7 +347,6 @@ const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) {
// Handler for deliverEventQueue.
// This thread decodes frames from events.
void Cluster::deliveredEvent(const Event& e) {
- LATENCY_TRACK(eventQueueLatencyTracker.finish(e.getData()));
if (e.isCluster()) {
QPID_LOG(trace, *this << " DLVR: " << e);
EventFrame ef(e, e.getFrame());
@@ -400,13 +392,9 @@ void Cluster::flagError(
error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
}
-LATENCY_TRACK(sys::LatencyTracker<const AMQBody*> doOutputTracker("DoOutput");)
-
// Handler for deliverFrameQueue.
// This thread executes the main logic.
- void Cluster::deliveredFrame(const EventFrame& efConst) {
- LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
- LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
+void Cluster::deliveredFrame(const EventFrame& efConst) {
Mutex::ScopedLock l(lock);
if (state == LEFT) return;
EventFrame e(efConst);
@@ -438,7 +426,6 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) {
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) {
- LATENCY_TRACK(LatencyScope ls(processLatency));
map.incrementFrameSeq();
ConnectionPtr connection = getConnection(e, l);
if (connection) {
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index 30866d3154..4831e7eabe 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -38,9 +38,6 @@ const size_t EventHeader::HEADER_SIZE =
sizeof(uint8_t) + // type
sizeof(uint64_t) + // connection pointer only, CPG provides member ID.
sizeof(uint32_t) // payload size
-#ifdef QPID_LATENCY_METRIC
- + sizeof(int64_t) // timestamp
-#endif
;
EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s)
@@ -61,9 +58,6 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) {
throw Exception("Invalid multicast event type");
connectionId = ConnectionId(m, buf.getLongLong());
size = buf.getLong();
-#ifdef QPID_LATENCY_METRIC
- latency_metric_timestamp = buf.getLongLong();
-#endif
}
Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) {
@@ -97,9 +91,6 @@ void EventHeader::encode(Buffer& b) const {
b.putOctet(type);
b.putLongLong(connectionId.getNumber());
b.putLong(size);
-#ifdef QPID_LATENCY_METRIC
- b.putLongLong(latency_metric_timestamp);
-#endif
}
// Encode my header in my buffer.
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index 7e97963318..72fc1533f8 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -31,9 +31,6 @@ namespace cluster {
Multicaster::Multicaster(Cpg& cpg_,
const boost::shared_ptr<sys::Poller>& poller,
boost::function<void()> onError_) :
-#if defined (QPID_LATENCY_TRACKER)
- cpgLatency("CPG"),
-#endif
onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
holding(true)
@@ -61,7 +58,6 @@ void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId&
void Multicaster::mcast(const Event& e) {
{
sys::Mutex::ScopedLock l(lock);
- LATENCY_TRACK(cpgLatency.start());
if (e.isConnection() && holding) {
holdingQueue.push_back(e);
return;
diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h
index f2ee5099bb..c1a0ddffc6 100644
--- a/cpp/src/qpid/cluster/Multicaster.h
+++ b/cpp/src/qpid/cluster/Multicaster.h
@@ -26,7 +26,6 @@
#include "qpid/cluster/Event.h"
#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/LatencyTracker.h"
#include <boost/shared_ptr.hpp>
#include <deque>
@@ -58,8 +57,6 @@ class Multicaster
/** End holding mode, held events are mcast */
void release();
- LATENCY_TRACK(sys::LatencyCounter cpgLatency;)
-
private:
typedef sys::PollableQueue<Event> PollableEventQueue;
typedef std::deque<Event> PlainEventQueue;
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index cb8f01386c..cb75fe5561 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -24,7 +24,6 @@
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/LatencyTracker.h"
#include <boost/current_function.hpp>
@@ -40,16 +39,9 @@ OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler
: parent(p), closing(false), next(&h), sendMax(1), sent(0), sentDoOutput(false)
{}
-#if defined QPID_LATENCY_TRACKER
-extern sys::LatencyTracker<const AMQBody*> doOutputTracker;
-#endif
-
void OutputInterceptor::send(framing::AMQFrame& f) {
- LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
- {
- sys::Mutex::ScopedLock l(lock);
- next->send(f);
- }
+ sys::Mutex::ScopedLock l(lock);
+ next->send(f);
}
void OutputInterceptor::activateOutput() {
diff --git a/cpp/src/qpid/sys/LatencyTracker.h b/cpp/src/qpid/sys/LatencyTracker.h
deleted file mode 100644
index 3294528ff6..0000000000
--- a/cpp/src/qpid/sys/LatencyTracker.h
+++ /dev/null
@@ -1,157 +0,0 @@
-#ifndef QPID_SYS_LATENCYTRACKER_H
-#define QPID_SYS_LATENCYTRACKER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/Time.h"
-#include <string>
-#include <limits>
-#include <map>
-
-namespace qpid {
-namespace sys {
-
-/**@file Tools for measuring latency. NOT SUITABLE FOR PROUDCTION BUILDS.
- * Uses should be compiled only if QPID_LATENCY_TRACKER is defined.
- * See the convenience macros at the end of this file.
- */
-
-/** Used by LatencyCounter and LatencyTracker below */
-class LatencyStatistic {
- public:
- LatencyStatistic(std::string name_) : name(name_), count(0), total(0), min(std::numeric_limits<int64_t>::max()), max(0) {}
- ~LatencyStatistic() { print(); }
-
- void record(Duration d) {
- total += d;
- ++count;
- if (d > max) max=d;
- if (d < min) min=d;
- }
-
- void print() {
- if (count) {
- double meanMsec = (double(total)/count)/TIME_MSEC;
- printf("\n==== Latency metric %s: samples=%lu mean=%fms (%f-%f)\n", name.c_str(), count, meanMsec, double(min)/TIME_MSEC, double(max)/TIME_MSEC);
- }
- else
- printf("\n==== Latency metric %s: no samples.\n", name.c_str());
- }
-
- private:
- std::string name;
- unsigned long count;
- int64_t total, min, max;
-};
-
-/** Measure delay between seeing the same value at start and finish. */
-template <class T> class LatencyTracker {
- public:
- LatencyTracker(std::string name) : measuring(false), stat(name) {}
-
- void start(T value) {
- sys::Mutex::ScopedLock l(lock);
- if (!measuring) {
- measureAt = value;
- measuring = true;
- startTime = AbsTime::now();
- }
- }
-
- void finish(T value) {
- sys::Mutex::ScopedLock l(lock);
- if(measuring && measureAt == value) {
- stat.record(Duration(startTime, AbsTime::now()));
- measuring = false;
- }
- }
-
- private:
- sys::Mutex lock;
- bool measuring;
- T measureAt;
- AbsTime startTime;
- LatencyStatistic stat;
-};
-
-
-/** Measures delay between the nth call to start and the nth call to finish.
- * E.g. to measure latency between sending & receiving an ordered stream of messages.
- */
-class LatencyCounter {
- public:
- LatencyCounter(std::string name) : measuring(false), startCount(0), finishCount(0), stat(name) {}
-
- void start() {
- sys::Mutex::ScopedLock l(lock);
- if (!measuring) {
- measureAt = startCount;
- measuring = true;
- startTime = AbsTime::now();
- }
- ++startCount;
- }
-
- void finish() {
- sys::Mutex::ScopedLock l(lock);
- if (measuring && measureAt == finishCount) {
- stat.record(Duration(startTime, AbsTime::now()));
- measuring = false;
- }
- ++finishCount;
- }
-
- private:
- sys::Mutex lock;
- bool measuring;
- uint64_t startCount, finishCount, measureAt;
- AbsTime startTime;
- LatencyStatistic stat;
-};
-
-/** Measures time spent in a scope. */
-class LatencyScope {
- public:
- LatencyScope(LatencyStatistic& s) : stat(s), startTime(AbsTime::now()) {}
-
- ~LatencyScope() {
- sys::Mutex::ScopedLock l(lock);
- stat.record(Duration(startTime, AbsTime::now()));
- }
-
- private:
- sys::Mutex lock;
- LatencyStatistic& stat;
- AbsTime startTime;
-};
-
-
-/** Macros to wrap latency tracking so disabled unless QPID_LATENCY_TRACKER is defined */
-
-#if defined(QPID_LATENCY_TRACKER)
-#define LATENCY_TRACK(X) X
-#else
-#define LATENCY_TRACK(X)
-#endif
-}} // namespace qpid::sys
-
-#endif /*!QPID_SYS_LATENCYTRACKER_H*/