diff options
author | Alan Conway <aconway@apache.org> | 2009-01-20 22:11:37 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-01-20 22:11:37 +0000 |
commit | 066fd1ab9f1840cfe09204bc5f3d550f1e12d49b (patch) | |
tree | 51c9961e79c811c3240710bd7b7435a73e11c2b7 /cpp/src/qpid/cluster | |
parent | 861692abf515cf136e86e70446d005e7849a0f87 (diff) | |
download | qpid-python-066fd1ab9f1840cfe09204bc5f3d550f1e12d49b.tar.gz |
Latency measurements, compiled out of production code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736135 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 33 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Event.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.h | 1 |
5 files changed, 41 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 18b4a2e69c..0b6d56c259 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -38,6 +38,7 @@ #include "qpid/log/Statement.h" #include "qpid/log/Helpers.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/LatencyMetric.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" #include "qmf/org/apache/qpid/cluster/Package.h" @@ -182,7 +183,7 @@ void Cluster::deliver( MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); - if (from == myId) // Record self-deliveries for flow control. + if (from == myId) // Record self-deliveries for flow control. mcast.selfDeliver(e); deliver(e, l); } @@ -206,6 +207,7 @@ void Cluster::delivered(PollableEventQueue::Queue& events) { } void Cluster::deliveredEvent(const EventHeader& e, const char* data) { + QPID_LATENCY_RECORD("deliver queue", e); Buffer buf(const_cast<char*>(data), e.getSize()); AMQFrame frame; if (e.isCluster()) { diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 8d4429a8ed..59a7241715 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -35,16 +35,21 @@ using framing::Buffer; const size_t EventHeader::HEADER_SIZE = sizeof(uint8_t) + // type sizeof(uint64_t) + // connection pointer only, CPG provides member ID. - sizeof(uint32_t); // payload size + sizeof(uint32_t) // payload size +#ifdef QPID_LATENCY_METRIC + + sizeof(int64_t) // timestamp +#endif + ; EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) : type(t), connectionId(c), size(s) {} + +Event::Event() {} + Event::Event(EventType t, const ConnectionId& c, size_t s) : EventHeader(t,c,s), store(RefCountedBuffer::create(s+HEADER_SIZE)) -{ - encodeHeader(); -} +{} void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { if (buf.available() <= HEADER_SIZE) @@ -54,14 +59,17 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { throw ClusterLeaveException("Invalid multicast event type"); connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong())); size = buf.getLong(); +#ifdef QPID_LATENCY_METRIC + latency_metric_timestamp = buf.getLongLong(); +#endif } Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { - EventHeader h; - h.decode(m, buf); // Header - Event e(h.getType(), h.getConnectionId(), h.getSize()); + Event e; + e.decode(m, buf); // Header if (buf.available() < e.size) throw ClusterLeaveException("Not enough data for multicast event"); + e.store = RefCountedBuffer::create(e.size + HEADER_SIZE); memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size); return e; } @@ -73,11 +81,20 @@ Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) { f.encode(buf); return e; } - + +iovec Event::toIovec() { + encodeHeader(); + iovec iov = { const_cast<char*>(getStore()), getStoreSize() }; + return iov; +} + void EventHeader::encode(Buffer& b) const { b.putOctet(type); b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer())); b.putLong(size); +#ifdef QPID_LATENCY_METRIC + b.putLongLong(latency_metric_timestamp); +#endif } // Encode my header in my buffer. diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index 32e8f5e07b..110ec524c7 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -27,6 +27,8 @@ #include "Connection.h" #include "qpid/RefCountedBuffer.h" #include "qpid/framing/Buffer.h" +#include "qpid/sys/LatencyMetric.h" +#include <sys/uio.h> // For iovec #include <iosfwd> namespace qpid { @@ -37,7 +39,7 @@ namespace cluster { // /** Header data for a multicast event */ -class EventHeader { +class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { public: EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0); void decode(const MemberId& m, framing::Buffer&); @@ -65,8 +67,9 @@ class EventHeader { */ class Event : public EventHeader { public: + Event(); /** Create an event with a buffer that can hold size bytes plus an event header. */ - Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0); + Event(EventType t, const ConnectionId& c, size_t); /** Create an event copied from delivered data. */ static Event decodeCopy(const MemberId& m, framing::Buffer&); @@ -85,6 +88,8 @@ class Event : public EventHeader { operator framing::Buffer() const; + iovec toIovec(); + private: void encodeHeader(); diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 34614dc1ef..4fa12651eb 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -23,7 +23,7 @@ #include "Cpg.h" #include "ClusterLeaveException.h" #include "qpid/log/Statement.h" - +#include "qpid/sys/LatencyMetric.h" namespace qpid { namespace cluster { @@ -59,8 +59,8 @@ void Multicaster::mcast(const Event& e) { return; } } + QPID_LATENCY_INIT(e); queue.push(e); - } @@ -76,7 +76,8 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) { } ++pending; } - iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() }; + QPID_LATENCY_RECORD("mcast send queue", *i); + iovec iov = i->toIovec(); if (!cpg.mcast(&iov, 1)) { // cpg didn't send because of CPG flow control. if (mcastMax) { @@ -104,8 +105,9 @@ void Multicaster::release() { holdingQueue.clear(); } -void Multicaster::selfDeliver(const Event&) { +void Multicaster::selfDeliver(const Event& e) { sys::Mutex::ScopedLock l(lock); + QPID_LATENCY_RECORD("cpg self deliver", e); if (mcastMax) { assert(pending > 0); assert(pending <= mcastMax); diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h index 8014cd8492..7e62134b96 100644 --- a/cpp/src/qpid/cluster/Multicaster.h +++ b/cpp/src/qpid/cluster/Multicaster.h @@ -27,7 +27,6 @@ #include "qpid/sys/PollableQueue.h" #include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> -#include <sys/uio.h> // For iovec namespace qpid { |