summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp1
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp4
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.h3
-rw-r--r--cpp/src/qpid/framing/AMQFrame.h3
-rw-r--r--cpp/src/qpid/sys/LatencyMetric.cpp1
-rw-r--r--cpp/src/qpid/sys/LatencyMetric.h2
-rw-r--r--cpp/src/tests/.valgrind.supp8
-rwxr-xr-xcpp/src/tests/run_test4
9 files changed, 21 insertions, 9 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 0b6d56c259..d31fa07c57 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -238,6 +238,7 @@ void Cluster::deliveredEvent(const EventHeader& e, const char* data) {
}
}
}
+ QPID_LATENCY_RECORD("decode+execute", e);
}
struct AddrList {
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index f0d38bf299..cdee87dfcd 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -39,6 +39,7 @@
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/LatencyMetric.h"
#include <boost/current_function.hpp>
@@ -73,7 +74,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
void Connection::init() {
QPID_LOG(debug, cluster << " new connection: " << *this);
- if (isLocal() && !isCatchUp()) {
+ if (isLocal() && !isCatchUp() && cluster.getReadMax()) {
output.giveReadCredit(cluster.getReadMax());
}
}
@@ -137,6 +138,7 @@ bool Connection::checkUnsupported(const AMQBody& body) {
// Delivered from cluster.
void Connection::delivered(framing::AMQFrame& f) {
QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f);
+ QPID_LATENCY_INIT(f);
assert(!catchUp);
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol.
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index 1565ef2efb..472fb2e6c0 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -46,6 +46,7 @@ void OutputInterceptor::send(framing::AMQFrame& f) {
}
if (!parent.isCatchUp())
sent += f.encodedSize();
+ QPID_LATENCY_RECORD("on write queue", f);
}
void OutputInterceptor::activateOutput() {
@@ -77,6 +78,7 @@ bool OutputInterceptor::doOutput() {
// which tranfers frames to the codec for writing.
//
void OutputInterceptor::deliverDoOutput(size_t requested) {
+ QPID_LATENCY_RECORD("deliver do-output", *this);
size_t buf = getBuffered();
if (parent.isLocal())
writeEstimate.delivered(requested, sent, buf); // Update the estimate.
@@ -101,7 +103,7 @@ void OutputInterceptor::deliverDoOutput(size_t requested) {
// Send a doOutput request if one is not already in flight.
void OutputInterceptor::sendDoOutput() {
if (!parent.isLocal()) return;
-
+ QPID_LATENCY_INIT(*this);
doingOutput = true;
size_t request = writeEstimate.sending(getBuffered());
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h
index 0ac15e747a..6cf381178d 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -25,6 +25,7 @@
#include "WriteEstimate.h"
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/broker/ConnectionFactory.h"
+#include "qpid/sys/LatencyMetric.h"
#include <boost/function.hpp>
namespace qpid {
@@ -36,7 +37,7 @@ class Connection;
/**
* Interceptor for connection OutputHandler, manages outgoing message replication.
*/
-class OutputInterceptor : public sys::ConnectionOutputHandler {
+class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetricTimestamp {
public:
OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h);
diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h
index ddfe438806..7bf4638089 100644
--- a/cpp/src/qpid/framing/AMQFrame.h
+++ b/cpp/src/qpid/framing/AMQFrame.h
@@ -27,6 +27,7 @@
#include "AMQHeartbeatBody.h"
#include "ProtocolVersion.h"
#include "BodyHolder.h"
+#include "qpid/sys/LatencyMetric.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/cast.hpp>
@@ -36,7 +37,7 @@ namespace framing {
class BodyHolder;
-class AMQFrame : public AMQDataBlock
+class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp
{
public:
AMQFrame(boost::intrusive_ptr<BodyHolder> b=0) : body(b) { init(); }
diff --git a/cpp/src/qpid/sys/LatencyMetric.cpp b/cpp/src/qpid/sys/LatencyMetric.cpp
index 93fd852d64..6a52425706 100644
--- a/cpp/src/qpid/sys/LatencyMetric.cpp
+++ b/cpp/src/qpid/sys/LatencyMetric.cpp
@@ -39,7 +39,6 @@ LatencyMetric::LatencyMetric(const char* msg, int64_t skip_) :
LatencyMetric::~LatencyMetric() { report(); }
void LatencyMetric::record(const LatencyMetricTimestamp& start) {
- Mutex::ScopedLock l(lock); // FIXME aconway 2009-01-20: atomics?
if (!start.latency_metric_timestamp) return; // Ignore 0 timestamps.
if (skip) {
if (++skipped < skip) return;
diff --git a/cpp/src/qpid/sys/LatencyMetric.h b/cpp/src/qpid/sys/LatencyMetric.h
index f2ab1ec5e1..ff679ef6a8 100644
--- a/cpp/src/qpid/sys/LatencyMetric.h
+++ b/cpp/src/qpid/sys/LatencyMetric.h
@@ -25,7 +25,6 @@
#ifdef QPID_LATENCY_METRIC
#include "qpid/sys/IntegerTypes.h"
-#include "qpid/sys/Mutex.h"
namespace qpid {
namespace sys {
@@ -53,7 +52,6 @@ class LatencyMetric {
private:
void report();
- Mutex lock;
const char* message;
int64_t ignore, count, total, skipped, skip;
};
diff --git a/cpp/src/tests/.valgrind.supp b/cpp/src/tests/.valgrind.supp
index 7ae2bd9845..9984c18151 100644
--- a/cpp/src/tests/.valgrind.supp
+++ b/cpp/src/tests/.valgrind.supp
@@ -1,4 +1,12 @@
{
+ Benign leak in CPG - patched version.
+ Memcheck:Leak
+ fun:*
+ fun:openais_service_connect
+ fun:cpg_initialize
+}
+
+{
Benign error in libcpg.
Memcheck:Param
socketcall.sendmsg(msg.msg_iov[i])
diff --git a/cpp/src/tests/run_test b/cpp/src/tests/run_test
index 062e9e137e..82be3b0893 100755
--- a/cpp/src/tests/run_test
+++ b/cpp/src/tests/run_test
@@ -24,7 +24,7 @@
# Output nothing if test passes, show the output if it fails and
# leave output in <test>.log for examination.
#
-# If qpidd.port exists run test with QPID_PORT=`cat qpidd.port`
+# If qpidd.port exists and is not empty run test with QPID_PORT=`cat qpidd.port`
#
# If $VALGRIND if is set run under valgrind. If there are valgrind
# erros show valgrind output, also leave it in <test>.valgrind for
@@ -38,7 +38,7 @@ srcdir=`dirname $0`
export VALGRIND srcdir
# Set QPID_PORT if qpidd.port exists.
-test -f qpidd.port && QPID_PORT=`cat qpidd.port`
+test -s qpidd.port && QPID_PORT=`cat qpidd.port`
export QPID_PORT
# Avoid silly libtool error messages if these are not defined