diff options
author | Alan Conway <aconway@apache.org> | 2009-01-20 22:11:37 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-01-20 22:11:37 +0000 |
commit | 066fd1ab9f1840cfe09204bc5f3d550f1e12d49b (patch) | |
tree | 51c9961e79c811c3240710bd7b7435a73e11c2b7 /cpp/src/qpid | |
parent | 861692abf515cf136e86e70446d005e7849a0f87 (diff) | |
download | qpid-python-066fd1ab9f1840cfe09204bc5f3d550f1e12d49b.tar.gz |
Latency measurements, compiled out of production code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736135 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 33 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/LatencyMetric.cpp | 71 | ||||
-rw-r--r-- | cpp/src/qpid/sys/LatencyMetric.h | 80 |
7 files changed, 192 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 18b4a2e69c..0b6d56c259 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -38,6 +38,7 @@ #include "qpid/log/Statement.h" #include "qpid/log/Helpers.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/LatencyMetric.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include "qmf/org/apache/qpid/cluster/Package.h" @@ -182,7 +183,7 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); - if (from == myId) // Record self-deliveries for flow control. + if (from == myId) // Record self-deliveries for flow control. mcast.selfDeliver(e); deliver(e, l); } @@ -206,6 +207,7 @@ void Cluster::delivered(PollableEventQueue::Queue& events) { } void Cluster::deliveredEvent(const EventHeader& e, const char* data) { + QPID_LATENCY_RECORD("deliver queue", e); Buffer buf(const_cast<char*>(data), e.getSize()); AMQFrame frame; if (e.isCluster()) { diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 8d4429a8ed..59a7241715 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -35,16 +35,21 @@ using framing::Buffer; const size_t EventHeader::HEADER_SIZE = sizeof(uint8_t) + // type sizeof(uint64_t) + // connection pointer only, CPG provides member ID. - sizeof(uint32_t); // payload size + sizeof(uint32_t) // payload size +#ifdef QPID_LATENCY_METRIC + + sizeof(int64_t) // timestamp +#endif + ; EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) : type(t), connectionId(c), size(s) {} + +Event::Event() {} + Event::Event(EventType t, const ConnectionId& c, size_t s) : EventHeader(t,c,s), store(RefCountedBuffer::create(s+HEADER_SIZE)) -{ - encodeHeader(); -} +{} void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { if (buf.available() <= HEADER_SIZE) @@ -54,14 +59,17 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { throw ClusterLeaveException("Invalid multicast event type"); connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong())); size = buf.getLong(); +#ifdef QPID_LATENCY_METRIC + latency_metric_timestamp = buf.getLongLong(); +#endif } Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { - EventHeader h; - h.decode(m, buf); // Header - Event e(h.getType(), h.getConnectionId(), h.getSize()); + Event e; + e.decode(m, buf); // Header if (buf.available() < e.size) throw ClusterLeaveException("Not enough data for multicast event"); + e.store = RefCountedBuffer::create(e.size + HEADER_SIZE); memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size); return e; } @@ -73,11 +81,20 @@ Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) { f.encode(buf); return e; } - + +iovec Event::toIovec() { + encodeHeader(); + iovec iov = { const_cast<char*>(getStore()), getStoreSize() }; + return iov; +} + void EventHeader::encode(Buffer& b) const { b.putOctet(type); b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer())); b.putLong(size); +#ifdef QPID_LATENCY_METRIC + b.putLongLong(latency_metric_timestamp); +#endif } // Encode my header in my buffer. diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index 32e8f5e07b..110ec524c7 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -27,6 +27,8 @@ #include "Connection.h" #include "qpid/RefCountedBuffer.h" #include "qpid/framing/Buffer.h" +#include "qpid/sys/LatencyMetric.h" +#include <sys/uio.h> // For iovec #include <iosfwd> namespace qpid { @@ -37,7 +39,7 @@ namespace cluster { // /** Header data for a multicast event */ -class EventHeader { +class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { public: EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0); void decode(const MemberId& m, framing::Buffer&); @@ -65,8 +67,9 @@ class EventHeader { */ class Event : public EventHeader { public: + Event(); /** Create an event with a buffer that can hold size bytes plus an event header. */ - Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0); + Event(EventType t, const ConnectionId& c, size_t); /** Create an event copied from delivered data. */ static Event decodeCopy(const MemberId& m, framing::Buffer&); @@ -85,6 +88,8 @@ class Event : public EventHeader { operator framing::Buffer() const; + iovec toIovec(); + private: void encodeHeader(); diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 34614dc1ef..4fa12651eb 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -23,7 +23,7 @@ #include "Cpg.h" #include "ClusterLeaveException.h" #include "qpid/log/Statement.h" - +#include "qpid/sys/LatencyMetric.h" namespace qpid { namespace cluster { @@ -59,8 +59,8 @@ void Multicaster::mcast(const Event& e) { return; } } + QPID_LATENCY_INIT(e); queue.push(e); - } @@ -76,7 +76,8 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) { } ++pending; } - iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() }; + QPID_LATENCY_RECORD("mcast send queue", *i); + iovec iov = i->toIovec(); if (!cpg.mcast(&iov, 1)) { // cpg didn't send because of CPG flow control. if (mcastMax) { @@ -104,8 +105,9 @@ void Multicaster::release() { holdingQueue.clear(); } -void Multicaster::selfDeliver(const Event&) { +void Multicaster::selfDeliver(const Event& e) { sys::Mutex::ScopedLock l(lock); + QPID_LATENCY_RECORD("cpg self deliver", e); if (mcastMax) { assert(pending > 0); assert(pending <= mcastMax); diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h index 8014cd8492..7e62134b96 100644 --- a/cpp/src/qpid/cluster/Multicaster.h +++ b/cpp/src/qpid/cluster/Multicaster.h @@ -27,7 +27,6 @@ #include "qpid/sys/PollableQueue.h" #include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> -#include <sys/uio.h> // For iovec namespace qpid { diff --git a/cpp/src/qpid/sys/LatencyMetric.cpp b/cpp/src/qpid/sys/LatencyMetric.cpp new file mode 100644 index 0000000000..93fd852d64 --- /dev/null +++ b/cpp/src/qpid/sys/LatencyMetric.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifdef QPID_LATENCY_METRIC + +#include "LatencyMetric.h" +#include "Time.h" +#include <iostream> + +namespace qpid { +namespace sys { + +void LatencyMetricTimestamp::initialize(const LatencyMetricTimestamp& ts) { + const_cast<int64_t&>(ts.latency_metric_timestamp) = Duration(now()); +} + +LatencyMetric::LatencyMetric(const char* msg, int64_t skip_) : + message(msg), count(0), total(0), skipped(0), skip(skip_) +{} + +LatencyMetric::~LatencyMetric() { report(); } + +void LatencyMetric::record(const LatencyMetricTimestamp& start) { + Mutex::ScopedLock l(lock); // FIXME aconway 2009-01-20: atomics? + if (!start.latency_metric_timestamp) return; // Ignore 0 timestamps. + if (skip) { + if (++skipped < skip) return; + else skipped = 0; + } + ++count; + int64_t now_ = Duration(now()); + total += now_ - start.latency_metric_timestamp; + // Set start time for next leg of the journey + const_cast<int64_t&>(start.latency_metric_timestamp) = now_; +} + +void LatencyMetric::report() { + using namespace std; + if (count) { + cout << "LATENCY: " << message << ": " + << total / (count * TIME_USEC) << " microseconds" << endl; + } + else { + cout << "LATENCY: " << message << ": no data." << endl; + } + count = 0; + total = 0; +} + + +}} // namespace qpid::sys + +#endif diff --git a/cpp/src/qpid/sys/LatencyMetric.h b/cpp/src/qpid/sys/LatencyMetric.h new file mode 100644 index 0000000000..f2ab1ec5e1 --- /dev/null +++ b/cpp/src/qpid/sys/LatencyMetric.h @@ -0,0 +1,80 @@ +#ifndef QPID_SYS_LATENCYMETRIC_H +#define QPID_SYS_LATENCYMETRIC_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifdef QPID_LATENCY_METRIC + +#include "qpid/sys/IntegerTypes.h" +#include "qpid/sys/Mutex.h" + +namespace qpid { +namespace sys { + +/** Use this base class to add a timestamp for latency to an object */ +struct LatencyMetricTimestamp { + LatencyMetricTimestamp() : latency_metric_timestamp(0) {} + static void initialize(const LatencyMetricTimestamp&); + int64_t latency_metric_timestamp; +}; + +/** + * Record average latencies, report on destruction. + * + * For debugging only, use via macros below so it can be compiled out + * of production code. + */ +class LatencyMetric { + public: + /** msg should be a string literal. */ + LatencyMetric(const char* msg, int64_t skip_=0); + ~LatencyMetric(); + + void record(const LatencyMetricTimestamp& start); + + private: + void report(); + Mutex lock; + const char* message; + int64_t ignore, count, total, skipped, skip; +}; + +}} // namespace qpid::sys + +#define QPID_LATENCY_INIT(x) ::qpid::sys::LatencyMetricTimestamp::initialize(x) +#define QPID_LATENCY_RECORD(msg, x) do { \ + static ::qpid::sys::LatencyMetric metric__(msg); metric__.record(x); \ + } while (false) + + +#else /* defined QPID_LATENCY_METRIC */ + +namespace qpid { namespace sys { +class LatencyMetricTimestamp {}; +}} + +#define QPID_LATENCY_INIT(x) (void)x +#define QPID_LATENCY_RECORD(msg, x) (void)x + +#endif /* defined QPID_LATENCY_METRIC */ + +#endif /*!QPID_SYS_LATENCYMETRIC_H*/ |