summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-01-11 11:02:05 +0000
committerGordon Sim <gsim@apache.org>2011-01-11 11:02:05 +0000
commit82f2bf33f3c39bdcde2720420bb406d9f405ac68 (patch)
tree872d88e7ba816d3f2a48d5fc08a57bb96eabe634 /cpp/src
parentd3cc015488955b0baa60cbd9b8dbe579aed26d40 (diff)
downloadqpid-python-82f2bf33f3c39bdcde2720420bb406d9f405ac68.tar.gz
QPID-2991: added message counts to connection stats; fixed xxxToClient stats
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1057578 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp47
-rw-r--r--cpp/src/qpid/broker/Connection.h26
2 files changed, 67 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 98743cdae6..680d0c7adf 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -31,6 +31,7 @@
#include "qpid/ptr_map.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/enum.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qmf/org/apache/qpid/broker/EventClientConnect.h"
#include "qmf/org/apache/qpid/broker/EventClientDisconnect.h"
@@ -98,8 +99,10 @@ Connection::Connection(ConnectionOutputHandler* out_,
timer(broker_.getTimer()),
errorListener(0),
objectId(objectId_),
- shadow(shadow_)
+ shadow(shadow_),
+ outboundTracker(*this)
{
+ outboundTracker.wrap(out);
if (isLink)
links.notifyConnection(mgmtId, this);
// In a cluster, allow adding the management object to be delayed.
@@ -160,27 +163,46 @@ void Connection::received(framing::AMQFrame& frame) {
getChannel(frame.getChannel()).in(frame);
}
- if (isLink)
+ if (isLink) //i.e. we are acting as the client to another broker
recordFromServer(frame);
else
recordFromClient(frame);
}
-void Connection::recordFromServer(framing::AMQFrame& frame)
+void Connection::sent(const framing::AMQFrame& frame)
+{
+ if (isLink) //i.e. we are acting as the client to another broker
+ recordFromClient(frame);
+ else
+ recordFromServer(frame);
+}
+
+bool isMessage(const AMQMethodBody* method)
+{
+ return method && method->isA<qpid::framing::MessageTransferBody>();
+}
+
+void Connection::recordFromServer(const framing::AMQFrame& frame)
{
if (mgmtObject != 0)
{
mgmtObject->inc_framesToClient();
mgmtObject->inc_bytesToClient(frame.encodedSize());
+ if (isMessage(frame.getMethod())) {
+ mgmtObject->inc_msgsToClient();
+ }
}
}
-void Connection::recordFromClient(framing::AMQFrame& frame)
+void Connection::recordFromClient(const framing::AMQFrame& frame)
{
if (mgmtObject != 0)
{
mgmtObject->inc_framesFromClient();
mgmtObject->inc_bytesFromClient(frame.encodedSize());
+ if (isMessage(frame.getMethod())) {
+ mgmtObject->inc_msgsFromClient();
+ }
}
}
@@ -442,4 +464,21 @@ void Connection::restartTimeout()
bool Connection::isOpen() { return adapter.isOpen(); }
+Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con), next(0) {}
+void Connection::OutboundFrameTracker::close() { next->close(); }
+size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); }
+void Connection::OutboundFrameTracker::abort() { next->abort(); }
+void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
+void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
+void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
+{
+ next->send(f);
+ con.sent(f);
+}
+void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p)
+{
+ next = p.get();
+ p.set(this);
+}
+
}}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index d978187e0c..b751848d73 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -111,8 +111,8 @@ class Connection : public sys::ConnectionInputHandler,
ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
void requestIOProcessing (boost::function0<void>);
- void recordFromServer (framing::AMQFrame& frame);
- void recordFromClient (framing::AMQFrame& frame);
+ void recordFromServer (const framing::AMQFrame& frame);
+ void recordFromClient (const framing::AMQFrame& frame);
std::string getAuthMechanism();
std::string getAuthCredentials();
std::string getUsername();
@@ -181,7 +181,29 @@ class Connection : public sys::ConnectionInputHandler,
ErrorListener* errorListener;
uint64_t objectId;
bool shadow;
+ /**
+ * Chained ConnectionOutputHandler that allows outgoing frames to be
+ * tracked (for updating mgmt stats).
+ */
+ class OutboundFrameTracker : public sys::ConnectionOutputHandler
+ {
+ public:
+ OutboundFrameTracker(Connection&);
+ void close();
+ size_t getBuffered() const;
+ void abort();
+ void activateOutput();
+ void giveReadCredit(int32_t credit);
+ void send(framing::AMQFrame&);
+ void wrap(sys::ConnectionOutputHandlerPtr&);
+ private:
+ Connection& con;
+ sys::ConnectionOutputHandler* next;
+ };
+ OutboundFrameTracker outboundTracker;
+
+ void sent(const framing::AMQFrame& f);
public:
qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; }
};