diff options
Diffstat (limited to 'cpp/src/qpid/broker/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 47 |
1 files changed, 43 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 98743cdae6..680d0c7adf 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -31,6 +31,7 @@ #include "qpid/ptr_map.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/enum.h" +#include "qpid/framing/MessageTransferBody.h" #include "qmf/org/apache/qpid/broker/EventClientConnect.h" #include "qmf/org/apache/qpid/broker/EventClientDisconnect.h" @@ -98,8 +99,10 @@ Connection::Connection(ConnectionOutputHandler* out_, timer(broker_.getTimer()), errorListener(0), objectId(objectId_), - shadow(shadow_) + shadow(shadow_), + outboundTracker(*this) { + outboundTracker.wrap(out); if (isLink) links.notifyConnection(mgmtId, this); // In a cluster, allow adding the management object to be delayed. @@ -160,27 +163,46 @@ void Connection::received(framing::AMQFrame& frame) { getChannel(frame.getChannel()).in(frame); } - if (isLink) + if (isLink) //i.e. we are acting as the client to another broker recordFromServer(frame); else recordFromClient(frame); } -void Connection::recordFromServer(framing::AMQFrame& frame) +void Connection::sent(const framing::AMQFrame& frame) +{ + if (isLink) //i.e. we are acting as the client to another broker + recordFromClient(frame); + else + recordFromServer(frame); +} + +bool isMessage(const AMQMethodBody* method) +{ + return method && method->isA<qpid::framing::MessageTransferBody>(); +} + +void Connection::recordFromServer(const framing::AMQFrame& frame) { if (mgmtObject != 0) { mgmtObject->inc_framesToClient(); mgmtObject->inc_bytesToClient(frame.encodedSize()); + if (isMessage(frame.getMethod())) { + mgmtObject->inc_msgsToClient(); + } } } -void Connection::recordFromClient(framing::AMQFrame& frame) +void Connection::recordFromClient(const framing::AMQFrame& frame) { if (mgmtObject != 0) { mgmtObject->inc_framesFromClient(); mgmtObject->inc_bytesFromClient(frame.encodedSize()); + if (isMessage(frame.getMethod())) { + mgmtObject->inc_msgsFromClient(); + } } } @@ -442,4 +464,21 @@ void Connection::restartTimeout() bool Connection::isOpen() { return adapter.isOpen(); } +Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con), next(0) {} +void Connection::OutboundFrameTracker::close() { next->close(); } +size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); } +void Connection::OutboundFrameTracker::abort() { next->abort(); } +void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); } +void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); } +void Connection::OutboundFrameTracker::send(framing::AMQFrame& f) +{ + next->send(f); + con.sent(f); +} +void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p) +{ + next = p.get(); + p.set(this); +} + }} |