diff options
author | Gordon Sim <gsim@apache.org> | 2011-01-11 11:02:05 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-01-11 11:02:05 +0000 |
commit | 02148c8eb0abf489a5052173272c609761a60e7f (patch) | |
tree | b94f70d69cd3ef7763eebacf6299bcf116995f6a | |
parent | c68f328d629b9b11af8869aec0fb938beeaf535e (diff) | |
download | qpid-python-02148c8eb0abf489a5052173272c609761a60e7f.tar.gz |
QPID-2991: added message counts to connection stats; fixed xxxToClient stats
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1057578 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 47 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.h | 26 | ||||
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java | 12 | ||||
-rw-r--r-- | qpid/specs/management-schema.xml | 2 | ||||
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/management.py | 30 |
5 files changed, 111 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 98743cdae6..680d0c7adf 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -31,6 +31,7 @@ #include "qpid/ptr_map.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/enum.h" +#include "qpid/framing/MessageTransferBody.h" #include "qmf/org/apache/qpid/broker/EventClientConnect.h" #include "qmf/org/apache/qpid/broker/EventClientDisconnect.h" @@ -98,8 +99,10 @@ Connection::Connection(ConnectionOutputHandler* out_, timer(broker_.getTimer()), errorListener(0), objectId(objectId_), - shadow(shadow_) + shadow(shadow_), + outboundTracker(*this) { + outboundTracker.wrap(out); if (isLink) links.notifyConnection(mgmtId, this); // In a cluster, allow adding the management object to be delayed. @@ -160,27 +163,46 @@ void Connection::received(framing::AMQFrame& frame) { getChannel(frame.getChannel()).in(frame); } - if (isLink) + if (isLink) //i.e. we are acting as the client to another broker recordFromServer(frame); else recordFromClient(frame); } -void Connection::recordFromServer(framing::AMQFrame& frame) +void Connection::sent(const framing::AMQFrame& frame) +{ + if (isLink) //i.e. we are acting as the client to another broker + recordFromClient(frame); + else + recordFromServer(frame); +} + +bool isMessage(const AMQMethodBody* method) +{ + return method && method->isA<qpid::framing::MessageTransferBody>(); +} + +void Connection::recordFromServer(const framing::AMQFrame& frame) { if (mgmtObject != 0) { mgmtObject->inc_framesToClient(); mgmtObject->inc_bytesToClient(frame.encodedSize()); + if (isMessage(frame.getMethod())) { + mgmtObject->inc_msgsToClient(); + } } } -void Connection::recordFromClient(framing::AMQFrame& frame) +void Connection::recordFromClient(const framing::AMQFrame& frame) { if (mgmtObject != 0) { mgmtObject->inc_framesFromClient(); mgmtObject->inc_bytesFromClient(frame.encodedSize()); + if (isMessage(frame.getMethod())) { + mgmtObject->inc_msgsFromClient(); + } } } @@ -442,4 +464,21 @@ void Connection::restartTimeout() bool Connection::isOpen() { return adapter.isOpen(); } +Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con), next(0) {} +void Connection::OutboundFrameTracker::close() { next->close(); } +size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); } +void Connection::OutboundFrameTracker::abort() { next->abort(); } +void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); } +void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); } +void Connection::OutboundFrameTracker::send(framing::AMQFrame& f) +{ + next->send(f); + con.sent(f); +} +void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p) +{ + next = p.get(); + p.set(this); +} + }} diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index d978187e0c..b751848d73 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -111,8 +111,8 @@ class Connection : public sys::ConnectionInputHandler, ManagementMethod (uint32_t methodId, management::Args& args, std::string&); void requestIOProcessing (boost::function0<void>); - void recordFromServer (framing::AMQFrame& frame); - void recordFromClient (framing::AMQFrame& frame); + void recordFromServer (const framing::AMQFrame& frame); + void recordFromClient (const framing::AMQFrame& frame); std::string getAuthMechanism(); std::string getAuthCredentials(); std::string getUsername(); @@ -181,7 +181,29 @@ class Connection : public sys::ConnectionInputHandler, ErrorListener* errorListener; uint64_t objectId; bool shadow; + /** + * Chained ConnectionOutputHandler that allows outgoing frames to be + * tracked (for updating mgmt stats). + */ + class OutboundFrameTracker : public sys::ConnectionOutputHandler + { + public: + OutboundFrameTracker(Connection&); + void close(); + size_t getBuffered() const; + void abort(); + void activateOutput(); + void giveReadCredit(int32_t credit); + void send(framing::AMQFrame&); + void wrap(sys::ConnectionOutputHandlerPtr&); + private: + Connection& con; + sys::ConnectionOutputHandler* next; + }; + OutboundFrameTracker outboundTracker; + + void sent(const framing::AMQFrame& f); public: qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; } }; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 04960adb4b..c0afae0773 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -1249,6 +1249,18 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return 0l; } + public Long getMsgsFromClient() + { + // TODO + return 0l; + } + + public Long getMsgsToClient() + { + // TODO + return 0l; + } + public BrokerSchema.ConnectionClass.CloseMethodResponseCommand close(final BrokerSchema.ConnectionClass.CloseMethodResponseCommandFactory factory) { _obj.mgmtClose(); diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index b861cbd5da..b59f4c79d1 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -251,6 +251,8 @@ <statistic name="framesToClient" type="count64"/> <statistic name="bytesFromClient" type="count64"/> <statistic name="bytesToClient" type="count64"/> + <statistic name="msgsFromClient" type="count64"/> + <statistic name="msgsToClient" type="count64"/> <method name="close"/> </class> diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/management.py b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py index 1e579debf4..06f3212a6f 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/management.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py @@ -490,3 +490,33 @@ class ManagementTest (TestBase010): self.assertEqual(queue.bindingCount, 1, "deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount) + def test_connection_stats(self): + """ + Test message in/out stats for connection + """ + self.startQmf() + conn = self.connect() + session = conn.session("stats-session") + + #using qmf find named session and the corresponding connection: + conn_qmf = self.qmf.getObjects(_class="session", name="stats-session")[0]._connectionRef_ + + #send a message to a queue + session.queue_declare(queue="stats-q", exclusive=True, auto_delete=True) + session.message_transfer(message=Message(session.delivery_properties(routing_key="stats-q"), "abc")) + + #check the 'msgs sent from' stat for this connection + conn_qmf.update() + self.assertEqual(conn_qmf.msgsFromClient, 1) + + #receive message from queue + session.message_subscribe(destination="d", queue="stats-q") + incoming = session.incoming("d") + incoming.start() + self.assertEqual("abc", incoming.get(timeout=1).body) + + #check the 'msgs sent to' stat for this connection + conn_qmf.update() + self.assertEqual(conn_qmf.msgsToClient, 1) + + |