summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-01-20 22:11:37 +0000
committerAlan Conway <aconway@apache.org>2009-01-20 22:11:37 +0000
commit066fd1ab9f1840cfe09204bc5f3d550f1e12d49b (patch)
tree51c9961e79c811c3240710bd7b7435a73e11c2b7 /cpp/src/qpid
parent861692abf515cf136e86e70446d005e7849a0f87 (diff)
downloadqpid-python-066fd1ab9f1840cfe09204bc5f3d550f1e12d49b.tar.gz
Latency measurements, compiled out of production code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736135 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp4
-rw-r--r--cpp/src/qpid/cluster/Event.cpp33
-rw-r--r--cpp/src/qpid/cluster/Event.h9
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp10
-rw-r--r--cpp/src/qpid/cluster/Multicaster.h1
-rw-r--r--cpp/src/qpid/sys/LatencyMetric.cpp71
-rw-r--r--cpp/src/qpid/sys/LatencyMetric.h80
7 files changed, 192 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 18b4a2e69c..0b6d56c259 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -38,6 +38,7 @@
#include "qpid/log/Statement.h"
#include "qpid/log/Helpers.h"
#include "qpid/sys/Thread.h"
+#include "qpid/sys/LatencyMetric.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
#include "qmf/org/apache/qpid/cluster/Package.h"
@@ -182,7 +183,7 @@ void Cluster::deliver(
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
- if (from == myId) // Record self-deliveries for flow control.
+ if (from == myId) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
deliver(e, l);
}
@@ -206,6 +207,7 @@ void Cluster::delivered(PollableEventQueue::Queue& events) {
}
void Cluster::deliveredEvent(const EventHeader& e, const char* data) {
+ QPID_LATENCY_RECORD("deliver queue", e);
Buffer buf(const_cast<char*>(data), e.getSize());
AMQFrame frame;
if (e.isCluster()) {
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index 8d4429a8ed..59a7241715 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -35,16 +35,21 @@ using framing::Buffer;
const size_t EventHeader::HEADER_SIZE =
sizeof(uint8_t) + // type
sizeof(uint64_t) + // connection pointer only, CPG provides member ID.
- sizeof(uint32_t); // payload size
+ sizeof(uint32_t) // payload size
+#ifdef QPID_LATENCY_METRIC
+ + sizeof(int64_t) // timestamp
+#endif
+ ;
EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s)
: type(t), connectionId(c), size(s) {}
+
+Event::Event() {}
+
Event::Event(EventType t, const ConnectionId& c, size_t s)
: EventHeader(t,c,s), store(RefCountedBuffer::create(s+HEADER_SIZE))
-{
- encodeHeader();
-}
+{}
void EventHeader::decode(const MemberId& m, framing::Buffer& buf) {
if (buf.available() <= HEADER_SIZE)
@@ -54,14 +59,17 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) {
throw ClusterLeaveException("Invalid multicast event type");
connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong()));
size = buf.getLong();
+#ifdef QPID_LATENCY_METRIC
+ latency_metric_timestamp = buf.getLongLong();
+#endif
}
Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) {
- EventHeader h;
- h.decode(m, buf); // Header
- Event e(h.getType(), h.getConnectionId(), h.getSize());
+ Event e;
+ e.decode(m, buf); // Header
if (buf.available() < e.size)
throw ClusterLeaveException("Not enough data for multicast event");
+ e.store = RefCountedBuffer::create(e.size + HEADER_SIZE);
memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size);
return e;
}
@@ -73,11 +81,20 @@ Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
f.encode(buf);
return e;
}
-
+
+iovec Event::toIovec() {
+ encodeHeader();
+ iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
+ return iov;
+}
+
void EventHeader::encode(Buffer& b) const {
b.putOctet(type);
b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
b.putLong(size);
+#ifdef QPID_LATENCY_METRIC
+ b.putLongLong(latency_metric_timestamp);
+#endif
}
// Encode my header in my buffer.
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index 32e8f5e07b..110ec524c7 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -27,6 +27,8 @@
#include "Connection.h"
#include "qpid/RefCountedBuffer.h"
#include "qpid/framing/Buffer.h"
+#include "qpid/sys/LatencyMetric.h"
+#include <sys/uio.h> // For iovec
#include <iosfwd>
namespace qpid {
@@ -37,7 +39,7 @@ namespace cluster {
//
/** Header data for a multicast event */
-class EventHeader {
+class EventHeader : public ::qpid::sys::LatencyMetricTimestamp {
public:
EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0);
void decode(const MemberId& m, framing::Buffer&);
@@ -65,8 +67,9 @@ class EventHeader {
*/
class Event : public EventHeader {
public:
+ Event();
/** Create an event with a buffer that can hold size bytes plus an event header. */
- Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0);
+ Event(EventType t, const ConnectionId& c, size_t);
/** Create an event copied from delivered data. */
static Event decodeCopy(const MemberId& m, framing::Buffer&);
@@ -85,6 +88,8 @@ class Event : public EventHeader {
operator framing::Buffer() const;
+ iovec toIovec();
+
private:
void encodeHeader();
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index 34614dc1ef..4fa12651eb 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -23,7 +23,7 @@
#include "Cpg.h"
#include "ClusterLeaveException.h"
#include "qpid/log/Statement.h"
-
+#include "qpid/sys/LatencyMetric.h"
namespace qpid {
namespace cluster {
@@ -59,8 +59,8 @@ void Multicaster::mcast(const Event& e) {
return;
}
}
+ QPID_LATENCY_INIT(e);
queue.push(e);
-
}
@@ -76,7 +76,8 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
}
++pending;
}
- iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() };
+ QPID_LATENCY_RECORD("mcast send queue", *i);
+ iovec iov = i->toIovec();
if (!cpg.mcast(&iov, 1)) {
// cpg didn't send because of CPG flow control.
if (mcastMax) {
@@ -104,8 +105,9 @@ void Multicaster::release() {
holdingQueue.clear();
}
-void Multicaster::selfDeliver(const Event&) {
+void Multicaster::selfDeliver(const Event& e) {
sys::Mutex::ScopedLock l(lock);
+ QPID_LATENCY_RECORD("cpg self deliver", e);
if (mcastMax) {
assert(pending > 0);
assert(pending <= mcastMax);
diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h
index 8014cd8492..7e62134b96 100644
--- a/cpp/src/qpid/cluster/Multicaster.h
+++ b/cpp/src/qpid/cluster/Multicaster.h
@@ -27,7 +27,6 @@
#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
-#include <sys/uio.h> // For iovec
namespace qpid {
diff --git a/cpp/src/qpid/sys/LatencyMetric.cpp b/cpp/src/qpid/sys/LatencyMetric.cpp
new file mode 100644
index 0000000000..93fd852d64
--- /dev/null
+++ b/cpp/src/qpid/sys/LatencyMetric.cpp
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifdef QPID_LATENCY_METRIC
+
+#include "LatencyMetric.h"
+#include "Time.h"
+#include <iostream>
+
+namespace qpid {
+namespace sys {
+
+void LatencyMetricTimestamp::initialize(const LatencyMetricTimestamp& ts) {
+ const_cast<int64_t&>(ts.latency_metric_timestamp) = Duration(now());
+}
+
+LatencyMetric::LatencyMetric(const char* msg, int64_t skip_) :
+ message(msg), count(0), total(0), skipped(0), skip(skip_)
+{}
+
+LatencyMetric::~LatencyMetric() { report(); }
+
+void LatencyMetric::record(const LatencyMetricTimestamp& start) {
+ Mutex::ScopedLock l(lock); // FIXME aconway 2009-01-20: atomics?
+ if (!start.latency_metric_timestamp) return; // Ignore 0 timestamps.
+ if (skip) {
+ if (++skipped < skip) return;
+ else skipped = 0;
+ }
+ ++count;
+ int64_t now_ = Duration(now());
+ total += now_ - start.latency_metric_timestamp;
+ // Set start time for next leg of the journey
+ const_cast<int64_t&>(start.latency_metric_timestamp) = now_;
+}
+
+void LatencyMetric::report() {
+ using namespace std;
+ if (count) {
+ cout << "LATENCY: " << message << ": "
+ << total / (count * TIME_USEC) << " microseconds" << endl;
+ }
+ else {
+ cout << "LATENCY: " << message << ": no data." << endl;
+ }
+ count = 0;
+ total = 0;
+}
+
+
+}} // namespace qpid::sys
+
+#endif
diff --git a/cpp/src/qpid/sys/LatencyMetric.h b/cpp/src/qpid/sys/LatencyMetric.h
new file mode 100644
index 0000000000..f2ab1ec5e1
--- /dev/null
+++ b/cpp/src/qpid/sys/LatencyMetric.h
@@ -0,0 +1,80 @@
+#ifndef QPID_SYS_LATENCYMETRIC_H
+#define QPID_SYS_LATENCYMETRIC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifdef QPID_LATENCY_METRIC
+
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace sys {
+
+/** Use this base class to add a timestamp for latency to an object */
+struct LatencyMetricTimestamp {
+ LatencyMetricTimestamp() : latency_metric_timestamp(0) {}
+ static void initialize(const LatencyMetricTimestamp&);
+ int64_t latency_metric_timestamp;
+};
+
+/**
+ * Record average latencies, report on destruction.
+ *
+ * For debugging only, use via macros below so it can be compiled out
+ * of production code.
+ */
+class LatencyMetric {
+ public:
+ /** msg should be a string literal. */
+ LatencyMetric(const char* msg, int64_t skip_=0);
+ ~LatencyMetric();
+
+ void record(const LatencyMetricTimestamp& start);
+
+ private:
+ void report();
+ Mutex lock;
+ const char* message;
+ int64_t ignore, count, total, skipped, skip;
+};
+
+}} // namespace qpid::sys
+
+#define QPID_LATENCY_INIT(x) ::qpid::sys::LatencyMetricTimestamp::initialize(x)
+#define QPID_LATENCY_RECORD(msg, x) do { \
+ static ::qpid::sys::LatencyMetric metric__(msg); metric__.record(x); \
+ } while (false)
+
+
+#else /* defined QPID_LATENCY_METRIC */
+
+namespace qpid { namespace sys {
+class LatencyMetricTimestamp {};
+}}
+
+#define QPID_LATENCY_INIT(x) (void)x
+#define QPID_LATENCY_RECORD(msg, x) (void)x
+
+#endif /* defined QPID_LATENCY_METRIC */
+
+#endif /*!QPID_SYS_LATENCYMETRIC_H*/