summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java59
1 files changed, 56 insertions, 3 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 93fc50a2bd..e758febc3c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -78,13 +78,16 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;
-public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
+public class AMQMinaProtocolSession implements AMQProtocolSession, Managable, StatisticsGatherer
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
@@ -148,6 +151,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private LogSubject _logSubject;
private final AtomicBoolean _closing = new AtomicBoolean(false);
+
+ private ApplicationRegistry _registry;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messageStats, _dataStats;
public ManagedObject getManagedObject()
{
@@ -162,8 +169,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
session.setAttachment(this);
_codecFactory = codecFactory;
+ _registry = virtualHostRegistry.getApplicationRegistry();
- _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
+ _actor = new AMQPConnectionActor(this, _registry.getRootMessageLogger());
_actor.message(ConnectionMessages.CON_OPEN(null, null, false, false));
@@ -178,8 +186,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
e.printStackTrace();
throw e;
-
}
+
+ initialiseStatistics();
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -944,4 +953,48 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
// No-op, interface munging between this and AMQProtocolSession
}
+
+ public void registerMessageDelivery(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _logger.info("=== STATS === register " + messageSize);
+ _messageStats.registerEvent(1L, timestamp);
+ _dataStats.registerEvent(messageSize, timestamp);
+ }
+ _virtualHost.registerMessageDelivery(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageStatistics()
+ {
+ return _messageStats;
+ }
+
+ public StatisticsCounter getDataStatistics()
+ {
+ return _dataStats;
+ }
+
+ public void resetStatistics()
+ {
+ _messageStats.reset();
+ _dataStats.reset();
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(_registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
+ _messageStats = new StatisticsCounter("messages-" + getSessionID());
+ _dataStats = new StatisticsCounter("bytes-" + getSessionID());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
}