summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2011-03-08 00:14:59 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2011-03-08 00:14:59 +0000
commitb387fc19a0dc062e95659ad34360ea942790e49e (patch)
treeab963e63e33e0500ff93c2065b2f5e00c99a7330
parent5c797b1f2ebce8b79f118dc64cd0c1b3b9efd23c (diff)
downloadqpid-python-b387fc19a0dc062e95659ad34360ea942790e49e.tar.gz
QPID-2984: Add statistics generation for broker message delivery
Port of QPID-2932 changes from 0.5.x-dev branch to trunk. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1079043 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java70
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java81
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java79
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java107
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java156
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java163
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java118
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java77
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java2
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java79
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java144
-rw-r--r--qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java2
-rw-r--r--qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java118
-rw-r--r--qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java121
-rw-r--r--qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java119
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java102
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java177
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java110
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java90
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java233
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java128
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java66
-rwxr-xr-xqpid/java/test-profiles/Java010Excludes1
32 files changed, 2392 insertions, 60 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index a612f280d6..d1ea5dba69 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -327,4 +327,74 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
{
return getObjectNameForSingleInstanceMBean();
}
+
+ public void resetStatistics() throws Exception
+ {
+ getVirtualHost().resetStatistics();
+ }
+
+ public double getPeakMessageDeliveryRate()
+ {
+ return getVirtualHost().getMessageDeliveryStatistics().getPeak();
+ }
+
+ public double getPeakDataDeliveryRate()
+ {
+ return getVirtualHost().getDataDeliveryStatistics().getPeak();
+ }
+
+ public double getMessageDeliveryRate()
+ {
+ return getVirtualHost().getMessageDeliveryStatistics().getRate();
+ }
+
+ public double getDataDeliveryRate()
+ {
+ return getVirtualHost().getDataDeliveryStatistics().getRate();
+ }
+
+ public long getTotalMessagesDelivered()
+ {
+ return getVirtualHost().getMessageDeliveryStatistics().getTotal();
+ }
+
+ public long getTotalDataDelivered()
+ {
+ return getVirtualHost().getDataDeliveryStatistics().getTotal();
+ }
+
+ public double getPeakMessageReceiptRate()
+ {
+ return getVirtualHost().getMessageReceiptStatistics().getPeak();
+ }
+
+ public double getPeakDataReceiptRate()
+ {
+ return getVirtualHost().getDataReceiptStatistics().getPeak();
+ }
+
+ public double getMessageReceiptRate()
+ {
+ return getVirtualHost().getMessageReceiptStatistics().getRate();
+ }
+
+ public double getDataReceiptRate()
+ {
+ return getVirtualHost().getDataReceiptStatistics().getRate();
+ }
+
+ public long getTotalMessagesReceived()
+ {
+ return getVirtualHost().getMessageReceiptStatistics().getTotal();
+ }
+
+ public long getTotalDataReceived()
+ {
+ return getVirtualHost().getDataReceiptStatistics().getTotal();
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return getVirtualHost().isStatisticsEnabled();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 1c91de6d15..dd3046cd01 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -345,6 +345,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
finally
{
+ long bodySize = _currentMessage.getSize();
+ long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().properties).getTimestamp();
+ _session.registerMessageReceived(bodySize, timestamp);
_currentMessage = null;
}
}
@@ -1037,6 +1040,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
deliveryTag, sub.getConsumerTag());
+ _session.registerMessageDelivered(entry.getMessage().getSize());
}
};
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 71cf17ed60..9d3c4dd2e8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -320,8 +320,7 @@ public class Main
ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean();
configMBean.register();
- ServerInformationMBean sysInfoMBean =
- new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion());
+ ServerInformationMBean sysInfoMBean = new ServerInformationMBean(config);
sysInfoMBean.register();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 7197ec8cdc..43be0611a5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -767,6 +767,36 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
DEFAULT_HOUSEKEEPING_PERIOD));
}
+ public long getStatisticsSamplePeriod()
+ {
+ return getConfig().getLong("statistics.sample.period", 5000L);
+ }
+
+ public boolean isStatisticsGenerationBrokerEnabled()
+ {
+ return getConfig().getBoolean("statistics.generation.broker", false);
+ }
+
+ public boolean isStatisticsGenerationVirtualhostsEnabled()
+ {
+ return getConfig().getBoolean("statistics.generation.virtualhosts", false);
+ }
+
+ public boolean isStatisticsGenerationConnectionsEnabled()
+ {
+ return getConfig().getBoolean("statistics.generation.connections", false);
+ }
+
+ public long getStatisticsReportingPeriod()
+ {
+ return getConfig().getLong("statistics.reporting.period", 0L);
+ }
+
+ public boolean isStatisticsReportResetEnabled()
+ {
+ return getConfig().getBoolean("statistics.reporting.reset", false);
+ }
+
public NetworkDriverConfiguration getNetworkConfiguration()
{
return new NetworkDriverConfiguration()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java
index db2cc970b2..5e6a143d52 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java
@@ -22,9 +22,11 @@ package org.apache.qpid.server.information.management;
import java.io.IOException;
+import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.management.common.mbeans.ServerInformation;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import javax.management.JMException;
@@ -34,12 +36,15 @@ public class ServerInformationMBean extends AMQManagedObject implements ServerIn
{
private String buildVersion;
private String productVersion;
+ private ApplicationRegistry registry;
- public ServerInformationMBean(String buildVersion, String productVersion) throws JMException
+ public ServerInformationMBean(ApplicationRegistry applicationRegistry) throws JMException
{
super(ServerInformation.class, ServerInformation.TYPE);
- this.buildVersion = buildVersion;
- this.productVersion = productVersion;
+
+ registry = applicationRegistry;
+ buildVersion = QpidProperties.getBuildVersion();
+ productVersion = QpidProperties.getReleaseVersion();
}
public String getObjectInstanceName()
@@ -67,5 +72,75 @@ public class ServerInformationMBean extends AMQManagedObject implements ServerIn
return productVersion;
}
+
+ public void resetStatistics() throws Exception
+ {
+ registry.resetStatistics();
+ }
+
+ public double getPeakMessageDeliveryRate()
+ {
+ return registry.getMessageDeliveryStatistics().getPeak();
+ }
+
+ public double getPeakDataDeliveryRate()
+ {
+ return registry.getDataDeliveryStatistics().getPeak();
+ }
+
+ public double getMessageDeliveryRate()
+ {
+ return registry.getMessageDeliveryStatistics().getRate();
+ }
+
+ public double getDataDeliveryRate()
+ {
+ return registry.getDataDeliveryStatistics().getRate();
+ }
+
+ public long getTotalMessagesDelivered()
+ {
+ return registry.getMessageDeliveryStatistics().getTotal();
+ }
+
+ public long getTotalDataDelivered()
+ {
+ return registry.getDataDeliveryStatistics().getTotal();
+ }
+
+ public double getPeakMessageReceiptRate()
+ {
+ return registry.getMessageReceiptStatistics().getPeak();
+ }
+
+ public double getPeakDataReceiptRate()
+ {
+ return registry.getDataReceiptStatistics().getPeak();
+ }
+
+ public double getMessageReceiptRate()
+ {
+ return registry.getMessageReceiptStatistics().getRate();
+ }
+
+ public double getDataReceiptRate()
+ {
+ return registry.getDataReceiptStatistics().getRate();
+ }
+
+ public long getTotalMessagesReceived()
+ {
+ return registry.getMessageReceiptStatistics().getTotal();
+ }
+
+ public long getTotalDataReceived()
+ {
+ return registry.getDataReceiptStatistics().getTotal();
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return registry.isStatisticsEnabled();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
index 6b83a7e7a5..5d1e85fe41 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
@@ -32,4 +32,7 @@ STOPPED = BRK-1005 : Stopped
# 0 - path
CONFIG = BRK-1006 : Using configuration : {0}
# 0 - path
-LOG_CONFIG = BRK-1007 : Using logging configuration : {0} \ No newline at end of file
+LOG_CONFIG = BRK-1007 : Using logging configuration : {0}
+
+STATS_DATA = BRK-1008 : {0,choice,0#delivered|1#received} : {1,number,#.###} kB/s peak : {2,number,#} bytes total
+STATS_MSGS = BRK-1009 : {0,choice,0#delivered|1#received} : {1,number,#.###} msg/s peak : {2,number,#} msgs total \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
index 66bbefacb0..3e640c7929 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
@@ -20,4 +20,7 @@
#
# 0 - name
CREATED = VHT-1001 : Created : {0}
-CLOSED = VHT-1002 : Closed \ No newline at end of file
+CLOSED = VHT-1002 : Closed
+
+STATS_DATA = VHT-1003 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} kB/s peak : {3,number,#} bytes total
+STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total` \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 4ef84631b4..061ebf50cd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -26,8 +26,9 @@ import java.util.UUID;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.stats.StatisticsGatherer;
-public interface AMQConnectionModel
+public interface AMQConnectionModel extends StatisticsGatherer
{
/**
* get a unique id for this connection.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index aef905772a..449f698c48 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -91,6 +91,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.NetworkDriver;
@@ -171,6 +172,10 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
private final UUID _id;
private final ConfigStore _configStore;
private long _createTime = System.currentTimeMillis();
+
+ private ApplicationRegistry _registry;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
public ManagedObject getManagedObject()
{
@@ -194,9 +199,10 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
_configStore = virtualHostRegistry.getConfigStore();
_id = _configStore.createId();
-
_actor.message(ConnectionMessages.OPEN(null, null, false, false));
+ _registry = virtualHostRegistry.getApplicationRegistry();
+ initialiseStatistics();
}
private AMQProtocolSessionMBean createMBean() throws JMException
@@ -1282,5 +1288,74 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
public LogSubject getLogSubject()
{
return _logSubject;
- }
+ }
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ _virtualHost.registerMessageDelivered(messageSize);
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ _virtualHost.registerMessageReceived(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
+ _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
+ _dataReceived = new StatisticsCounter("data-received-" + getSessionID());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index 9d0d63b18e..fcac78fafa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -37,19 +37,8 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
+import java.util.Date;
+import java.util.List;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -67,8 +56,20 @@ import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import java.util.Date;
-import java.util.List;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
/**
* This MBean class implements the management interface. In order to make more attributes, operations and notifications
@@ -339,4 +340,78 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
_broadcaster.sendNotification(n);
}
-} // End of MBean class
+ public void resetStatistics() throws Exception
+ {
+ _protocolSession.resetStatistics();
+ }
+
+ public double getPeakMessageDeliveryRate()
+ {
+ return _protocolSession.getMessageDeliveryStatistics().getPeak();
+ }
+
+ public double getPeakDataDeliveryRate()
+ {
+ return _protocolSession.getDataDeliveryStatistics().getPeak();
+ }
+
+ public double getMessageDeliveryRate()
+ {
+ return _protocolSession.getMessageDeliveryStatistics().getRate();
+ }
+
+ public double getDataDeliveryRate()
+ {
+ return _protocolSession.getDataDeliveryStatistics().getRate();
+ }
+
+ public long getTotalMessagesDelivered()
+ {
+ return _protocolSession.getMessageDeliveryStatistics().getTotal();
+ }
+
+ public long getTotalDataDelivered()
+ {
+ return _protocolSession.getDataDeliveryStatistics().getTotal();
+ }
+
+ public double getPeakMessageReceiptRate()
+ {
+ return _protocolSession.getMessageReceiptStatistics().getPeak();
+ }
+
+ public double getPeakDataReceiptRate()
+ {
+ return _protocolSession.getDataReceiptStatistics().getPeak();
+ }
+
+ public double getMessageReceiptRate()
+ {
+ return _protocolSession.getMessageReceiptStatistics().getRate();
+ }
+
+ public double getDataReceiptRate()
+ {
+ return _protocolSession.getDataReceiptStatistics().getRate();
+ }
+
+ public long getTotalMessagesReceived()
+ {
+ return _protocolSession.getMessageReceiptStatistics().getTotal();
+ }
+
+ public long getTotalDataReceived()
+ {
+ return _protocolSession.getDataReceiptStatistics().getTotal();
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _protocolSession.isStatisticsEnabled();
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _protocolSession.setStatisticsEnabled(enabled);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 78a642f22f..72b2a68450 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.registry;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.UUID;
import org.apache.commons.configuration.ConfigurationException;
@@ -41,11 +43,12 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
import org.apache.qpid.server.logging.Log4jMessageLogger;
import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.AbstractRootMessageLogger;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
@@ -54,6 +57,7 @@ import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalD
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -104,6 +108,10 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
private ConfigStore _configStore;
protected String _registryName;
+
+ private Timer _reportingTimer;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
static
{
@@ -294,6 +302,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
try
{
initialiseVirtualHosts();
+ initialiseStatistics();
+ initialiseStatisticsReporting();
}
finally
{
@@ -320,6 +330,72 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
_managedObjectRegistry = new NoopManagedObjectRegistry();
}
+
+ public void initialiseStatisticsReporting()
+ {
+ long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
+ final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
+ final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
+ final boolean reset = _configuration.isStatisticsReportResetEnabled();
+
+ /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
+ if (report > 0L && (broker || virtualhost))
+ {
+ _reportingTimer = new Timer("Statistics-Reporting", true);
+
+ class StatisticsReportingTask extends TimerTask
+ {
+ private final int DELIVERED = 0;
+ private final int RECEIVED = 1;
+
+ public void run()
+ {
+ CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
+ public String getLogMessage()
+ {
+ return "[" + Thread.currentThread().getName() + "] ";
+ }
+ });
+
+ if (broker)
+ {
+ CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
+ CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
+ CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
+ CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
+ }
+
+ if (virtualhost)
+ {
+ for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts())
+ {
+ String name = vhost.getName();
+ StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics();
+ StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics();
+ StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
+ StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
+
+ CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
+ }
+ }
+
+ if (reset)
+ {
+ resetStatistics();
+ }
+
+ CurrentActor.remove();
+ }
+ }
+
+ _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(),
+ report / 2,
+ report);
+ }
+ }
public static IApplicationRegistry getInstance()
{
@@ -369,6 +445,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
_logger.info("Shutting down ApplicationRegistry:" + this);
}
+
+ //Stop Statistics Reporting
+ if (_reportingTimer != null)
+ {
+ _reportingTimer.cancel();
+ }
//Stop incoming connections
unbind();
@@ -498,4 +580,76 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
getBroker().addVirtualHost(virtualHost);
return virtualHost;
}
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+
+ for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts())
+ {
+ vhost.resetStatistics();
+ }
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ getConfiguration().isStatisticsGenerationBrokerEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered");
+ _dataDelivered = new StatisticsCounter("bytes-delivered");
+ _messagesReceived = new StatisticsCounter("messages-received");
+ _dataReceived = new StatisticsCounter("bytes-received");
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index 228c3b9112..0ef55097ce 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -35,11 +35,12 @@ import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-public interface IApplicationRegistry
+public interface IApplicationRegistry extends StatisticsGatherer
{
/**
* Initialise the application registry. All initialisation must be done in this method so that any components
@@ -97,4 +98,6 @@ public interface IApplicationRegistry
ConfigStore getConfigStore();
void setConfigStore(ConfigStore store);
+
+ void initialiseStatisticsReporting();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
new file mode 100644
index 0000000000..b732121180
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.stats;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class collects statistics and counts the total, rate per second and
+ * peak rate per second values for the events that are registered with it.
+ */
+public class StatisticsCounter
+{
+ private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter.class);
+
+ public static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 2000L); // 2s
+ public static final boolean DISABLE_STATISTICS = Boolean.getBoolean("qpid.statistics.disable");
+
+ private static final String COUNTER = "counter";
+ private static final AtomicLong _counterIds = new AtomicLong(0L);
+
+ private long _peak = 0L;
+ private long _total = 0L;
+ private long _temp = 0L;
+ private long _last = 0L;
+ private long _rate = 0L;
+
+ private long _start;
+
+ private final long _period;
+ private final String _name;
+
+ public StatisticsCounter()
+ {
+ this(COUNTER);
+ }
+
+ public StatisticsCounter(String name)
+ {
+ this(name, DEFAULT_SAMPLE_PERIOD);
+ }
+
+ public StatisticsCounter(String name, long period)
+ {
+ _period = period;
+ _name = name + "-" + + _counterIds.incrementAndGet();
+ reset();
+ }
+
+ public void registerEvent()
+ {
+ registerEvent(1L);
+ }
+
+ public void registerEvent(long value)
+ {
+ registerEvent(value, System.currentTimeMillis());
+ }
+
+ public void registerEvent(long value, long timestamp)
+ {
+ if (DISABLE_STATISTICS)
+ {
+ return;
+ }
+
+ long thisSample = (timestamp / _period);
+ synchronized (this)
+ {
+ if (thisSample > _last)
+ {
+ _last = thisSample;
+ _rate = _temp;
+ _temp = 0L;
+ if (_rate > _peak)
+ {
+ _peak = _rate;
+ }
+ }
+
+ _total += value;
+ _temp += value;
+ }
+ }
+
+ /**
+ * Update the current rate and peak - may reset rate to zero if a new
+ * sample period has started.
+ */
+ private void update()
+ {
+ registerEvent(0L, System.currentTimeMillis());
+ }
+
+ /**
+ * Reset
+ */
+ public void reset()
+ {
+ _log.info("Resetting statistics for counter: " + _name);
+ _peak = 0L;
+ _rate = 0L;
+ _total = 0L;
+ _start = System.currentTimeMillis();
+ _last = _start / _period;
+ }
+
+ public double getPeak()
+ {
+ update();
+ return (double) _peak / ((double) _period / 1000.0d);
+ }
+
+ public double getRate()
+ {
+ update();
+ return (double) _rate / ((double) _period / 1000.0d);
+ }
+
+ public long getTotal()
+ {
+ return _total;
+ }
+
+ public long getStart()
+ {
+ return _start;
+ }
+
+ public Date getStartTime()
+ {
+ return new Date(_start);
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public long getPeriod()
+ {
+ return _period;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
new file mode 100644
index 0000000000..36fec4025a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.stats;
+
+/**
+ * This interface is to be implemented by any broker business object that
+ * wishes to gather statistics about messages delivered through it.
+ *
+ * These statistics are exposed using a separate JMX Mbean interface, which
+ * calls these methods to retrieve the underlying {@link StatisticsCounter}s
+ * and return their attributes. This interface gives a standard way for
+ * parts of the broker to set up and configure statistics generation.
+ * <p>
+ * When creating these objects, there should be a parent/child relationship
+ * between them, such that the lowest level gatherer can record staticics if
+ * enabled, and pass on the notification to the parent object to allow higher
+ * level aggregation. When resetting statistics, this works in the opposite
+ * direction, with higher level gatherers also resetting all of their children.
+ */
+public interface StatisticsGatherer
+{
+ /**
+ * Initialise the statistics gathering for this object.
+ *
+ * This method is responsible for creating any {@link StatisticsCounter}
+ * objects and for determining whether statistics generation should be
+ * enabled, by checking broker and system configuration.
+ *
+ * @see StatisticsCounter#DISABLE_STATISTICS
+ */
+ void initialiseStatistics();
+
+ /**
+ * This method is responsible for registering the receipt of a message
+ * with the counters, and also for passing this notification to any parent
+ * {@link StatisticsGatherer}s. If statistics generation is not enabled,
+ * then this method should simple delegate to the parent gatherer.
+ *
+ * @param messageSize the size in bytes of the delivered message
+ * @param timestamp the time the message was delivered
+ */
+ void registerMessageReceived(long messageSize, long timestamp);
+
+ /**
+ * This method is responsible for registering the delivery of a message
+ * with the counters. Message delivery is recorded by the counter using
+ * the current system time, as opposed to the message timestamp.
+ *
+ * @param messageSize the size in bytes of the delivered message
+ * @see #registerMessageReceived(long, long)
+ */
+ void registerMessageDelivered(long messageSize);
+
+ /**
+ * Gives access to the {@link StatisticsCounter} that is used to count
+ * delivered message statistics.
+ *
+ * @return the {@link StatisticsCounter} that counts delivered messages
+ */
+ StatisticsCounter getMessageDeliveryStatistics();
+
+ /**
+ * Gives access to the {@link StatisticsCounter} that is used to count
+ * received message statistics.
+ *
+ * @return the {@link StatisticsCounter} that counts received messages
+ */
+ StatisticsCounter getMessageReceiptStatistics();
+
+ /**
+ * Gives access to the {@link StatisticsCounter} that is used to count
+ * delivered message size statistics.
+ *
+ * @return the {@link StatisticsCounter} that counts delivered bytes
+ */
+ StatisticsCounter getDataDeliveryStatistics();
+
+ /**
+ * Gives access to the {@link StatisticsCounter} that is used to count
+ * received message size statistics.
+ *
+ * @return the {@link StatisticsCounter} that counts received bytes
+ */
+ StatisticsCounter getDataReceiptStatistics();
+
+ /**
+ * Reset the counters for this, and any child {@link StatisticsGatherer}s.
+ */
+ void resetStatistics();
+
+ /**
+ * Check if this object has statistics generation enabled.
+ *
+ * @return true if statistics generation is enabled
+ */
+ boolean isStatisticsEnabled();
+
+ /**
+ * Enable or disable statistics generation for this object.
+ */
+ void setStatisticsEnabled(boolean enabled);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index e635ad0188..75bd50e3a2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -38,6 +38,8 @@ import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionCloseCode;
@@ -54,6 +56,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
private AtomicBoolean _logClosed = new AtomicBoolean(false);
private LogActor _actor = GenericActor.getInstance(this);
+ private ApplicationRegistry _registry;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
public ServerConnection()
{
@@ -121,6 +127,8 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
{
_virtualHost = virtualHost;
_virtualHost.getConnectionRegistry().registerConnection(this);
+
+ initialiseStatistics();
}
public void setConnectionConfig(final ConnectionConfig config)
@@ -258,4 +266,73 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
}
return sessions;
}
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ _virtualHost.registerMessageDelivered(messageSize);
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ _virtualHost.registerMessageReceived(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
+ _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
+ _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index fb27dec949..174dcbfa69 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -20,26 +20,28 @@
*/
package org.apache.qpid.server.transport;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.common.ClientProperties;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslException;
-import java.util.*;
+import org.apache.qpid.transport.*;
public class ServerConnectionDelegate extends ServerDelegate
{
private String _localFQDN;
private final IApplicationRegistry _appRegistry;
-
public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
{
this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 714b2aa61f..60c94b43c0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -166,6 +166,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
{
+ getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
_transaction.enqueue(queues,message, new ServerTransaction.Action()
{
@@ -202,6 +203,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
Runnable postIdSettingAction)
{
invoke(xfr, postIdSettingAction);
+ getConnectionModel().registerMessageDelivered(xfr.getBodySize());
}
public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 4ed0507228..04f19b79bb 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -20,30 +20,28 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.UUID;
+
import org.apache.qpid.common.Closeable;
+import org.apache.qpid.server.binding.BindingFactory;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.VirtualHostConfig;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfig;
-import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.binding.BindingFactory;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.TimerTask;
-import java.util.concurrent.FutureTask;
+import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLog;
-public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable
+public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
{
IConnectionRegistry getConnectionRegistry();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index a1566917dd..5374a56f06 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -73,6 +73,7 @@ import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
@@ -112,6 +113,8 @@ public class VirtualHostImpl implements VirtualHost
private BrokerConfig _broker;
private UUID _id;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final long _createTime = System.currentTimeMillis();
private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
@@ -250,6 +253,8 @@ public class VirtualHostImpl implements VirtualHost
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
+
+ initialiseStatistics();
}
private void initialiseHouseKeeping(long period)
@@ -639,6 +644,80 @@ public class VirtualHostImpl implements VirtualHost
{
return _bindingFactory;
}
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ _appRegistry.registerMessageDelivered(messageSize);
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ _appRegistry.registerMessageReceived(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+
+ for (AMQConnectionModel connection : _connectionRegistry.getConnections())
+ {
+ connection.resetStatistics();
+ }
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
+ _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getName());
+ _dataReceived = new StatisticsCounter("bytes-received-" + getName());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
public void createBrokerConnection(final String transport,
final String host,
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
new file mode 100644
index 0000000000..fbaa1342c9
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
@@ -0,0 +1,144 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.stats;
+
+import junit.framework.TestCase;
+
+/**
+ * Unit tests for the {@link StatisticsCounter} class.
+ */
+public class StatisticsCounterTest extends TestCase
+{
+ /**
+ * Check that statistics counters are created correctly.
+ */
+ public void testCreate()
+ {
+ long before = System.currentTimeMillis();
+ StatisticsCounter counter = new StatisticsCounter("name", 1234L);
+ long after = System.currentTimeMillis();
+
+ assertTrue(before <= counter.getStart());
+ assertTrue(after >= counter.getStart());
+ assertTrue(counter.getName().startsWith("name-"));
+ assertEquals(1234L, counter.getPeriod());
+ }
+
+ /**
+ * Check that totals add up correctly.
+ */
+ public void testTotal()
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ long start = counter.getStart();
+ for (int i = 0; i < 100; i++)
+ {
+ counter.registerEvent(i, start + i);
+ }
+ assertEquals(99 * 50, counter.getTotal()); // cf. Gauss
+ }
+
+ /**
+ * Test totals add up correctly even when messages are delivered
+ * out-of-order.
+ */
+ public void testTotalOutOfOrder()
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ long start = counter.getStart();
+ assertEquals(0, counter.getTotal());
+ counter.registerEvent(10, start + 2500);
+ assertEquals(10, counter.getTotal());
+ counter.registerEvent(20, start + 1500);
+ assertEquals(30, counter.getTotal());
+ counter.registerEvent(10, start + 500);
+ assertEquals(40, counter.getTotal());
+ }
+
+ /**
+ * Test that the peak rate is reported correctly.
+ */
+ public void testPeak() throws Exception
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ long start = counter.getStart();
+ assertEquals(0.0, counter.getPeak());
+ Thread.sleep(500);
+ counter.registerEvent(1000, start + 500);
+ Thread.sleep(1000);
+ assertEquals(1000.0, counter.getPeak());
+ counter.registerEvent(2000, start + 1500);
+ Thread.sleep(1000);
+ assertEquals(2000.0, counter.getPeak());
+ counter.registerEvent(1000, start + 2500);
+ Thread.sleep(1000);
+ assertEquals(2000.0, counter.getPeak());
+ }
+
+ /**
+ * Test that peak rate is reported correctly for out-of-order messages,
+ * and the total is also unaffected.
+ */
+ public void testPeakOutOfOrder() throws Exception
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ long start = counter.getStart();
+ assertEquals(0.0, counter.getPeak());
+ counter.registerEvent(1000, start + 2500);
+ Thread.sleep(1500);
+ assertEquals(0.0, counter.getPeak());
+ counter.registerEvent(2000, start + 1500);
+ Thread.sleep(1000L);
+ assertEquals(0.0, counter.getPeak());
+ counter.registerEvent(1000, start + 500);
+ Thread.sleep(1500);
+ assertEquals(4000.0, counter.getPeak());
+ Thread.sleep(2000);
+ assertEquals(4000.0, counter.getPeak());
+ counter.registerEvent(1000, start + 500);
+ assertEquals(4000.0, counter.getPeak());
+ Thread.sleep(2000);
+ counter.registerEvent(1000);
+ assertEquals(4000.0, counter.getPeak());
+ assertEquals(6000, counter.getTotal());
+ }
+
+ /**
+ * Test the current rate is generated correctly.
+ */
+ public void testRate() throws Exception
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ assertEquals(0.0, counter.getRate());
+ Thread.sleep(500);
+ counter.registerEvent(1000);
+ Thread.sleep(1000);
+ assertEquals(1000.0, counter.getRate());
+ counter.registerEvent(2000);
+ Thread.sleep(1000);
+ assertEquals(2000.0, counter.getRate());
+ counter.registerEvent(1000);
+ Thread.sleep(1000);
+ assertEquals(1000.0, counter.getRate());
+ Thread.sleep(1000);
+ assertEquals(0.0, counter.getRate());
+ }
+}
diff --git a/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java b/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java
index 902b86f80b..a39799a6b6 100644
--- a/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java
+++ b/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java
@@ -481,7 +481,7 @@ public class GenerateLogMessages
// Only check the text inside the braces '{}'
int typeIndexEnd = parametersString[index].indexOf("}", typeIndex);
String typeString = parametersString[index].substring(typeIndex, typeIndexEnd);
- if (typeString.contains("number"))
+ if (typeString.contains("number") || typeString.contains("choice"))
{
type = "Number";
}
diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
index c0e7293611..b48ad3f856 100644
--- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
+++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
@@ -36,8 +36,8 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParame
* The ManagedBroker is the management interface to expose management
* features of the Broker.
*
- * @author Bhupendra Bhardwaj
- * @version 0.1
+ * @version Qpid JMX API 2.1
+ * @since Qpid JMX API 1.3
*/
public interface ManagedBroker
{
@@ -131,4 +131,118 @@ public interface ManagedBroker
impact= MBeanOperationInfo.ACTION)
void deleteQueue(@MBeanOperationParameter(name= ManagedQueue.TYPE, description="Queue Name")String queueName)
throws IOException, JMException, MBeanException;
+
+ /**
+ * Resets all message and data statistics for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanOperation(name="resetStatistics",
+ description="Resets all message and data statistics for the virtual host",
+ impact= MBeanOperationInfo.ACTION)
+ void resetStatistics() throws Exception;
+
+ /**
+ * Peak rate of messages delivered per second for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="PeakMessageDeliveryRate", description=TYPE + " Peak Message Delivery Rate")
+ double getPeakMessageDeliveryRate();
+
+ /**
+ * Peak rate of bytes delivered per second for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="PeakDataDeliveryRate", description=TYPE + " Peak Data Delivery Rate")
+ double getPeakDataDeliveryRate();
+
+ /**
+ * Rate of messages delivered per second for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="MessageDeliveryRate", description=TYPE + " Message Delivery Rate")
+ double getMessageDeliveryRate();
+
+ /**
+ * Rate of bytes delivered per second for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="DataDeliveryRate", description=TYPE + " Data Delivery Rate")
+ double getDataDeliveryRate();
+
+ /**
+ * Total count of messages delivered for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="TotalMessagesDelivered", description=TYPE + " Total Messages Delivered")
+ long getTotalMessagesDelivered();
+
+ /**
+ * Total count of bytes for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="TotalDataDelivered", description=TYPE + " Total Data Delivered")
+ long getTotalDataDelivered();
+
+ /**
+ * Peak rate of messages received per second for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="PeakMessageReceiptRate", description=TYPE + " Peak Message Receipt Rate")
+ double getPeakMessageReceiptRate();
+
+ /**
+ * Peak rate of bytes received per second for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="PeakDataReceiptRate", description=TYPE + " Peak Data Receipt Rate")
+ double getPeakDataReceiptRate();
+
+ /**
+ * Rate of messages received per second for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="MessageReceiptRate", description=TYPE + " Message Receipt Rate")
+ double getMessageReceiptRate();
+
+ /**
+ * Rate of bytes received per second for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="DataReceiptRate", description=TYPE + " Data Receipt Rate")
+ double getDataReceiptRate();
+
+ /**
+ * Total count of messages received for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="TotalMessagesReceived", description=TYPE + " Total Messages Received")
+ long getTotalMessagesReceived();
+
+ /**
+ * Total count of bytes received for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="TotalDataReceived", description=TYPE + " Total Data Received")
+ long getTotalDataReceived();
+
+ /**
+ * Is statistics collection enabled for this connection.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="StatisticsEnabled", description=TYPE + " Statistics Enabled")
+ boolean isStatisticsEnabled();
}
diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
index d6b79d1dde..6ef0fd5b19 100644
--- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
+++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
@@ -37,8 +37,9 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParame
/**
* The management interface exposed to allow management of Connections.
- * @author Bhupendra Bhardwaj
- * @version 0.1
+ *
+ * @version Qpid JMX API 2.1
+ * @since Qpid JMX API 1.3
*/
public interface ManagedConnection
{
@@ -145,4 +146,120 @@ public interface ManagedConnection
description="Closes this connection and all related channels",
impact= MBeanOperationInfo.ACTION)
void closeConnection() throws Exception;
+
+ /**
+ * Resets message and data statistics for this connection.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanOperation(name="resetStatistics",
+ description="Resets message and data statistics for this connection",
+ impact= MBeanOperationInfo.ACTION)
+ void resetStatistics() throws Exception;
+
+ /**
+ * Peak rate of messages delivered per second for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="PeakMessageDeliveryRate", description=TYPE + " Peak Message Delivery Rate")
+ double getPeakMessageDeliveryRate();
+
+ /**
+ * Peak rate of bytes delivered per second for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="PeakDataDeliveryRate", description=TYPE + " Peak Data Delivery Rate")
+ double getPeakDataDeliveryRate();
+
+ /**
+ * Rate of messages delivered per second for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="MessageDeliveryRate", description=TYPE + " Message Delivery Rate")
+ double getMessageDeliveryRate();
+
+ /**
+ * Rate of bytes delivered per second for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="DataDeliveryRate", description=TYPE + " Data Delivery Rate")
+ double getDataDeliveryRate();
+
+ /**
+ * Total count of messages delivered for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="TotalMessagesDelivered", description=TYPE + " Total Messages Delivered")
+ long getTotalMessagesDelivered();
+
+ /**
+ * Total count of bytes for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="TotalDataDelivered", description=TYPE + " Total Data Delivered")
+ long getTotalDataDelivered();
+
+ /**
+ * Peak rate of messages received per second for this connection.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="PeakMessageReceiptRate", description=TYPE + " Peak Message Receipt Rate")
+ double getPeakMessageReceiptRate();
+
+ /**
+ * Peak rate of bytes received per second for this connection.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="PeakDataReceiptRate", description=TYPE + " Peak Data Receipt Rate")
+ double getPeakDataReceiptRate();
+
+ /**
+ * Rate of messages received per second for this connection.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="MessageReceiptRate", description=TYPE + " Message Receipt Rate")
+ double getMessageReceiptRate();
+
+ /**
+ * Rate of bytes received per second for this connection.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="DataReceiptRate", description=TYPE + " Data Receipt Rate")
+ double getDataReceiptRate();
+
+ /**
+ * Total count of messages received for this connection.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="TotalMessagesReceived", description=TYPE + " Total Messages Received")
+ long getTotalMessagesReceived();
+
+ /**
+ * Total count of bytes received for this connection.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="TotalDataReceived", description=TYPE + " Total Data Received")
+ long getTotalDataReceived();
+
+ /**
+ * Is statistics collection enabled for this connection.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="StatisticsEnabled", description=TYPE + " Statistics Enabled")
+ boolean isStatisticsEnabled();
+
+ void setStatisticsEnabled(boolean enabled);
}
diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
index bfcb10189f..abee6a745e 100644
--- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
+++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
@@ -22,10 +22,15 @@ package org.apache.qpid.management.common.mbeans;
import java.io.IOException;
+import javax.management.MBeanOperationInfo;
+
import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
/**
* Interface for the ServerInformation MBean
+ *
+ * @version Qpid JMX API 2.1
* @since Qpid JMX API 1.3
*/
public interface ServerInformation
@@ -80,4 +85,118 @@ public interface ServerInformation
@MBeanAttribute(name="ProductVersion",
description = "The product version string")
String getProductVersion() throws IOException;
+
+ /**
+ * Resets all message and data statistics for the broker.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanOperation(name="resetStatistics",
+ description="Resets all message and data statistics for the broker",
+ impact= MBeanOperationInfo.ACTION)
+ void resetStatistics() throws Exception;
+
+ /**
+ * Peak rate of messages delivered per second for the virtual host.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="PeakMessageDeliveryRate", description=TYPE + " Peak Message Delivery Rate")
+ double getPeakMessageDeliveryRate();
+
+ /**
+ * Peak rate of bytes delivered per second for the broker.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="PeakDataDeliveryRate", description=TYPE + " Peak Data Delivery Rate")
+ double getPeakDataDeliveryRate();
+
+ /**
+ * Rate of messages delivered per second for the broker.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="MessageDeliveryRate", description=TYPE + " Message Delivery Rate")
+ double getMessageDeliveryRate();
+
+ /**
+ * Rate of bytes delivered per second for the broker.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="DataDeliveryRate", description=TYPE + " Data Delivery Rate")
+ double getDataDeliveryRate();
+
+ /**
+ * Total count of messages delivered for the broker.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="TotalMessagesDelivered", description=TYPE + " Total Messages Delivered")
+ long getTotalMessagesDelivered();
+
+ /**
+ * Total count of bytes for the broker.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="TotalDataDelivered", description=TYPE + " Total Data Delivered")
+ long getTotalDataDelivered();
+
+ /**
+ * Peak rate of messages received per second for the broker.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="PeakMessageReceiptRate", description=TYPE + " Peak Message Receipt Rate")
+ double getPeakMessageReceiptRate();
+
+ /**
+ * Peak rate of bytes received per second for the broker.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="PeakDataReceiptRate", description=TYPE + " Peak Data Receipt Rate")
+ double getPeakDataReceiptRate();
+
+ /**
+ * Rate of messages received per second for the broker.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="MessageReceiptRate", description=TYPE + " Message Receipt Rate")
+ double getMessageReceiptRate();
+
+ /**
+ * Rate of bytes received per second for the broker.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="DataReceiptRate", description=TYPE + " Data Receipt Rate")
+ double getDataReceiptRate();
+
+ /**
+ * Total count of messages received for the broker.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="TotalMessagesReceived", description=TYPE + " Total Messages Received")
+ long getTotalMessagesReceived();
+
+ /**
+ * Total count of bytes received for the broker.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="TotalDataReceived", description=TYPE + " Total Data Received")
+ long getTotalDataReceived();
+
+ /**
+ * Is statistics collection enabled for this connection.
+ *
+ * @since Qpid JMX API 2.1
+ */
+ @MBeanAttribute(name="StatisticsEnabled", description=TYPE + " Statistics Enabled")
+ boolean isStatisticsEnabled();
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java
new file mode 100644
index 0000000000..9839c6e475
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+
+/**
+ * Test enabling generation of message statistics on a per-connection basis.
+ */
+public class MessageConnectionStatisticsTest extends MessageStatisticsTestCase
+{
+ public void configureStatistics() throws Exception
+ {
+ // no statistics generation configured
+ }
+
+ /**
+ * Test statistics on a single connection
+ */
+ public void testEnablingStatisticsPerConnection() throws Exception
+ {
+ ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+
+ sendUsing(_test, 5, 200);
+ Thread.sleep(1000);
+
+ List<String> addresses = new ArrayList<String>();
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived());
+ assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived());
+ assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled());
+
+ addresses.add(mc.getRemoteAddress());
+ }
+ assertEquals("Incorrect vhost total", 0, vhost.getTotalMessagesReceived());
+ assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived());
+
+ Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test");
+ test.start();
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ if (addresses.contains(mc.getRemoteAddress()))
+ {
+ continue;
+ }
+ mc.setStatisticsEnabled(true);
+ assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived());
+ assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived());
+ }
+
+ sendUsing(test, 5, 200);
+ sendUsing(_test, 5, 200);
+ Thread.sleep(1000);
+
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ if (addresses.contains(mc.getRemoteAddress()))
+ {
+ assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived());
+ assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived());
+ assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled());
+ }
+ else
+ {
+ assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived());
+ assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived());
+ assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled());
+ }
+ }
+ assertEquals("Incorrect vhost total", 0, vhost.getTotalMessagesReceived());
+ assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived());
+ assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled());
+
+ test.close();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java
new file mode 100644
index 0000000000..df8c6e74cd
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java
@@ -0,0 +1,177 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+
+/**
+ * Test enabling generation of message statistics on a per-connection basis.
+ */
+public class MessageStatisticsConfigurationTest extends MessageStatisticsTestCase
+{
+ public void configureStatistics() throws Exception
+ {
+ setConfigurationProperty("statistics.generation.broker", Boolean.toString(getName().contains("Broker")));
+ setConfigurationProperty("statistics.generation.virtualhosts", Boolean.toString(getName().contains("Virtualhost")));
+ setConfigurationProperty("statistics.generation.connections", Boolean.toString(getName().contains("Connection")));
+ }
+
+ /**
+ * Just broker statistics.
+ */
+ public void testGenerateBrokerStatistics() throws Exception
+ {
+ sendUsing(_test, 5, 200);
+ Thread.sleep(1000);
+
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived());
+ assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived());
+ assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled());
+ }
+
+ ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+ assertEquals("Incorrect vhost data", 0, vhost.getTotalMessagesReceived());
+ assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived());
+ assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived());
+ assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived());
+ assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled());
+ }
+ }
+
+ /**
+ * Just virtualhost statistics.
+ */
+ public void testGenerateVirtualhostStatistics() throws Exception
+ {
+ sendUsing(_test, 5, 200);
+ Thread.sleep(1000);
+
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived());
+ assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived());
+ assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled());
+ }
+
+ ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+ assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived());
+ assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived());
+ assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessagesReceived());
+ assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalDataReceived());
+ assertFalse("Server statistics should not be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled());
+ }
+ }
+
+ /**
+ * Just connection statistics.
+ */
+ public void testGenerateConnectionStatistics() throws Exception
+ {
+ sendUsing(_test, 5, 200);
+ Thread.sleep(1000);
+
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived());
+ assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived());
+ assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled());
+ }
+
+ ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+ assertEquals("Incorrect vhost data", 0, vhost.getTotalMessagesReceived());
+ assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived());
+ assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessagesReceived());
+ assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalDataReceived());
+ assertFalse("Server statistics should not be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled());
+ }
+ }
+
+ /**
+ * Both broker and virtualhost statistics.
+ */
+ public void testGenerateBrokerAndVirtualhostStatistics() throws Exception
+ {
+ sendUsing(_test, 5, 200);
+ Thread.sleep(1000);
+
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived());
+ assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived());
+ assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled());
+ }
+
+ ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+ assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived());
+ assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived());
+ assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived());
+ assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived());
+ assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled());
+ }
+ }
+
+ /**
+ * Broker, virtualhost and connection statistics.
+ */
+ public void testGenerateBrokerVirtualhostAndConnectionStatistics() throws Exception
+ {
+ sendUsing(_test, 5, 200);
+ Thread.sleep(1000);
+
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived());
+ assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived());
+ assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled());
+ }
+
+ ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+ assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived());
+ assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived());
+ assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived());
+ assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived());
+ assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled());
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java
new file mode 100644
index 0000000000..e657856d0e
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+
+/**
+ * Test statistics for delivery and receipt.
+ */
+public class MessageStatisticsDeliveryTest extends MessageStatisticsTestCase
+{
+ public void configureStatistics() throws Exception
+ {
+ setConfigurationProperty("statistics.generation.broker", "true");
+ setConfigurationProperty("statistics.generation.virtualhosts", "true");
+ setConfigurationProperty("statistics.generation.connections", "true");
+ }
+
+ public void testDeliveryAndReceiptStatistics() throws Exception
+ {
+ ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+
+ sendUsing(_test, 5, 200);
+ Thread.sleep(1000);
+
+ List<String> addresses = new ArrayList<String>();
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ assertEquals("Incorrect connection delivery total", 0, mc.getTotalMessagesDelivered());
+ assertEquals("Incorrect connection delivery data", 0, mc.getTotalDataDelivered());
+ assertEquals("Incorrect connection receipt total", 5, mc.getTotalMessagesReceived());
+ assertEquals("Incorrect connection receipt data", 1000, mc.getTotalDataReceived());
+
+ addresses.add(mc.getRemoteAddress());
+ }
+
+ assertEquals("Incorrect vhost delivery total", 0, vhost.getTotalMessagesDelivered());
+ assertEquals("Incorrect vhost delivery data", 0, vhost.getTotalDataDelivered());
+ assertEquals("Incorrect vhost receipt total", 5, vhost.getTotalMessagesReceived());
+ assertEquals("Incorrect vhost receipt data", 1000, vhost.getTotalDataReceived());
+
+ Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test");
+ test.start();
+ receiveUsing(test, 5);
+
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ if (addresses.contains(mc.getRemoteAddress()))
+ {
+ assertEquals("Incorrect connection delivery total", 0, mc.getTotalMessagesDelivered());
+ assertEquals("Incorrect connection delivery data", 0, mc.getTotalDataDelivered());
+ assertEquals("Incorrect connection receipt total", 5, mc.getTotalMessagesReceived());
+ assertEquals("Incorrect connection receipt data", 1000, mc.getTotalDataReceived());
+ }
+ else
+ {
+ assertEquals("Incorrect connection delivery total", 5, mc.getTotalMessagesDelivered());
+ assertEquals("Incorrect connection delivery data", 1000, mc.getTotalDataDelivered());
+ assertEquals("Incorrect connection receipt total", 0, mc.getTotalMessagesReceived());
+ assertEquals("Incorrect connection receipt data", 0, mc.getTotalDataReceived());
+ }
+ }
+ assertEquals("Incorrect vhost delivery total", 5, vhost.getTotalMessagesDelivered());
+ assertEquals("Incorrect vhost delivery data", 1000, vhost.getTotalDataDelivered());
+ assertEquals("Incorrect vhost receipt total", 5, vhost.getTotalMessagesReceived());
+ assertEquals("Incorrect vhost receipt data", 1000, vhost.getTotalDataReceived());
+
+ test.close();
+ }
+
+ protected void receiveUsing(Connection con, int number) throws Exception
+ {
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ createQueue(session);
+ MessageConsumer consumer = session.createConsumer(_queue);
+ for (int i = 0; i < number; i++)
+ {
+ Message msg = consumer.receive(100);
+ assertNotNull("Message " + i + " was not received", msg);
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java
new file mode 100644
index 0000000000..180440c0d6
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import java.util.List;
+
+import org.apache.qpid.util.LogMonitor;
+
+/**
+ * Test generation of message statistics reporting.
+ */
+public class MessageStatisticsReportingTest extends MessageStatisticsTestCase
+{
+ protected LogMonitor _monitor;
+
+ public void configureStatistics() throws Exception
+ {
+ setConfigurationProperty("statistics.generation.broker", "true");
+ setConfigurationProperty("statistics.generation.virtualhosts", "true");
+
+ if (getName().equals("testEnabledStatisticsReporting"))
+ {
+ setConfigurationProperty("statistics.reporting.period", "10");
+ }
+
+ _monitor = new LogMonitor(_outputFile);
+ }
+
+ /**
+ * Test enabling reporting.
+ */
+ public void testEnabledStatisticsReporting() throws Exception
+ {
+ sendUsing(_test, 10, 100);
+ sendUsing(_dev, 20, 100);
+ sendUsing(_local, 15, 100);
+
+ Thread.sleep(10 * 1000); // 15s
+
+ List<String> brokerStatsData = _monitor.findMatches("BRK-1008");
+ List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009");
+ List<String> vhostStatsData = _monitor.findMatches("VHT-1003");
+ List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004");
+
+ assertEquals("Incorrect number of broker data stats log messages", 2, brokerStatsData.size());
+ assertEquals("Incorrect number of broker message stats log messages", 2, brokerStatsMessages.size());
+ assertEquals("Incorrect number of virtualhost data stats log messages", 6, vhostStatsData.size());
+ assertEquals("Incorrect number of virtualhost message stats log messages", 6, vhostStatsMessages.size());
+ }
+
+ /**
+ * Test not enabling reporting.
+ */
+ public void testNotEnabledStatisticsReporting() throws Exception
+ {
+ sendUsing(_test, 10, 100);
+ sendUsing(_dev, 20, 100);
+ sendUsing(_local, 15, 100);
+
+ Thread.sleep(10 * 1000); // 15s
+
+ List<String> brokerStatsData = _monitor.findMatches("BRK-1008");
+ List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009");
+ List<String> vhostStatsData = _monitor.findMatches("VHT-1003");
+ List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004");
+
+ assertEquals("Incorrect number of broker data stats log messages", 0, brokerStatsData.size());
+ assertEquals("Incorrect number of broker message stats log messages", 0, brokerStatsMessages.size());
+ assertEquals("Incorrect number of virtualhost data stats log messages", 0, vhostStatsData.size());
+ assertEquals("Incorrect number of virtualhost message stats log messages", 0, vhostStatsMessages.size());
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java
new file mode 100644
index 0000000000..50ca51b18a
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java
@@ -0,0 +1,233 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import javax.jms.Connection;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+
+/**
+ * Test generation of message statistics.
+ */
+public class MessageStatisticsTest extends MessageStatisticsTestCase
+{
+ public void configureStatistics() throws Exception
+ {
+ setConfigurationProperty("statistics.generation.broker", "true");
+ setConfigurationProperty("statistics.generation.virtualhosts", "true");
+ setConfigurationProperty("statistics.generation.connections", "true");
+ }
+
+ /**
+ * Test message totals.
+ */
+ public void testMessageTotals() throws Exception
+ {
+ sendUsing(_test, 10, 100);
+ sendUsing(_dev, 20, 100);
+ sendUsing(_local, 5, 100);
+ sendUsing(_local, 5, 100);
+ sendUsing(_local, 5, 100);
+ Thread.sleep(2000);
+
+ ManagedBroker test = _jmxUtils.getManagedBroker("test");
+ ManagedBroker dev = _jmxUtils.getManagedBroker("development");
+ ManagedBroker local = _jmxUtils.getManagedBroker("localhost");
+
+ if (!isBroker010())
+ {
+ long total = 0;
+ long data = 0;
+ for (ManagedConnection mc : _jmxUtils.getAllManagedConnections())
+ {
+ total += mc.getTotalMessagesReceived();
+ data += mc.getTotalDataReceived();
+ }
+ assertEquals("Incorrect connection total", 45, total);
+ assertEquals("Incorrect connection data", 4500, data);
+ }
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total", 45, _jmxUtils.getServerInformation().getTotalMessagesReceived());
+ assertEquals("Incorrect server data", 4500, _jmxUtils.getServerInformation().getTotalDataReceived());
+ }
+
+ if (!isBroker010())
+ {
+ long testTotal = 0;
+ long testData = 0;
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ testTotal += mc.getTotalMessagesReceived();
+ testData += mc.getTotalDataReceived();
+ }
+ assertEquals("Incorrect test connection total", 10, testTotal);
+ assertEquals("Incorrect test connection data", 1000, testData);
+ }
+ assertEquals("Incorrect test vhost total", 10, test.getTotalMessagesReceived());
+ assertEquals("Incorrect test vhost data", 1000, test.getTotalDataReceived());
+
+ if (!isBroker010())
+ {
+ long devTotal = 0;
+ long devData = 0;
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("development"))
+ {
+ devTotal += mc.getTotalMessagesReceived();
+ devData += mc.getTotalDataReceived();
+ }
+ assertEquals("Incorrect test connection total", 20, devTotal);
+ assertEquals("Incorrect test connection data", 2000, devData);
+ }
+ assertEquals("Incorrect development total", 20, dev.getTotalMessagesReceived());
+ assertEquals("Incorrect development data", 2000, dev.getTotalDataReceived());
+
+ if (!isBroker010())
+ {
+ long localTotal = 0;
+ long localData = 0;
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("localhost"))
+ {
+ localTotal += mc.getTotalMessagesReceived();
+ localData += mc.getTotalDataReceived();
+ }
+ assertEquals("Incorrect test connection total", 15, localTotal);
+ assertEquals("Incorrect test connection data", 1500, localData);
+ }
+ assertEquals("Incorrect localhost total", 15, local.getTotalMessagesReceived());
+ assertEquals("Incorrect localhost data", 1500, local.getTotalDataReceived());
+ }
+
+ /**
+ * Test message totals when a connection is closed.
+ */
+ public void testMessageTotalsWithClosedConnections() throws Exception
+ {
+ Connection temp = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test");
+ temp.start();
+
+ sendUsing(_test, 10, 100);
+ sendUsing(temp, 10, 100);
+ sendUsing(_test, 10, 100);
+ Thread.sleep(2000);
+
+ temp.close();
+
+ ManagedBroker test = _jmxUtils.getManagedBroker("test");
+
+ if (!isBroker010())
+ {
+ long total = 0;
+ long data = 0;
+ for (ManagedConnection mc : _jmxUtils.getAllManagedConnections())
+ {
+ total += mc.getTotalMessagesReceived();
+ data += mc.getTotalDataReceived();
+ }
+ assertEquals("Incorrect active connection total", 20, total);
+ assertEquals("Incorrect active connection data", 2000, data);
+ }
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total", 30, _jmxUtils.getServerInformation().getTotalMessagesReceived());
+ assertEquals("Incorrect server data", 3000, _jmxUtils.getServerInformation().getTotalDataReceived());
+ }
+
+ if (!isBroker010())
+ {
+ long testTotal = 0;
+ long testData = 0;
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ testTotal += mc.getTotalMessagesReceived();
+ testData += mc.getTotalDataReceived();
+ }
+ assertEquals("Incorrect test active connection total", 20, testTotal);
+ assertEquals("Incorrect test active connection data", 20 * 100, testData);
+ }
+ assertEquals("Incorrect test vhost total", 30, test.getTotalMessagesReceived());
+ assertEquals("Incorrect test vhost data", 30 * 100, test.getTotalDataReceived());
+ }
+
+ /**
+ * Test message peak rate generation.
+ */
+ public void testMessagePeakRates() throws Exception
+ {
+ sendUsing(_test, 2, 10);
+ Thread.sleep(10000);
+ sendUsing(_dev, 4, 10);
+ Thread.sleep(10000);
+
+ ManagedBroker test = _jmxUtils.getManagedBroker("test");
+ ManagedBroker dev = _jmxUtils.getManagedBroker("development");
+
+ assertApprox("Incorrect test vhost peak messages", 0.2d, 1.0d, test.getPeakMessageReceiptRate());
+ assertApprox("Incorrect test vhost peak data", 0.2d, 10.0d, test.getPeakDataReceiptRate());
+ assertApprox("Incorrect dev vhost peak messages", 0.2d, 2.0d, dev.getPeakMessageReceiptRate());
+ assertApprox("Incorrect dev vhost peak data", 0.2d, 20.0d, dev.getPeakDataReceiptRate());
+
+ if (!_broker.equals(VM))
+ {
+ assertApprox("Incorrect server peak messages", 0.2d, 2.0d, _jmxUtils.getServerInformation().getPeakMessageReceiptRate());
+ assertApprox("Incorrect server peak data", 0.2d, 20.0d, _jmxUtils.getServerInformation().getPeakDataReceiptRate());
+ }
+ }
+
+ /**
+ * Test message totals when a vhost has its statistics reset
+ */
+ public void testMessageTotalVhostReset() throws Exception
+ {
+ sendUsing(_test, 10, 10);
+ sendUsing(_dev, 10, 10);
+ Thread.sleep(2000);
+
+ ManagedBroker test = _jmxUtils.getManagedBroker("test");
+ ManagedBroker dev = _jmxUtils.getManagedBroker("development");
+
+ assertEquals("Incorrect test vhost total messages", 10, test.getTotalMessagesReceived());
+ assertEquals("Incorrect test vhost total data", 100, test.getTotalDataReceived());
+ assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessagesReceived());
+ assertEquals("Incorrect dev vhost total data", 100, dev.getTotalDataReceived());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessagesReceived());
+ assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalDataReceived());
+ }
+
+ test.resetStatistics();
+
+ assertEquals("Incorrect test vhost total messages", 0, test.getTotalMessagesReceived());
+ assertEquals("Incorrect test vhost total data", 0, test.getTotalDataReceived());
+ assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessagesReceived());
+ assertEquals("Incorrect dev vhost total data", 100, dev.getTotalDataReceived());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessagesReceived());
+ assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalDataReceived());
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java
new file mode 100644
index 0000000000..a5b3aa283c
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * Test generation of message statistics.
+ */
+public abstract class MessageStatisticsTestCase extends QpidBrokerTestCase
+{
+ protected static final String USER = "admin";
+
+ protected JMXTestUtils _jmxUtils;
+ protected Connection _test, _dev, _local;
+ protected String _queueName = "statistics";
+ protected Destination _queue;
+ protected String _brokerUrl;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ _jmxUtils = new JMXTestUtils(this, USER, USER);
+ _jmxUtils.setUp();
+
+ configureStatistics();
+
+ super.setUp();
+
+ _brokerUrl = getBroker().toString();
+ _test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test");
+ _dev = new AMQConnection(_brokerUrl, USER, USER, "clientid", "development");
+ _local = new AMQConnection(_brokerUrl, USER, USER, "clientid", "localhost");
+
+ _test.start();
+ _dev.start();
+ _local.start();
+
+ _jmxUtils.open();
+ }
+
+ protected void createQueue(Session session) throws AMQException, JMSException
+ {
+ _queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName);
+ if (!((AMQSession<?,?>) session).isQueueBound((AMQDestination) _queue))
+ {
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, null);
+ ((AMQSession<?,?>) session).declareAndBind((AMQDestination) new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName));
+ }
+ }
+
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ _jmxUtils.close();
+
+ _test.close();
+ _dev.close();
+ _local.close();
+
+ super.tearDown();
+ }
+
+ /**
+ * Configure statistics generation properties on the broker.
+ */
+ public abstract void configureStatistics() throws Exception;
+
+ protected void sendUsing(Connection con, int number, int size) throws Exception
+ {
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ createQueue(session);
+ MessageProducer producer = session.createProducer(_queue);
+ String content = new String(new byte[size]);
+ TextMessage msg = session.createTextMessage(content);
+ for (int i = 0; i < number; i++)
+ {
+ producer.send(msg);
+ }
+ }
+
+ /**
+ * Asserts that the actual value is within the expected value plus or
+ * minus the given error.
+ */
+ public void assertApprox(String message, double error, double expected, double actual)
+ {
+ double min = expected * (1.0d - error);
+ double max = expected * (1.0d + error);
+ String assertion = String.format("%s: expected %f +/- %d%%, actual %f",
+ message, expected, (int) (error * 100.0d), actual);
+ assertTrue(assertion, actual > min && actual < max);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
index 3fece2130a..9f6963643a 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
@@ -21,6 +21,8 @@
package org.apache.qpid.test.utils;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import javax.management.JMException;
@@ -31,14 +33,18 @@ import javax.management.ObjectName;
import javax.management.MalformedObjectNameException;
import javax.management.remote.JMXConnector;
+import junit.framework.TestCase;
+
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.commands.objects.AllObjects;
import org.apache.qpid.management.common.JMXConnnectionFactory;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
import org.apache.qpid.management.common.mbeans.ManagedExchange;
import org.apache.qpid.management.common.mbeans.LoggingManagement;
import org.apache.qpid.management.common.mbeans.ConfigurationManagement;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.management.common.mbeans.ServerInformation;
import org.apache.qpid.management.common.mbeans.UserManagement;
/**
@@ -326,6 +332,16 @@ public class JMXTestUtils
return MBeanServerInvocationHandler.newProxyInstance(_mbsc, objectName, managedClass, false);
}
+ public <T> List<T> getManagedObjectList(Class<T> managedClass, Set<ObjectName> objectNames)
+ {
+ List<T> objects = new ArrayList<T>();
+ for (ObjectName name : objectNames)
+ {
+ objects.add(getManagedObject(managedClass, name));
+ }
+ return objects;
+ }
+
public ManagedBroker getManagedBroker(String virtualHost)
{
return getManagedObject(ManagedBroker.class, getVirtualHostManagerObjectName(virtualHost));
@@ -360,4 +376,54 @@ public class JMXTestUtils
ObjectName objectName = new ObjectName("org.apache.qpid:type=UserManagement,name=UserManagement");
return getManagedObject(UserManagement.class, objectName);
}
+
+ /**
+ * Retrive {@link ServerInformation} JMX MBean.
+ */
+ public ServerInformation getServerInformation()
+ {
+ // Get the name of the test manager
+ AllObjects allObject = new AllObjects(_mbsc);
+ allObject.querystring = "org.apache.qpid:type=ServerInformation,name=ServerInformation,*";
+
+ Set<ObjectName> objectNames = allObject.returnObjects();
+
+ TestCase.assertNotNull("Null ObjectName Set returned", objectNames);
+ TestCase.assertEquals("Incorrect number of objects returned", 1, objectNames.size());
+
+ // We have verified we have only one value in objectNames so return it
+ return getManagedObject(ServerInformation.class, objectNames.iterator().next());
+ }
+
+ /**
+ * Retrive all {@link ManagedConnection} objects.
+ */
+ public List<ManagedConnection> getAllManagedConnections()
+ {
+ // Get the name of the test manager
+ AllObjects allObject = new AllObjects(_mbsc);
+ allObject.querystring = "org.apache.qpid:type=VirtualHost.Connection,VirtualHost=*,name=*";
+
+ Set<ObjectName> objectNames = allObject.returnObjects();
+
+ TestCase.assertNotNull("Null ObjectName Set returned", objectNames);
+
+ return getManagedObjectList(ManagedConnection.class, objectNames);
+ }
+
+ /**
+ * Retrive all {@link ManagedConnection} objects for a particular virtual host.
+ */
+ public List<ManagedConnection> getManagedConnections(String vhost)
+ {
+ // Get the name of the test manager
+ AllObjects allObject = new AllObjects(_mbsc);
+ allObject.querystring = "org.apache.qpid:type=VirtualHost.Connection,VirtualHost=" + ObjectName.quote(vhost) + ",name=*";
+
+ Set<ObjectName> objectNames = allObject.returnObjects();
+
+ TestCase.assertNotNull("Null ObjectName Set returned", objectNames);
+
+ return getManagedObjectList(ManagedConnection.class, objectNames);
+ }
}
diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes
index 927b220696..c05aad0cb1 100755
--- a/qpid/java/test-profiles/Java010Excludes
+++ b/qpid/java/test-profiles/Java010Excludes
@@ -47,6 +47,7 @@ org.apache.qpid.server.logging.SubscriptionLoggingTest#testSubscriptionSuspend
// 0-10 Broker does not have a JMX connection MBean
org.apache.qpid.management.jmx.ManagementActorLoggingTest#testConnectionCloseViaManagement
+org.apache.qpid.management.jmx.MessageConnectionStatisticsTest#*
// 0-10 has different ideas about clientid and ownership of queues
org.apache.qpid.server.queue.ModelTest#*