summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-03-17 17:01:37 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-03-17 17:01:37 +0000
commita803830aab112579ff8c194c2dea44a5e663f77e (patch)
treea3f162664c89552816f0035e013569fb606c4f1c
parentb5feef17e3fda5d564cb0db9884060d396ebbdb3 (diff)
downloadqpid-python-a803830aab112579ff8c194c2dea44a5e663f77e.tar.gz
QPID-4653 : [Java Broker 1.0] Implement statistics counting on 1.0 connections
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1457489 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java28
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java5
4 files changed, 40 insertions, 11 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index cf4164c244..f79d34ea71 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -118,10 +118,10 @@ public class Connection_1_0 implements ConnectionEventListener
private final AMQConnectionModel _model = new AMQConnectionModel()
{
- private final StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
- private final StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
- private final StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
- private final StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
+ private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
+ private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
+ private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
+ private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
private final LogSubject _logSubject = new LogSubject()
{
@@ -232,19 +232,28 @@ public class Connection_1_0 implements ConnectionEventListener
@Override
public void initialiseStatistics()
{
- // TODO
+ _messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId());
+ _dataDeliveryStatistics = new StatisticsCounter("data-delivered-" + getConnectionId());
+ _messageReceiptStatistics = new StatisticsCounter("messages-received-" + getConnectionId());
+ _dataReceiptStatistics = new StatisticsCounter("data-received-" + getConnectionId());
}
@Override
public void registerMessageReceived(long messageSize, long timestamp)
{
- // TODO
+ _messageReceiptStatistics.registerEvent(1L, timestamp);
+ _dataReceiptStatistics.registerEvent(messageSize, timestamp);
+ _vhost.registerMessageReceived(messageSize,timestamp);
+
}
@Override
public void registerMessageDelivered(long messageSize)
{
- // TODO
+
+ _messageDeliveryStatistics.registerEvent(1L);
+ _dataDeliveryStatistics.registerEvent(messageSize);
+ _vhost.registerMessageDelivered(messageSize);
}
@Override
@@ -274,7 +283,10 @@ public class Connection_1_0 implements ConnectionEventListener
@Override
public void resetStatistics()
{
- // TODO
+ _dataDeliveryStatistics.reset();
+ _dataReceiptStatistics.reset();
+ _messageDeliveryStatistics.reset();
+ _messageReceiptStatistics.reset();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index fbce1666b7..d0000c8db6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -45,6 +45,7 @@ public class Message_1_0 implements ServerMessage, InboundMessage
private final StoredMessage<MessageMetaData_1_0> _storedMessage;
private List<ByteBuffer> _fragments;
private WeakReference<Session_1_0> _session;
+ private long _arrivalTime;
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
@@ -81,6 +82,7 @@ public class Message_1_0 implements ServerMessage, InboundMessage
_storedMessage = storedMessage;
_fragments = fragments;
_session = new WeakReference<Session_1_0>(session);
+ _arrivalTime = System.currentTimeMillis();
}
public String getRoutingKey()
@@ -129,8 +131,16 @@ public class Message_1_0 implements ServerMessage, InboundMessage
public long getSize()
{
- // TODO
- return 0l;
+ long size = 0l;
+ if(_fragments != null)
+ {
+ for(ByteBuffer buf : _fragments)
+ {
+ size += buf.remaining();
+ }
+ }
+
+ return size;
}
public boolean isImmediate()
@@ -155,7 +165,7 @@ public class Message_1_0 implements ServerMessage, InboundMessage
public long getArrivalTime()
{
- return 0; //TODO
+ return _arrivalTime;
}
public int getContent(final ByteBuffer buf, final int offset)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
index e097dd5c83..3e2652b3b8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
@@ -214,6 +214,8 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv
getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
+ getSession().getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
+
if(!(transaction instanceof AutoCommitTransaction))
{
ServerTransaction.Action a;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index a0ed824c58..60bb8e4044 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -598,4 +598,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
{
return getId().compareTo(o.getId());
}
+
+ public Connection_1_0 getConnection()
+ {
+ return _connection;
+ }
}