summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-01-11 11:02:05 +0000
committerGordon Sim <gsim@apache.org>2011-01-11 11:02:05 +0000
commit02148c8eb0abf489a5052173272c609761a60e7f (patch)
treeb94f70d69cd3ef7763eebacf6299bcf116995f6a
parentc68f328d629b9b11af8869aec0fb938beeaf535e (diff)
downloadqpid-python-02148c8eb0abf489a5052173272c609761a60e7f.tar.gz
QPID-2991: added message counts to connection stats; fixed xxxToClient stats
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1057578 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp47
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java12
-rw-r--r--qpid/specs/management-schema.xml2
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/management.py30
5 files changed, 111 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 98743cdae6..680d0c7adf 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -31,6 +31,7 @@
#include "qpid/ptr_map.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/enum.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qmf/org/apache/qpid/broker/EventClientConnect.h"
#include "qmf/org/apache/qpid/broker/EventClientDisconnect.h"
@@ -98,8 +99,10 @@ Connection::Connection(ConnectionOutputHandler* out_,
timer(broker_.getTimer()),
errorListener(0),
objectId(objectId_),
- shadow(shadow_)
+ shadow(shadow_),
+ outboundTracker(*this)
{
+ outboundTracker.wrap(out);
if (isLink)
links.notifyConnection(mgmtId, this);
// In a cluster, allow adding the management object to be delayed.
@@ -160,27 +163,46 @@ void Connection::received(framing::AMQFrame& frame) {
getChannel(frame.getChannel()).in(frame);
}
- if (isLink)
+ if (isLink) //i.e. we are acting as the client to another broker
recordFromServer(frame);
else
recordFromClient(frame);
}
-void Connection::recordFromServer(framing::AMQFrame& frame)
+void Connection::sent(const framing::AMQFrame& frame)
+{
+ if (isLink) //i.e. we are acting as the client to another broker
+ recordFromClient(frame);
+ else
+ recordFromServer(frame);
+}
+
+bool isMessage(const AMQMethodBody* method)
+{
+ return method && method->isA<qpid::framing::MessageTransferBody>();
+}
+
+void Connection::recordFromServer(const framing::AMQFrame& frame)
{
if (mgmtObject != 0)
{
mgmtObject->inc_framesToClient();
mgmtObject->inc_bytesToClient(frame.encodedSize());
+ if (isMessage(frame.getMethod())) {
+ mgmtObject->inc_msgsToClient();
+ }
}
}
-void Connection::recordFromClient(framing::AMQFrame& frame)
+void Connection::recordFromClient(const framing::AMQFrame& frame)
{
if (mgmtObject != 0)
{
mgmtObject->inc_framesFromClient();
mgmtObject->inc_bytesFromClient(frame.encodedSize());
+ if (isMessage(frame.getMethod())) {
+ mgmtObject->inc_msgsFromClient();
+ }
}
}
@@ -442,4 +464,21 @@ void Connection::restartTimeout()
bool Connection::isOpen() { return adapter.isOpen(); }
+Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con), next(0) {}
+void Connection::OutboundFrameTracker::close() { next->close(); }
+size_t Connection::OutboundFrameTracker::getBuffered() const { return next->getBuffered(); }
+void Connection::OutboundFrameTracker::abort() { next->abort(); }
+void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
+void Connection::OutboundFrameTracker::giveReadCredit(int32_t credit) { next->giveReadCredit(credit); }
+void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
+{
+ next->send(f);
+ con.sent(f);
+}
+void Connection::OutboundFrameTracker::wrap(sys::ConnectionOutputHandlerPtr& p)
+{
+ next = p.get();
+ p.set(this);
+}
+
}}
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index d978187e0c..b751848d73 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -111,8 +111,8 @@ class Connection : public sys::ConnectionInputHandler,
ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
void requestIOProcessing (boost::function0<void>);
- void recordFromServer (framing::AMQFrame& frame);
- void recordFromClient (framing::AMQFrame& frame);
+ void recordFromServer (const framing::AMQFrame& frame);
+ void recordFromClient (const framing::AMQFrame& frame);
std::string getAuthMechanism();
std::string getAuthCredentials();
std::string getUsername();
@@ -181,7 +181,29 @@ class Connection : public sys::ConnectionInputHandler,
ErrorListener* errorListener;
uint64_t objectId;
bool shadow;
+ /**
+ * Chained ConnectionOutputHandler that allows outgoing frames to be
+ * tracked (for updating mgmt stats).
+ */
+ class OutboundFrameTracker : public sys::ConnectionOutputHandler
+ {
+ public:
+ OutboundFrameTracker(Connection&);
+ void close();
+ size_t getBuffered() const;
+ void abort();
+ void activateOutput();
+ void giveReadCredit(int32_t credit);
+ void send(framing::AMQFrame&);
+ void wrap(sys::ConnectionOutputHandlerPtr&);
+ private:
+ Connection& con;
+ sys::ConnectionOutputHandler* next;
+ };
+ OutboundFrameTracker outboundTracker;
+
+ void sent(const framing::AMQFrame& f);
public:
qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; }
};
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 04960adb4b..c0afae0773 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -1249,6 +1249,18 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return 0l;
}
+ public Long getMsgsFromClient()
+ {
+ // TODO
+ return 0l;
+ }
+
+ public Long getMsgsToClient()
+ {
+ // TODO
+ return 0l;
+ }
+
public BrokerSchema.ConnectionClass.CloseMethodResponseCommand close(final BrokerSchema.ConnectionClass.CloseMethodResponseCommandFactory factory)
{
_obj.mgmtClose();
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index b861cbd5da..b59f4c79d1 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -251,6 +251,8 @@
<statistic name="framesToClient" type="count64"/>
<statistic name="bytesFromClient" type="count64"/>
<statistic name="bytesToClient" type="count64"/>
+ <statistic name="msgsFromClient" type="count64"/>
+ <statistic name="msgsToClient" type="count64"/>
<method name="close"/>
</class>
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/management.py b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
index 1e579debf4..06f3212a6f 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
@@ -490,3 +490,33 @@ class ManagementTest (TestBase010):
self.assertEqual(queue.bindingCount, 1,
"deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
+ def test_connection_stats(self):
+ """
+ Test message in/out stats for connection
+ """
+ self.startQmf()
+ conn = self.connect()
+ session = conn.session("stats-session")
+
+ #using qmf find named session and the corresponding connection:
+ conn_qmf = self.qmf.getObjects(_class="session", name="stats-session")[0]._connectionRef_
+
+ #send a message to a queue
+ session.queue_declare(queue="stats-q", exclusive=True, auto_delete=True)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="stats-q"), "abc"))
+
+ #check the 'msgs sent from' stat for this connection
+ conn_qmf.update()
+ self.assertEqual(conn_qmf.msgsFromClient, 1)
+
+ #receive message from queue
+ session.message_subscribe(destination="d", queue="stats-q")
+ incoming = session.incoming("d")
+ incoming.start()
+ self.assertEqual("abc", incoming.get(timeout=1).body)
+
+ #check the 'msgs sent to' stat for this connection
+ conn_qmf.update()
+ self.assertEqual(conn_qmf.msgsToClient, 1)
+
+