diff options
author | Gordon Sim <gsim@apache.org> | 2011-01-11 11:02:05 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-01-11 11:02:05 +0000 |
commit | 82f2bf33f3c39bdcde2720420bb406d9f405ac68 (patch) | |
tree | 872d88e7ba816d3f2a48d5fc08a57bb96eabe634 /cpp/src | |
parent | d3cc015488955b0baa60cbd9b8dbe579aed26d40 (diff) | |
download | qpid-python-82f2bf33f3c39bdcde2720420bb406d9f405ac68.tar.gz |
QPID-2991: added message counts to connection stats; fixed xxxToClient stats
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1057578 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 47 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 26 |
2 files changed, 67 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 98743cdae6..680d0c7adf 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -31,6 +31,7 @@ #include "qpid/ptr_map.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/enum.h" +#include "qpid/framing/MessageTransferBody.h" #include "qmf/org/apache/qpid/broker/EventClientConnect.h" #include "qmf/org/apache/qpid/broker/EventClientDisconnect.h" @@ -98,8 +99,10 @@ Connection::Connection(ConnectionOutputHandler* out_, timer(broker_.getTimer()), errorListener(0), objectId(objectId_), - shadow(shadow_) + shadow(shadow_), + outboundTracker(*this) { + outboundTracker.wrap(out); if (isLink) links.notifyConnection(mgmtId, this); // In a cluster, allow adding the management object to be delayed. @@ -160,27 +163,46 @@ void Connection::received(framing::AMQFrame& frame) { getChannel(frame.getChannel()).in(frame); } - if (isLink) + if (isLink) //i.e. we are acting as the client to another broker recordFromServer(frame); else recordFromClient(frame); } -void Connection::recordFromServer(framing::AMQFrame& frame) +void Connection::sent(const framing::AMQFrame& frame) +{ + if (isLink) //i.e. we are acting as the client to another broker + recordFromClient(frame); + else + recordFromServer(frame); +} + +bool isMessage(const AMQMethodBody* method) +{ + return method && method->isA<qpid::framing::MessageTransferBody>(); +} + +void Connection::recordFromServer(const framing::AMQFrame& frame) { if (mgmtObject != 0) { mgmtObject->inc_framesToClient(); mgmtObject->inc_bytesToClient(frame.encodedSize()); + if (isMessage(frame.getMethod())) { + mgmtObject->inc_msgsToClient(); + } } } -void Connection::recordFromClient(framing::AMQFrame& frame) +void Connection::recordFromClient(const framing::AMQFrame& frame) { if (mgmtObject != 0) { mgmtObject->inc_framesFromClient(); mgmtObject->inc_bytesFromClient(frame.encodedSize()); + if (isMessage(frame.getMethod())) { + mgmtObject->inc_msgsFromClient(); + } } } @@ -442,4 +464,21 @@ void Connection::restartTimeout() bool Connection::isOpen() { return adapter.isOpen(); } +Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con), next(0) {} +void Connection::OutboundFrameTracker::close() { next->close(); } +size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); } +void Connection::OutboundFrameTracker::abort() { next->abort(); } +void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); } +void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); } +void Connection::OutboundFrameTracker::send(framing::AMQFrame& f) +{ + next->send(f); + con.sent(f); +} +void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p) +{ + next = p.get(); + p.set(this); +} + }} diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index d978187e0c..b751848d73 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -111,8 +111,8 @@ class Connection : public sys::ConnectionInputHandler, ManagementMethod (uint32_t methodId, management::Args& args, std::string&); void requestIOProcessing (boost::function0<void>); - void recordFromServer (framing::AMQFrame& frame); - void recordFromClient (framing::AMQFrame& frame); + void recordFromServer (const framing::AMQFrame& frame); + void recordFromClient (const framing::AMQFrame& frame); std::string getAuthMechanism(); std::string getAuthCredentials(); std::string getUsername(); @@ -181,7 +181,29 @@ class Connection : public sys::ConnectionInputHandler, ErrorListener* errorListener; uint64_t objectId; bool shadow; + /** + * Chained ConnectionOutputHandler that allows outgoing frames to be + * tracked (for updating mgmt stats). + */ + class OutboundFrameTracker : public sys::ConnectionOutputHandler + { + public: + OutboundFrameTracker(Connection&); + void close(); + size_t getBuffered() const; + void abort(); + void activateOutput(); + void giveReadCredit(int32_t credit); + void send(framing::AMQFrame&); + void wrap(sys::ConnectionOutputHandlerPtr&); + private: + Connection& con; + sys::ConnectionOutputHandler* next; + }; + OutboundFrameTracker outboundTracker; + + void sent(const framing::AMQFrame& f); public: qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; } }; |