diff options
author | Gordon Sim <gsim@apache.org> | 2011-01-11 11:02:05 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-01-11 11:02:05 +0000 |
commit | 82f2bf33f3c39bdcde2720420bb406d9f405ac68 (patch) | |
tree | 872d88e7ba816d3f2a48d5fc08a57bb96eabe634 /cpp/src/qpid/broker/Connection.cpp | |
parent | d3cc015488955b0baa60cbd9b8dbe579aed26d40 (diff) | |
download | qpid-python-82f2bf33f3c39bdcde2720420bb406d9f405ac68.tar.gz |
QPID-2991: added message counts to connection stats; fixed xxxToClient stats
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1057578 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 47 |
1 files changed, 43 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 98743cdae6..680d0c7adf 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -31,6 +31,7 @@ #include "qpid/ptr_map.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/enum.h" +#include "qpid/framing/MessageTransferBody.h" #include "qmf/org/apache/qpid/broker/EventClientConnect.h" #include "qmf/org/apache/qpid/broker/EventClientDisconnect.h" @@ -98,8 +99,10 @@ Connection::Connection(ConnectionOutputHandler* out_, timer(broker_.getTimer()), errorListener(0), objectId(objectId_), - shadow(shadow_) + shadow(shadow_), + outboundTracker(*this) { + outboundTracker.wrap(out); if (isLink) links.notifyConnection(mgmtId, this); // In a cluster, allow adding the management object to be delayed. @@ -160,27 +163,46 @@ void Connection::received(framing::AMQFrame& frame) { getChannel(frame.getChannel()).in(frame); } - if (isLink) + if (isLink) //i.e. we are acting as the client to another broker recordFromServer(frame); else recordFromClient(frame); } -void Connection::recordFromServer(framing::AMQFrame& frame) +void Connection::sent(const framing::AMQFrame& frame) +{ + if (isLink) //i.e. we are acting as the client to another broker + recordFromClient(frame); + else + recordFromServer(frame); +} + +bool isMessage(const AMQMethodBody* method) +{ + return method && method->isA<qpid::framing::MessageTransferBody>(); +} + +void Connection::recordFromServer(const framing::AMQFrame& frame) { if (mgmtObject != 0) { mgmtObject->inc_framesToClient(); mgmtObject->inc_bytesToClient(frame.encodedSize()); + if (isMessage(frame.getMethod())) { + mgmtObject->inc_msgsToClient(); + } } } -void Connection::recordFromClient(framing::AMQFrame& frame) +void Connection::recordFromClient(const framing::AMQFrame& frame) { if (mgmtObject != 0) { mgmtObject->inc_framesFromClient(); mgmtObject->inc_bytesFromClient(frame.encodedSize()); + if (isMessage(frame.getMethod())) { + mgmtObject->inc_msgsFromClient(); + } } } @@ -442,4 +464,21 @@ void Connection::restartTimeout() bool Connection::isOpen() { return adapter.isOpen(); } +Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con), next(0) {} +void Connection::OutboundFrameTracker::close() { next->close(); } +size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); } +void Connection::OutboundFrameTracker::abort() { next->abort(); } +void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); } +void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); } +void Connection::OutboundFrameTracker::send(framing::AMQFrame& f) +{ + next->send(f); + con.sent(f); +} +void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p) +{ + next = p.get(); + p.set(this); +} + }} |