summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-11-08 17:05:08 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-11-08 17:05:08 +0000
commit20092d63916ebe4d6067190c21db5a0b95552457 (patch)
tree355530d7a95635f141e6a98d009808cb2259fe39
parentdc4b48203d2073720ae679b14abb389afb4f42f7 (diff)
downloadqpid-python-20092d63916ebe4d6067190c21db5a0b95552457.tar.gz
QPID-2932: Add statistics generation for broker message delivery
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1032643 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java63
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java60
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java59
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java47
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java126
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java159
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java95
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java135
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java3
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java3
-rw-r--r--qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java63
-rw-r--r--qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java72
-rw-r--r--qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java64
-rw-r--r--qpid/java/systests/derby.log6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java94
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java162
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java90
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java211
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java99
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java47
30 files changed, 1747 insertions, 46 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index d2eea55df2..d6720684fa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -39,6 +39,7 @@ package org.apache.qpid.server;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -57,16 +58,19 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeType;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSessionMBean;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueueMBean;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
/**
* This MBean implements the broker management interface and exposes the
@@ -79,6 +83,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
private final ExchangeRegistry _exchangeRegistry;
private final ExchangeFactory _exchangeFactory;
private final MessageStore _messageStore;
+ private final VirtualHost _virtualHost;
private final VirtualHost.VirtualHostMBean _virtualHostMBean;
@@ -88,12 +93,12 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
super(ManagedBroker.class, ManagedBroker.TYPE);
_virtualHostMBean = virtualHostMBean;
- VirtualHost virtualHost = virtualHostMBean.getVirtualHost();
+ _virtualHost = virtualHostMBean.getVirtualHost();
- _queueRegistry = virtualHost.getQueueRegistry();
- _exchangeRegistry = virtualHost.getExchangeRegistry();
- _messageStore = virtualHost.getMessageStore();
- _exchangeFactory = virtualHost.getExchangeFactory();
+ _queueRegistry = _virtualHost.getQueueRegistry();
+ _exchangeRegistry = _virtualHost.getExchangeRegistry();
+ _messageStore = _virtualHost.getMessageStore();
+ _exchangeFactory = _virtualHost.getExchangeFactory();
}
public String getObjectInstanceName()
@@ -348,4 +353,46 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
{
return getObjectNameForSingleInstanceMBean();
}
-} // End of MBean class
+
+ public void resetStatistics() throws Exception
+ {
+ _virtualHost.getMessageStatistics().reset();
+ _virtualHost.getDataStatistics().reset();
+
+ Collection<AMQProtocolSession> connections = _virtualHost.getConnectionRegistry().getConnections();
+ for (AMQProtocolSession con : connections)
+ {
+ ((AMQProtocolSessionMBean) ((AMQMinaProtocolSession) con).getManagedObject()).resetStatistics();
+ }
+ }
+
+ public double getPeakMessageRate()
+ {
+ return _virtualHost.getMessageStatistics().getPeak();
+ }
+
+ public double getPeakDataRate()
+ {
+ return _virtualHost.getDataStatistics().getPeak();
+ }
+
+ public double getMessageRate()
+ {
+ return _virtualHost.getMessageStatistics().getRate();
+ }
+
+ public double getDataRate()
+ {
+ return _virtualHost.getDataStatistics().getRate();
+ }
+
+ public long getTotalMessages()
+ {
+ return _virtualHost.getMessageStatistics().getTotal();
+ }
+
+ public long getTotalData()
+ {
+ return _virtualHost.getDataStatistics().getTotal();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 4757faa9f6..116c425a99 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -206,6 +206,9 @@ public class AMQChannel
// check and deliver if header says body length is zero
if (_currentMessage.allContentReceived())
{
+ long bodySize = _currentMessage.getContentHeaderBody().bodySize;
+ long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().properties).getTimestamp();
+ _session.registerMessageDelivery(bodySize, timestamp);
try
{
_currentMessage.deliverToQueues();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
index c79e4d5eb6..bc129a2be5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -291,7 +291,7 @@ public class Main
configMBean.register();
ServerInformationMBean sysInfoMBean =
- new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion());
+ new ServerInformationMBean((ApplicationRegistry) ApplicationRegistry.getInstance());
sysInfoMBean.register();
//fixme .. use QpidProperties.getVersionString when we have fixed the classpath issues
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 2adb01316b..8e00539025 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -664,4 +664,34 @@ public class ServerConfiguration implements SignalHandler
getConfig().getLong("housekeeping.expiredMessageCheckPeriod",
DEFAULT_HOUSEKEEPING_PERIOD));
}
+
+ public long getStatisticsSamplePeriod()
+ {
+ return getConfig().getLong("statistics.sample.period", 5000L);
+ }
+
+ public boolean isStatisticsGenerationBrokerEnabled()
+ {
+ return getConfig().getBoolean("statistics.generation.broker", false);
+ }
+
+ public boolean isStatisticsGenerationVirtualhostsEnabled()
+ {
+ return getConfig().getBoolean("statistics.generation.virtualhosts", false);
+ }
+
+ public boolean isStatisticsGenerationConnectionsEnabled()
+ {
+ return getConfig().getBoolean("statistics.generation.connections", false);
+ }
+
+ public long getStatisticsReportingPeriod()
+ {
+ return getConfig().getLong("statistics.reporting.period", 0L);
+ }
+
+ public boolean isStatisticsReportResetEnabled()
+ {
+ return getConfig().getBoolean("statistics.reporting.reset", false);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java
index db2cc970b2..81c9b37871 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java
@@ -21,12 +21,17 @@
package org.apache.qpid.server.information.management;
import java.io.IOException;
+import java.util.Collection;
+import javax.management.JMException;
+
+import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.management.common.mbeans.ServerInformation;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.management.AMQManagedObject;
-
-import javax.management.JMException;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
/** MBean class for the ServerInformationMBean. */
@MBeanDescription("Server Information Interface")
@@ -34,12 +39,15 @@ public class ServerInformationMBean extends AMQManagedObject implements ServerIn
{
private String buildVersion;
private String productVersion;
+ private ApplicationRegistry registry;
- public ServerInformationMBean(String buildVersion, String productVersion) throws JMException
+ public ServerInformationMBean(ApplicationRegistry applicationRegistry) throws JMException
{
super(ServerInformation.class, ServerInformation.TYPE);
- this.buildVersion = buildVersion;
- this.productVersion = productVersion;
+
+ registry = applicationRegistry;
+ buildVersion = QpidProperties.getBuildVersion();
+ productVersion = QpidProperties.getReleaseVersion();
}
public String getObjectInstanceName()
@@ -67,5 +75,45 @@ public class ServerInformationMBean extends AMQManagedObject implements ServerIn
return productVersion;
}
-
+ public void resetStatistics() throws Exception
+ {
+ registry.getDataStatistics().reset();
+ registry.getMessageStatistics().reset();
+
+ Collection<VirtualHost> virtualhosts = registry.getVirtualHostRegistry().getVirtualHosts();
+ for (VirtualHost vhost : virtualhosts)
+ {
+ ((AMQBrokerManagerMBean) vhost.getBrokerMBean()).resetStatistics();
+ }
+ }
+
+ public double getPeakMessageRate()
+ {
+ return registry.getMessageStatistics().getPeak();
+ }
+
+ public double getPeakDataRate()
+ {
+ return registry.getDataStatistics().getPeak();
+ }
+
+ public double getMessageRate()
+ {
+ return registry.getMessageStatistics().getRate();
+ }
+
+ public double getDataRate()
+ {
+ return registry.getDataStatistics().getRate();
+ }
+
+ public long getTotalMessages()
+ {
+ return registry.getMessageStatistics().getTotal();
+ }
+
+ public long getTotalData()
+ {
+ return registry.getDataStatistics().getTotal();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
index 7f6ec704ce..636b7bff17 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
@@ -222,6 +222,8 @@ BRK_STOPPED = BRK-1005 : Stopped
BRK_CONFIG = BRK-1006 : Using configuration : {0}
# 0 - path
BRK_LOG_CONFIG = BRK-1007 : Using logging configuration : {0}
+BRK_STATS_DATA = BRK-1008 : {0,number,#.###} kB/s, {1,number,#} bytes
+BRK_STATS_MSGS = BRK-1009 : {0,number,#.###} msg/s, {1,number,#} msgs
#ManagementConsole
MNG_STARTUP = MNG-1001 : Startup
@@ -243,6 +245,8 @@ MNG_CLOSE = MNG-1008 : Close
# 0 - name
VHT_CREATED = VHT-1001 : Created : {0}
VHT_CLOSED = VHT-1002 : Closed
+VHT_STATS_DATA = VHT-1003 : {0} : {1,number,#.###} kB/s, {2,number,#} bytes
+VHT_STATS_MSGS = VHT-1004 : {0} : {1,number,#.###} msg/s, {2,number,#} msgs
#MessageStore
# 0 - name
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
index 3b716cb858..ff4b459ec6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
@@ -98,8 +98,9 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
public void start() throws IOException, ConfigurationException
{
-
CurrentActor.get().message(ManagementConsoleMessages.MNG_STARTUP());
+
+ boolean disableCustomSocketFactory = Boolean.getBoolean("qpid.management.disableCustomSocketFactory");
//check if system properties are set to use the JVM's out-of-the-box JMXAgent
if (areOutOfTheBoxJMXOptionsSet())
@@ -225,8 +226,15 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
* As a result, only binds made using the object reference will succeed, thus securing it from external change.
*/
System.setProperty("java.rmi.server.randomIDs", "true");
- _rmiRegistry = LocateRegistry.createRegistry(port, null, new CustomRMIServerSocketFactory());
-
+ if (disableCustomSocketFactory)
+ {
+ _rmiRegistry = LocateRegistry.createRegistry(port, null, null);
+ }
+ else
+ {
+ _rmiRegistry = LocateRegistry.createRegistry(port, null, new CustomRMIServerSocketFactory());
+ }
+
/*
* We must now create the RMI ConnectorServer manually, as the JMX Factory methods use RMI calls
* to bind the ConnectorServer to the registry, which will now fail as for security we have
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 93fc50a2bd..e758febc3c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -78,13 +78,16 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;
-public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
+public class AMQMinaProtocolSession implements AMQProtocolSession, Managable, StatisticsGatherer
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
@@ -148,6 +151,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private LogSubject _logSubject;
private final AtomicBoolean _closing = new AtomicBoolean(false);
+
+ private ApplicationRegistry _registry;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messageStats, _dataStats;
public ManagedObject getManagedObject()
{
@@ -162,8 +169,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
session.setAttachment(this);
_codecFactory = codecFactory;
+ _registry = virtualHostRegistry.getApplicationRegistry();
- _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
+ _actor = new AMQPConnectionActor(this, _registry.getRootMessageLogger());
_actor.message(ConnectionMessages.CON_OPEN(null, null, false, false));
@@ -178,8 +186,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
e.printStackTrace();
throw e;
-
}
+
+ initialiseStatistics();
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -944,4 +953,48 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
// No-op, interface munging between this and AMQProtocolSession
}
+
+ public void registerMessageDelivery(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _logger.info("=== STATS === register " + messageSize);
+ _messageStats.registerEvent(1L, timestamp);
+ _dataStats.registerEvent(messageSize, timestamp);
+ }
+ _virtualHost.registerMessageDelivery(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageStatistics()
+ {
+ return _messageStats;
+ }
+
+ public StatisticsCounter getDataStatistics()
+ {
+ return _dataStats;
+ }
+
+ public void resetStatistics()
+ {
+ _messageStats.reset();
+ _dataStats.reset();
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(_registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
+ _messageStats = new StatisticsCounter("messages-" + getSessionID());
+ _dataStats = new StatisticsCounter("bytes-" + getSessionID());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 6838a16182..ba110c4826 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -221,4 +221,5 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
public ProtocolSessionIdentifier getSessionIdentifier();
+ public void registerMessageDelivery(long messageSize, long timestamp);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index 91d21c57d8..28f87fb585 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -341,4 +341,49 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
_broadcaster.sendNotification(n);
}
-} // End of MBean class
+ public void resetStatistics() throws Exception
+ {
+ _session.getMessageStatistics().reset();
+ _session.getDataStatistics().reset();
+ }
+
+ public double getPeakMessageRate()
+ {
+ return _session.getMessageStatistics().getPeak();
+ }
+
+ public double getPeakDataRate()
+ {
+ return _session.getDataStatistics().getPeak();
+ }
+
+ public double getMessageRate()
+ {
+ return _session.getMessageStatistics().getRate();
+ }
+
+ public double getDataRate()
+ {
+ return _session.getDataStatistics().getRate();
+ }
+
+ public long getTotalMessages()
+ {
+ return _session.getMessageStatistics().getTotal();
+ }
+
+ public long getTotalData()
+ {
+ return _session.getDataStatistics().getTotal();
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _session.isStatisticsEnabled();
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _session.setStatisticsEnabled(enabled);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 737ac4450a..4b98a408a7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -23,19 +23,28 @@ package org.apache.qpid.server.registry;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.transport.QpidAcceptor;
@@ -45,7 +54,7 @@ import org.apache.qpid.server.transport.QpidAcceptor;
* <p/>
* Subclasses should handle the construction of the "registered objects" such as the exchange registry.
*/
-public abstract class ApplicationRegistry implements IApplicationRegistry
+public abstract class ApplicationRegistry implements IApplicationRegistry, StatisticsGatherer
{
protected static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
@@ -75,6 +84,10 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
protected RootMessageLogger _rootMessageLogger;
+ protected Timer _reportingTimer;
+ protected boolean _statisticsEnabled = false;
+ protected StatisticsCounter _messageStats, _dataStats;
+
static
{
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -220,6 +233,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
_logger.info("Shutting down ApplicationRegistry:"+this);
}
+ //Stop Statistics Reporting
+ if (_reportingTimer != null)
+ {
+ _reportingTimer.cancel();
+ }
+
//Stop incomming connections
unbind();
@@ -317,4 +336,109 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
return _rootMessageLogger;
}
+ public void initialiseStatisticsReporting()
+ {
+ long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
+ final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
+ final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
+ final boolean reset = _configuration.isStatisticsReportResetEnabled();
+
+ /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
+ if (report > 0L && (broker || virtualhost))
+ {
+ _reportingTimer = new Timer("Statistics-Reporting", true);
+
+ class StatisticsReportingTask extends TimerTask
+ {
+ Logger _srLogger = Logger.getLogger(StatisticsReportingTask.class);
+
+ public void run()
+ {
+ CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
+ public String getLogMessage()
+ {
+ return "[" + Thread.currentThread().getName() + "] ";
+ }
+ });
+
+ if (broker)
+ {
+ CurrentActor.get().message(BrokerMessages.BRK_STATS_DATA(_dataStats.getPeak() / 1024.0, _dataStats.getTotal()));
+ CurrentActor.get().message(BrokerMessages.BRK_STATS_MSGS(_messageStats.getPeak(), _messageStats.getTotal()));
+ }
+
+ if (virtualhost)
+ {
+ for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts())
+ {
+ String name = vhost.getName();
+ StatisticsCounter data = vhost.getDataStatistics();
+ StatisticsCounter messages = vhost.getMessageStatistics();
+
+ CurrentActor.get().message(VirtualHostMessages.VHT_STATS_DATA(name, data.getPeak() / 1024.0, data.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.VHT_STATS_MSGS(name, messages.getPeak(), messages.getTotal()));
+ }
+ }
+
+ if (reset)
+ {
+ resetStatistics();
+ }
+
+ CurrentActor.remove();
+ }
+ }
+
+ _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(),
+ report / 2,
+ report);
+ }
+ }
+
+ public void registerMessageDelivery(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messageStats.registerEvent(1L, timestamp);
+ _dataStats.registerEvent(messageSize, timestamp);
+ }
+ }
+
+ public StatisticsCounter getMessageStatistics()
+ {
+ return _messageStats;
+ }
+
+ public StatisticsCounter getDataStatistics()
+ {
+ return _dataStats;
+ }
+
+ public void resetStatistics()
+ {
+ _messageStats.reset();
+ _dataStats.reset();
+
+ for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts())
+ {
+ vhost.resetStatistics();
+ }
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(getConfiguration().isStatisticsGenerationBrokerEnabled());
+ _messageStats = new StatisticsCounter("messages");
+ _dataStats = new StatisticsCounter("bytes");
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
index 85f637764a..ef1cf85b5d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -76,6 +77,9 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
_databaseManager.initialiseManagement(_configuration);
_managedObjectRegistry.start();
+
+ initialiseStatistics();
+ initialiseStatisticsReporting();
initialiseVirtualHosts();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
index 92accb3499..10c6430012 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
@@ -34,8 +34,12 @@ import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.qpid.server.security.access.ACLManager;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.transport.QpidAcceptor;
-import org.apache.mina.common.IoAcceptor;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
public interface IApplicationRegistry
{
@@ -81,4 +85,5 @@ public interface IApplicationRegistry
*/
void addAcceptor(InetSocketAddress bindAddress, QpidAcceptor acceptor);
+ void initialiseStatisticsReporting();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
new file mode 100644
index 0000000000..da53830083
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.stats;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class collects statistics and counts the total, rate per second and
+ * peak rate per second values for the events that are registered with it.
+ */
+public class StatisticsCounter
+{
+ private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter.class);
+
+ private static final String COUNTER = "counter";
+ private static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 1000L); // 1s
+ private static final boolean _disable = Boolean.getBoolean("qpid.statistics.disable");
+ private static final AtomicLong _counterIds = new AtomicLong(0L);
+
+ private final AtomicLong _peak = new AtomicLong(0L);
+ private final AtomicLong _total = new AtomicLong(0L);
+ private final AtomicLong _last = new AtomicLong(0L);
+ private final AtomicLong _rate = new AtomicLong(0L);
+
+ private long _start;
+
+ private final long _period;
+ private final String _name;
+
+ public StatisticsCounter()
+ {
+ this(COUNTER);
+ }
+
+ public StatisticsCounter(String name)
+ {
+ this(name, DEFAULT_SAMPLE_PERIOD);
+ }
+
+ public StatisticsCounter(String name, long period)
+ {
+ _period = period;
+ _name = name + "-" + + _counterIds.incrementAndGet();
+ reset();
+ }
+
+ public void registerEvent()
+ {
+ registerEvent(1L);
+ }
+
+ public void registerEvent(long value)
+ {
+ registerEvent(value, System.currentTimeMillis());
+ }
+
+ public void registerEvent(long value, long timestamp)
+ {
+ if (_disable)
+ {
+ return;
+ }
+
+ long thisSample = (timestamp / _period);
+ long lastSample;
+ while (thisSample > (lastSample = _last.get()))
+ {
+ if (_last.compareAndSet(lastSample, thisSample))
+ {
+ _rate.set(0L);
+ }
+ }
+
+ _total.addAndGet(value);
+ long current = _rate.addAndGet(value);
+ long peak;
+ while (current > (peak = _peak.get()))
+ {
+ _peak.compareAndSet(peak, current);
+ }
+ }
+
+ /**
+ * Update the current rate and peak - may reset rate to zero if a new
+ * sample period has started.
+ */
+ private void update()
+ {
+ registerEvent(0L, System.currentTimeMillis());
+ }
+
+ /**
+ * Reset
+ */
+ public void reset()
+ {
+ _peak.set(0L);
+ _rate.set(0L);
+ _total.set(0L);
+ _start = System.currentTimeMillis();
+ _last.set(_start / _period);
+ }
+
+ public double getPeak()
+ {
+ return (double) _peak.get() / ((double) _period / 1000.0d);
+ }
+
+ public double getRate()
+ {
+ update();
+ return (double) _rate.get() / ((double) _period / 1000.0d);
+ }
+
+ public long getTotal()
+ {
+ return _total.get();
+ }
+
+ public long getStart()
+ {
+ return _start;
+ }
+
+ public Date getStartTime()
+ {
+ return new Date(_start);
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public long getPeriod()
+ {
+ return _period;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
new file mode 100644
index 0000000000..faaf11aff6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.stats;
+
+/**
+ * Statistics gatherer.
+ */
+public interface StatisticsGatherer
+{
+ /**
+ *
+ * @param period
+ */
+ void initialiseStatistics();
+
+ /**
+ *
+ * @param messageSize
+ * @param timestamp
+ */
+ void registerMessageDelivery(long messageSize, long timestamp);
+
+ /**
+ *
+ * @return
+ */
+ StatisticsCounter getMessageStatistics();
+
+ /**
+ *
+ * @return
+ */
+ StatisticsCounter getDataStatistics();
+
+ /**
+ *
+ * @return
+ */
+ void resetStatistics();
+
+ /**
+ *
+ * @return
+ */
+ boolean isStatisticsEnabled();
+
+ /**
+ *
+ * @param enabled
+ */
+ void setStatisticsEnabled(boolean enabled);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 71fcfb964d..20f0437c65 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -20,6 +20,14 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.management.NotCompliantMBeanException;
+
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -28,11 +36,9 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.actors.AbstractActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.ConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry;
@@ -41,30 +47,31 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.logging.actors.AbstractActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-import javax.management.NotCompliantMBeanException;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-public class VirtualHost implements Accessable
+public class VirtualHost implements Accessable, StatisticsGatherer
{
private static final Logger _logger = Logger.getLogger(VirtualHost.class);
@@ -90,6 +97,10 @@ public class VirtualHost implements Accessable
private final Timer _houseKeepingTimer;
private VirtualHostConfiguration _configuration;
+
+ private ApplicationRegistry _registry;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messageStats, _dataStats;
public void setAccessableName(String name)
{
@@ -163,6 +174,8 @@ public class VirtualHost implements Accessable
{
throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
}
+
+ _registry = (ApplicationRegistry) ApplicationRegistry.getInstance();
_virtualHostMBean = new VirtualHostMBean();
@@ -227,11 +240,15 @@ public class VirtualHost implements Accessable
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
- initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
+
+ initialiseHouseKeeping();
+ initialiseStatistics();
}
- private void initialiseHouseKeeping(long period)
+ private void initialiseHouseKeeping()
{
+ long period = _configuration.getHousekeepingExpiredMessageCheckPeriod();
+
/* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
if (period > 0L)
{
@@ -244,7 +261,7 @@ public class VirtualHost implements Accessable
CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
public String getLogMessage()
{
- return "[" + Thread.currentThread().getName() + "]";
+ return "[" + Thread.currentThread().getName() + "] ";
}
});
_hkLogger.info("Starting the houseKeeping job");
@@ -472,6 +489,54 @@ public class VirtualHost implements Accessable
{
return _virtualHostMBean;
}
+
+ public void registerMessageDelivery(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messageStats.registerEvent(1L, timestamp);
+ _dataStats.registerEvent(messageSize, timestamp);
+ }
+ _registry.registerMessageDelivery(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageStatistics()
+ {
+ return _messageStats;
+ }
+
+ public StatisticsCounter getDataStatistics()
+ {
+ return _dataStats;
+ }
+
+ public void resetStatistics()
+ {
+ _messageStats.reset();
+ _dataStats.reset();
+
+ for (AMQProtocolSession session : _connectionRegistry.getConnections())
+ {
+ ((AMQMinaProtocolSession) session).resetStatistics();
+ }
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(_registry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
+ _messageStats = new StatisticsCounter("messages-" + getName());
+ _dataStats = new StatisticsCounter("bytes-" + getName());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
/**
* Temporary Startup RT class to record the creation of persistent queues / exchanges.
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
new file mode 100644
index 0000000000..1e30946b01
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.stats;
+
+import junit.framework.TestCase;
+
+/**
+ * Unit tests for the {@link StatisticsCounter} class.
+ */
+public class StatisticsCounterTest extends TestCase
+{
+ /**
+ * Check that statistics counters are created correctly.
+ */
+ public void testCreate()
+ {
+ long before = System.currentTimeMillis();
+ StatisticsCounter counter = new StatisticsCounter("name", 1234L);
+ long after = System.currentTimeMillis();
+
+ assertTrue(before <= counter.getStart());
+ assertTrue(after >= counter.getStart());
+ assertTrue(counter.getName().startsWith("name-"));
+ assertEquals(1234L, counter.getPeriod());
+ }
+
+ /**
+ * Check that totals add up correctly.
+ */
+ public void testTotal()
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ long start = counter.getStart();
+ for (int i = 0; i < 100; i++)
+ {
+ counter.registerEvent(i, start + i);
+ }
+ assertEquals(99 * 50, counter.getTotal()); // cf. Gauss
+ }
+
+ /**
+ * Test totals add up correctly even when messages are delivered
+ * out-of-order.
+ */
+ public void testTotalOutOfOrder()
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ long start = counter.getStart();
+ assertEquals(0, counter.getTotal());
+ counter.registerEvent(10, start + 2500);
+ assertEquals(10, counter.getTotal());
+ counter.registerEvent(20, start + 1500);
+ assertEquals(30, counter.getTotal());
+ counter.registerEvent(10, start + 500);
+ assertEquals(40, counter.getTotal());
+ }
+
+ /**
+ * Test that the peak rate is reported correctly.
+ */
+ public void testPeak()
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ long start = counter.getStart();
+ assertEquals(0.0, counter.getPeak());
+ counter.registerEvent(1000, start + 500);
+ assertEquals(1000.0, counter.getPeak());
+ counter.registerEvent(2000, start + 1500);
+ assertEquals(2000.0, counter.getPeak());
+ counter.registerEvent(1000, start + 2500);
+ assertEquals(2000.0, counter.getPeak());
+ }
+
+ /**
+ * Test that peak rate is reported correctly even when messages are
+ * delivered out-of-order.
+ */
+ public void testPeakOutOfOrder() throws Exception
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ long start = counter.getStart();
+ assertEquals(0.0, counter.getPeak());
+ counter.registerEvent(1000, start + 2500);
+ assertEquals(1000.0, counter.getPeak());
+ counter.registerEvent(2000, start + 1500);
+ assertEquals(3000.0, counter.getPeak());
+ counter.registerEvent(1000, start + 500);
+ assertEquals(4000.0, counter.getPeak());
+ Thread.sleep(2000);
+ assertEquals(4000.0, counter.getPeak());
+ counter.registerEvent(1000, start + 500);
+ assertEquals(5000.0, counter.getPeak());
+ Thread.sleep(2000);
+ counter.registerEvent(1000);
+ assertEquals(5000.0, counter.getPeak());
+ }
+
+ /**
+ * Test the current rate is generated correctly.
+ */
+ public void testRate() throws Exception
+ {
+ StatisticsCounter counter = new StatisticsCounter("test", 1000L);
+ assertEquals(0.0, counter.getRate());
+ Thread.sleep(100);
+ counter.registerEvent(1000);
+ assertEquals(1000.0, counter.getRate());
+ Thread.sleep(1000);
+ counter.registerEvent(2000);
+ assertEquals(2000.0, counter.getRate());
+ Thread.sleep(1000);
+ counter.registerEvent(1000);
+ assertEquals(1000.0, counter.getRate());
+ Thread.sleep(1000);
+ assertEquals(0.0, counter.getRate());
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index 6b8201eefb..9a6c913d62 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -71,6 +71,9 @@ public class NullApplicationRegistry extends ApplicationRegistry
_accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager, AllowAll.FACTORY);
_authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
+
+ initialiseStatistics();
+ initialiseStatisticsReporting();
_managedObjectRegistry = new NoopManagedObjectRegistry();
_virtualHostRegistry = new VirtualHostRegistry(this);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index 7b7c86bb80..883018a421 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -94,6 +94,9 @@ public class TestApplicationRegistry extends ApplicationRegistry
_managedObjectRegistry = new NoopManagedObjectRegistry();
_messageStore = new TestableMemoryMessageStore();
+
+ initialiseStatistics();
+ initialiseStatisticsReporting();
_virtualHostRegistry = new VirtualHostRegistry(this);
diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
index d02b9b89f4..46ff2c58d5 100644
--- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
+++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
@@ -34,9 +34,8 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParame
/**
* The ManagedBroker is the management interface to expose management
* features of the Broker.
- *
- * @author Bhupendra Bhardwaj
- * @version 0.1
+ *
+ * @since Qpid JMX API 1.9
*/
public interface ManagedBroker
{
@@ -122,4 +121,62 @@ public interface ManagedBroker
impact= MBeanOperationInfo.ACTION)
void deleteQueue(@MBeanOperationParameter(name= ManagedQueue.TYPE, description="Queue Name")String queueName)
throws IOException, JMException;
+
+ /**
+ * Resets all message and data statistics for the virtual host.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanOperation(name="resetStatistics",
+ description="Resets all message and data statistics for the virtual host",
+ impact= MBeanOperationInfo.ACTION)
+ void resetStatistics() throws Exception;
+
+ /**
+ * Peak rate of messages per second for the virtual host.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="peakMessageRate", description=TYPE + " Peak Message Rate")
+ double getPeakMessageRate();
+
+ /**
+ * Peak rate of bytes per second for the virtual host.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="peakDataRate", description=TYPE + " Peak Data Rate")
+ double getPeakDataRate();
+
+ /**
+ * Rate of messages per second for the virtual host.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="messageRate", description=TYPE + " Message Rate")
+ double getMessageRate();
+
+ /**
+ * Rate of bytes per second for the virtual host.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="dataRate", description=TYPE + " Data Rate")
+ double getDataRate();
+
+ /**
+ * Total count of messages for the virtual host.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="totalMessages", description=TYPE + " Total Message Count")
+ long getTotalMessages();
+
+ /**
+ * Total count of bytes for the virtual host.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="totalData", description=TYPE + " Total Bytes")
+ long getTotalData();
}
diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
index ff096aa22b..f2be759249 100644
--- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
+++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
@@ -35,8 +35,8 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParame
/**
* The management interface exposed to allow management of Connections.
- * @author Bhupendra Bhardwaj
- * @version 0.1
+ *
+ * @since Qpid JMX API 1.9
*/
public interface ManagedConnection
{
@@ -139,4 +139,72 @@ public interface ManagedConnection
description="Closes this connection and all related channels",
impact= MBeanOperationInfo.ACTION)
void closeConnection() throws Exception;
+
+ /**
+ * Resets message and data statistics for this connection.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanOperation(name="resetStatistics",
+ description="Resets message and data statistics for this connection",
+ impact= MBeanOperationInfo.ACTION)
+ void resetStatistics() throws Exception;
+
+ /**
+ * Peak rate of messages per second on this connection.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="peakMessageRate", description=TYPE + " Peak Message Rate")
+ double getPeakMessageRate();
+
+ /**
+ * Peak rate of bytes per second on this connection.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="peakDataRate", description=TYPE + " Peak Data Rate")
+ double getPeakDataRate();
+
+ /**
+ * Rate of messages per second on this connection.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="messageRate", description=TYPE + " Message Rate")
+ double getMessageRate();
+
+ /**
+ * Rate of bytes per second on this connection.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="dataRate", description=TYPE + " Data Rate")
+ double getDataRate();
+
+ /**
+ * Total count of messages on this connection.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="totalMessages", description=TYPE + " Total Message Count")
+ long getTotalMessages();
+
+ /**
+ * Total count of bytes on this connection.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="totalData", description=TYPE + " Total Bytes")
+ long getTotalData();
+
+ /**
+ * Is statistics collection enabled for this connection.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="statisticsEnabled", description=TYPE + " Statistics Enabled")
+ boolean isStatisticsEnabled();
+
+ void setStatisticsEnabled(boolean enabled);
}
diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
index f61c41dea9..0cfe73e69d 100644
--- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
+++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
@@ -22,11 +22,15 @@ package org.apache.qpid.management.common.mbeans;
import java.io.IOException;
+import javax.management.MBeanOperationInfo;
+
import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
/**
* Interface for the ServerInformation MBean
- * @since Qpid JMX API 1.3
+ *
+ * @since Qpid JMX API 1.9
*/
public interface ServerInformation
{
@@ -80,4 +84,62 @@ public interface ServerInformation
@MBeanAttribute(name="ProductVersion",
description = "The product version string")
String getProductVersion() throws IOException;
+
+ /**
+ * Resets all message and data statistics for the broker.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanOperation(name="resetStatistics",
+ description="Resets all message and data statistics for the broker",
+ impact= MBeanOperationInfo.ACTION)
+ void resetStatistics() throws Exception;
+
+ /**
+ * Peak rate of messages per second for the broker.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="peakMessageRate", description=TYPE + " Peak Message Rate")
+ double getPeakMessageRate();
+
+ /**
+ * Peak rate of bytes per second for the broker.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="peakDataRate", description=TYPE + " Peak Data Rate")
+ double getPeakDataRate();
+
+ /**
+ * Rate of messages per second for the broker.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="messageRate", description=TYPE + " Message Rate")
+ double getMessageRate();
+
+ /**
+ * Rate of bytes per second for the broker.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="dataRate", description=TYPE + " Data Rate")
+ double getDataRate();
+
+ /**
+ * Total count of messages for the broker.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="totalMessages", description=TYPE + " Total Message Count")
+ long getTotalMessages();
+
+ /**
+ * Total count of bytes for the broker.
+ *
+ * @since Qpid JMX API 1.9
+ */
+ @MBeanAttribute(name="totalData", description=TYPE + " Total Bytes")
+ long getTotalData();
}
diff --git a/qpid/java/systests/derby.log b/qpid/java/systests/derby.log
new file mode 100644
index 0000000000..177dc5aad7
--- /dev/null
+++ b/qpid/java/systests/derby.log
@@ -0,0 +1,6 @@
+----------------------------------------------------------------
+2010-11-05 16:45:02.145 GMT:
+ Booting Derby version The Apache Software Foundation - Apache Derby - 10.3.2.1 - (599110): instance c013800d-012c-1ced-4a3a-000004b61c38
+on database directory /home/kennedya/workspaces/qpid-0.5/qpid/java/build/work/test/test
+
+Database Class Loader started - derby.database.classpath=''
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java
new file mode 100644
index 0000000000..98da330140
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+
+/**
+ * Test enabling generation of message statistics on a per-connection basis.
+ */
+public class MessageConnectionStatisticsTest extends MessageStatisticsTestCase
+{
+ public void configureStatistics() throws Exception
+ {
+ // no statistics generation configured
+ }
+
+ /**
+ * Test statistics on a single connection
+ */
+ public void testEnablingStatisticsPerConnection() throws Exception
+ {
+ ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+
+ sendUsing(_test, 5, 200);
+
+ List<String> addresses = new ArrayList<String>();
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ assertEquals("Incorrect connection total", 0, mc.getTotalMessages());
+ assertEquals("Incorrect connection data", 0, mc.getTotalData());
+
+ addresses.add(mc.getRemoteAddress());
+ }
+ assertEquals("Incorrect active connection data", 0, vhost.getTotalData());
+ assertEquals("Incorrect active connection data", 0, vhost.getTotalMessages());
+
+ Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test");
+ test.start();
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ if (addresses.contains(mc.getRemoteAddress()))
+ {
+ continue;
+ }
+ mc.setStatisticsEnabled(true);
+ assertEquals("Incorrect connection total", 0, mc.getTotalMessages());
+ assertEquals("Incorrect connection data", 0, mc.getTotalData());
+ }
+
+ sendUsing(test, 5, 200);
+ sendUsing(_test, 5, 200);
+
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ if (addresses.contains(mc.getRemoteAddress()))
+ {
+ assertEquals("Incorrect connection total", 0, mc.getTotalMessages());
+ assertEquals("Incorrect connection data", 0, mc.getTotalData());
+ }
+ else
+ {
+ assertEquals("Incorrect connection total", 5, mc.getTotalMessages());
+ assertEquals("Incorrect connection data", 1000, mc.getTotalData());
+ }
+ }
+ assertEquals("Incorrect active connection data", 0, vhost.getTotalData());
+ assertEquals("Incorrect active connection data", 0, vhost.getTotalMessages());
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java
new file mode 100644
index 0000000000..7cb8f7a107
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+
+/**
+ * Test enabling generation of message statistics on a per-connection basis.
+ */
+public class MessageStatisticsConfigurationTest extends MessageStatisticsTestCase
+{
+ public void configureStatistics() throws Exception
+ {
+ setConfigurationProperty("statistics.generation.broker", Boolean.toString(getName().contains("Broker")));
+ setConfigurationProperty("statistics.generation.virtualhosts", Boolean.toString(getName().contains("Virtualhost")));
+ setConfigurationProperty("statistics.generation.connections", Boolean.toString(getName().contains("Connection")));
+ }
+
+ /**
+ * Just broker statistics.
+ */
+ public void testGenerateBrokerStatistics() throws Exception
+ {
+ sendUsing(_test, 5, 200);
+ Thread.sleep(1000);
+
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ assertEquals("Incorrect connection total", 0, mc.getTotalMessages());
+ assertEquals("Incorrect connection data", 0, mc.getTotalData());
+ }
+
+ ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+ assertEquals("Incorrect vhost data", 0, vhost.getTotalMessages());
+ assertEquals("Incorrect vhost data", 0, vhost.getTotalData());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessages());
+ assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalData());
+ }
+ }
+
+ /**
+ * Just virtualhost statistics.
+ */
+ public void testGenerateVirtualhostStatistics() throws Exception
+ {
+ sendUsing(_test, 5, 200);
+ Thread.sleep(1000);
+
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ assertEquals("Incorrect connection total", 0, mc.getTotalMessages());
+ assertEquals("Incorrect connection data", 0, mc.getTotalData());
+ }
+
+ ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+ assertEquals("Incorrect vhost data", 5, vhost.getTotalMessages());
+ assertEquals("Incorrect vhost data", 1000, vhost.getTotalData());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessages());
+ assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalData());
+ }
+ }
+
+ /**
+ * Just connection statistics.
+ */
+ public void testGenerateConnectionStatistics() throws Exception
+ {
+ sendUsing(_test, 5, 200);
+ Thread.sleep(1000);
+
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ assertEquals("Incorrect connection total", 5, mc.getTotalMessages());
+ assertEquals("Incorrect connection data", 1000, mc.getTotalData());
+ }
+
+ ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+ assertEquals("Incorrect vhost data", 0, vhost.getTotalMessages());
+ assertEquals("Incorrect vhost data", 0, vhost.getTotalData());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessages());
+ assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalData());
+ }
+ }
+
+ /**
+ * Both broker and virtualhost statistics.
+ */
+ public void testGenerateBrokerAndVirtualhostStatistics() throws Exception
+ {
+ sendUsing(_test, 5, 200);
+ Thread.sleep(1000);
+
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ assertEquals("Incorrect connection total", 0, mc.getTotalMessages());
+ assertEquals("Incorrect connection data", 0, mc.getTotalData());
+ }
+
+ ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+ assertEquals("Incorrect vhost data", 5, vhost.getTotalMessages());
+ assertEquals("Incorrect vhost data", 1000, vhost.getTotalData());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessages());
+ assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalData());
+ }
+ }
+
+ /**
+ * Broker, virtualhost and connection statistics.
+ */
+ public void testGenerateBrokerVirtualhostAndConnectionStatistics() throws Exception
+ {
+ sendUsing(_test, 5, 200);
+ Thread.sleep(1000);
+
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ assertEquals("Incorrect connection total", 5, mc.getTotalMessages());
+ assertEquals("Incorrect connection data", 1000, mc.getTotalData());
+ }
+
+ ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
+ assertEquals("Incorrect vhost data", 5, vhost.getTotalMessages());
+ assertEquals("Incorrect vhost data", 1000, vhost.getTotalData());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessages());
+ assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalData());
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java
new file mode 100644
index 0000000000..7051c426b9
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import java.util.List;
+
+import org.apache.qpid.util.LogMonitor;
+
+/**
+ * Test generation of message statistics reporting.
+ */
+public class MessageStatisticsReportingTest extends MessageStatisticsTestCase
+{
+ protected LogMonitor _monitor;
+
+ public void configureStatistics() throws Exception
+ {
+ setConfigurationProperty("statistics.generation.broker", "true");
+ setConfigurationProperty("statistics.generation.virtualhosts", "true");
+
+ if (getName().equals("testEnabledStatisticsReporting"))
+ {
+ setConfigurationProperty("statistics.reporting.period", "10");
+ }
+
+ _monitor = new LogMonitor(_outputFile);
+ }
+
+ /**
+ * Test enabling reporting.
+ */
+ public void testEnabledStatisticsReporting() throws Exception
+ {
+ sendUsing(_test, 10, 100);
+ sendUsing(_dev, 20, 100);
+ sendUsing(_local, 15, 100);
+
+ Thread.sleep(10 * 1000); // 15s
+
+ List<String> brokerStatsData = _monitor.findMatches("BRK-1008");
+ List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009");
+ List<String> vhostStatsData = _monitor.findMatches("VHT-1003");
+ List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004");
+
+ assertEquals("Incorrect number of broker data stats log messages", 1, brokerStatsData.size());
+ assertEquals("Incorrect number of broker message stats log messages", 1, brokerStatsMessages.size());
+ assertEquals("Incorrect number of virtualhost data stats log messages", 3, vhostStatsData.size());
+ assertEquals("Incorrect number of virtualhost message stats log messages", 3, vhostStatsMessages.size());
+ }
+
+ /**
+ * Test not enabling reporting.
+ */
+ public void testNotEnabledStatisticsReporting() throws Exception
+ {
+ sendUsing(_test, 10, 100);
+ sendUsing(_dev, 20, 100);
+ sendUsing(_local, 15, 100);
+
+ Thread.sleep(10 * 1000); // 15s
+
+ List<String> brokerStatsData = _monitor.findMatches("BRK-1008");
+ List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009");
+ List<String> vhostStatsData = _monitor.findMatches("VHT-1003");
+ List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004");
+
+ assertEquals("Incorrect number of broker data stats log messages", 0, brokerStatsData.size());
+ assertEquals("Incorrect number of broker message stats log messages", 0, brokerStatsMessages.size());
+ assertEquals("Incorrect number of virtualhost data stats log messages", 0, vhostStatsData.size());
+ assertEquals("Incorrect number of virtualhost message stats log messages", 0, vhostStatsMessages.size());
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java
new file mode 100644
index 0000000000..e97fd1eec9
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java
@@ -0,0 +1,211 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import javax.jms.Connection;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+
+/**
+ * Test generation of message statistics.
+ */
+public class MessageStatisticsTest extends MessageStatisticsTestCase
+{
+ public void configureStatistics() throws Exception
+ {
+ setConfigurationProperty("statistics.generation.broker", "true");
+ setConfigurationProperty("statistics.generation.virtualhosts", "true");
+ setConfigurationProperty("statistics.generation.connections", "true");
+ }
+
+ /**
+ * Test message totals.
+ */
+ public void testMessageTotals() throws Exception
+ {
+ sendUsing(_test, 10, 100);
+ sendUsing(_dev, 20, 100);
+ sendUsing(_local, 5, 100);
+ sendUsing(_local, 5, 100);
+ sendUsing(_local, 5, 100);
+
+ ManagedBroker test = _jmxUtils.getManagedBroker("test");
+ ManagedBroker dev = _jmxUtils.getManagedBroker("development");
+ ManagedBroker local = _jmxUtils.getManagedBroker("localhost");
+
+ long total = 0;
+ long data = 0;
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("*"))
+ {
+ total += mc.getTotalMessages();
+ data += mc.getTotalData();
+ }
+ assertEquals("Incorrect connection total", 45, total);
+ assertEquals("Incorrect connection data", 45 * 100, data);
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total", 45, _jmxUtils.getServerInformation().getTotalMessages());
+ assertEquals("Incorrect server data", 45 * 100, _jmxUtils.getServerInformation().getTotalData());
+ }
+
+ long testTotal = 0;
+ long testData = 0;
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ testTotal += mc.getTotalMessages();
+ testData += mc.getTotalData();
+ }
+ assertEquals("Incorrect test connection total", 10, testTotal);
+ assertEquals("Incorrect test vhost total", 10, test.getTotalMessages());
+ assertEquals("Incorrect test connection data", 10 * 100, testData);
+ assertEquals("Incorrect test vhost data", 10 * 100, test.getTotalData());
+
+ long devTotal = 0;
+ long devData = 0;
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("development"))
+ {
+ devTotal += mc.getTotalMessages();
+ devData += mc.getTotalData();
+ }
+ assertEquals("Incorrect test connection total", 20, devTotal);
+ assertEquals("Incorrect development total", 20, dev.getTotalMessages());
+ assertEquals("Incorrect test connection data", 20 * 100, devData);
+ assertEquals("Incorrect development data", 20 * 100, dev.getTotalData());
+
+ long localTotal = 0;
+ long localData = 0;
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("localhost"))
+ {
+ localTotal += mc.getTotalMessages();
+ localData += mc.getTotalData();
+ }
+ assertEquals("Incorrect test connection total", 15, localTotal);
+ assertEquals("Incorrect localhost total", 15, local.getTotalMessages());
+ assertEquals("Incorrect test connection data", 15 * 100, localData);
+ assertEquals("Incorrect localhost data", 15 * 100, local.getTotalData());
+ }
+
+ /**
+ * Test message totals when a connection is closed.
+ */
+ public void testMessageTotalsWithClosedConnections() throws Exception
+ {
+ Connection temp = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test");
+ temp.start();
+
+ sendUsing(_test, 10, 100);
+ sendUsing(temp, 10, 100);
+ sendUsing(_test, 10, 100);
+
+ temp.close();
+
+ ManagedBroker test = _jmxUtils.getManagedBroker("test");
+
+ long total = 0;
+ long data = 0;
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("*"))
+ {
+ total += mc.getTotalMessages();
+ data += mc.getTotalData();
+ }
+ assertEquals("Incorrect active connection total", 20, total);
+ assertEquals("Incorrect active connection data", 20 * 100, data);
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total", 30, _jmxUtils.getServerInformation().getTotalMessages());
+ assertEquals("Incorrect server data", 30 * 100, _jmxUtils.getServerInformation().getTotalData());
+ }
+
+ long testTotal = 0;
+ long testData = 0;
+ for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
+ {
+ testTotal += mc.getTotalMessages();
+ testData += mc.getTotalData();
+ }
+ assertEquals("Incorrect test active connection total", 20, testTotal);
+ assertEquals("Incorrect test vhost total", 30, test.getTotalMessages());
+ assertEquals("Incorrect test active connection data", 20 * 100, testData);
+ assertEquals("Incorrect test vhost data", 30 * 100, test.getTotalData());
+ }
+
+ /**
+ * Test message peak rate generation.
+ */
+ public void testMessagePeakRates() throws Exception
+ {
+ sendUsing(_test, 1, 10000);
+ Thread.sleep(10 * 1000);
+ sendUsing(_dev, 10, 10);
+
+ ManagedBroker test = _jmxUtils.getManagedBroker("test");
+ ManagedBroker dev = _jmxUtils.getManagedBroker("development");
+
+ assertEquals("Incorrect test vhost peak messages", 1.0d, test.getPeakMessageRate());
+ assertEquals("Incorrect test vhost peak data", 10000.0d, test.getPeakDataRate());
+ assertEquals("Incorrect dev vhost peak messages", 10.0d, dev.getPeakMessageRate());
+ assertEquals("Incorrect dev vhost peak data", 100.0d, dev.getPeakDataRate());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server peak messages", 10.0d, _jmxUtils.getServerInformation().getPeakMessageRate());
+ assertEquals("Incorrect server peak data", 10000.0d, _jmxUtils.getServerInformation().getPeakDataRate());
+ }
+ }
+
+ /**
+ * Test message totals when a vhost has its statistics reset
+ */
+ public void testMessageTotalVhostReset() throws Exception
+ {
+ sendUsing(_test, 10, 10);
+ sendUsing(_dev, 10, 10);
+
+ ManagedBroker test = _jmxUtils.getManagedBroker("test");
+ ManagedBroker dev = _jmxUtils.getManagedBroker("development");
+
+ assertEquals("Incorrect test vhost total messages", 10, test.getTotalMessages());
+ assertEquals("Incorrect test vhost total data", 100, test.getTotalData());
+ assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessages());
+ assertEquals("Incorrect dev vhost total data", 100, dev.getTotalData());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessages());
+ assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalData());
+ }
+
+ test.resetStatistics();
+
+ assertEquals("Incorrect test vhost total messages", 0, test.getTotalMessages());
+ assertEquals("Incorrect test vhost total data", 0, test.getTotalData());
+ assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessages());
+ assertEquals("Incorrect dev vhost total data", 100, dev.getTotalData());
+
+ if (!_broker.equals(VM))
+ {
+ assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessages());
+ assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalData());
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java
new file mode 100644
index 0000000000..4df1b6e8b6
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.jmx;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * Test generation of message statistics.
+ */
+public abstract class MessageStatisticsTestCase extends QpidTestCase
+{
+ protected static final String USER = "admin";
+
+ protected JMXTestUtils _jmxUtils;
+ protected Connection _test, _dev, _local;
+ protected Destination _queue;
+ protected String _brokerUrl;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ _jmxUtils = new JMXTestUtils(this, USER, USER);
+ _jmxUtils.setUp();
+
+ configureStatistics();
+
+ super.setUp();
+
+ _queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, "queue");
+
+ _brokerUrl = getBroker().toString();
+ _test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test");
+ _dev = new AMQConnection(_brokerUrl, USER, USER, "clientid", "development");
+ _local = new AMQConnection(_brokerUrl, USER, USER, "clientid", "localhost");
+
+ _test.start();
+ _dev.start();
+ _local.start();
+
+ _jmxUtils.open();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ _jmxUtils.close();
+
+ _test.close();
+ _dev.close();
+ _local.close();
+
+ super.tearDown();
+ }
+
+ /**
+ * Configure statistics generation properties on the broker.
+ */
+ public abstract void configureStatistics() throws Exception;
+
+ protected void sendUsing(Connection con, int number, int size) throws Exception
+ {
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(_queue);
+ String content = new String(new byte[size]);
+ TextMessage msg = session.createTextMessage(content);
+ for (int i = 0; i < number; i++)
+ {
+ producer.send(msg);
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
index 5d8ee785ec..e85c53e1a8 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
@@ -92,7 +92,7 @@ public class ConnectionCloseTest extends QpidTestCase
assertTrue("Spurious thread creation exceeded threshold, " +
delta.size() + " threads created.",
- delta.size() < 10);
+ delta.size() < 20);
}
private void dumpStacks(Map<Thread,StackTraceElement[]> map)
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
index ca59a0536b..81f7f07936 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
@@ -24,7 +24,9 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.commands.objects.AllObjects;
import org.apache.qpid.management.common.JMXConnnectionFactory;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
import org.apache.qpid.management.common.mbeans.ManagedExchange;
+import org.apache.qpid.management.common.mbeans.ServerInformation;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -33,8 +35,12 @@ import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
+import junit.framework.TestCase;
+
/**
*
*/
@@ -57,6 +63,7 @@ public class JMXTestUtils
public void setUp() throws IOException, ConfigurationException, Exception
{
_test.setConfigurationProperty("management.enabled", "true");
+ _test.setSystemProperty("qpid.management.disableCustomSocketFactory", "true");
}
public void open() throws Exception
@@ -209,4 +216,44 @@ public class JMXTestUtils
newProxyInstance(_mbsc, getExchangeObjectName("test", exchangeName),
ManagedExchange.class, false);
}
+
+ /**
+ * Retrive {@link ServerInformation} JMX MBean.
+ */
+ public ServerInformation getServerInformation()
+ {
+ // Get the name of the test manager
+ AllObjects allObject = new AllObjects(_mbsc);
+ allObject.querystring = "org.apache.qpid:type=ServerInformation,name=ServerInformation,*";
+
+ Set<ObjectName> objectNames = allObject.returnObjects();
+
+ TestCase.assertNotNull("Null ObjectName Set returned", objectNames);
+ TestCase.assertEquals("Incorrect number of objects returned", 1, objectNames.size());
+
+ // We have verified we have only one value in objectNames so return it
+ return getManagedObject(ServerInformation.class, objectNames.iterator().next());
+ }
+
+ /**
+ * Retrive all {@link ManagedConnection} objects.
+ */
+ public List<ManagedConnection> getManagedConnections(String vhost)
+ {
+ // Get the name of the test manager
+ AllObjects allObject = new AllObjects(_mbsc);
+ allObject.querystring = "org.apache.qpid:type=VirtualHost.Connection,VirtualHost=" + vhost + ",name=*";
+
+ Set<ObjectName> objectNames = allObject.returnObjects();
+
+ TestCase.assertNotNull("Null ObjectName Set returned", objectNames);
+
+ // Collect all the connection objects
+ List<ManagedConnection> connections = new ArrayList<ManagedConnection>();
+ for (ObjectName name : objectNames)
+ {
+ connections.add(getManagedObject(ManagedConnection.class, name));
+ }
+ return connections;
+ }
}