From 20092d63916ebe4d6067190c21db5a0b95552457 Mon Sep 17 00:00:00 2001 From: Andrew Donald Kennedy Date: Mon, 8 Nov 2010 17:05:08 +0000 Subject: QPID-2932: Add statistics generation for broker message delivery git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1032643 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/AMQBrokerManagerMBean.java | 63 +++++- .../java/org/apache/qpid/server/AMQChannel.java | 3 + .../src/main/java/org/apache/qpid/server/Main.java | 2 +- .../server/configuration/ServerConfiguration.java | 30 +++ .../management/ServerInformationMBean.java | 60 +++++- .../server/logging/messages/LogMessages.properties | 4 + .../management/JMXManagedObjectRegistry.java | 14 +- .../server/protocol/AMQMinaProtocolSession.java | 59 +++++- .../qpid/server/protocol/AMQProtocolSession.java | 1 + .../server/protocol/AMQProtocolSessionMBean.java | 47 ++++- .../qpid/server/registry/ApplicationRegistry.java | 126 +++++++++++- .../ConfigurationFileApplicationRegistry.java | 4 + .../qpid/server/registry/IApplicationRegistry.java | 7 +- .../qpid/server/stats/StatisticsCounter.java | 159 ++++++++++++++++ .../qpid/server/stats/StatisticsGatherer.java | 68 +++++++ .../qpid/server/virtualhost/VirtualHost.java | 95 ++++++++-- .../qpid/server/stats/StatisticsCounterTest.java | 135 +++++++++++++ .../qpid/server/util/NullApplicationRegistry.java | 3 + .../qpid/server/util/TestApplicationRegistry.java | 3 + .../management/common/mbeans/ManagedBroker.java | 63 +++++- .../common/mbeans/ManagedConnection.java | 72 ++++++- .../common/mbeans/ServerInformation.java | 64 ++++++- qpid/java/systests/derby.log | 6 + .../jmx/MessageConnectionStatisticsTest.java | 94 +++++++++ .../jmx/MessageStatisticsConfigurationTest.java | 162 ++++++++++++++++ .../jmx/MessageStatisticsReportingTest.java | 90 +++++++++ .../qpid/management/jmx/MessageStatisticsTest.java | 211 +++++++++++++++++++++ .../management/jmx/MessageStatisticsTestCase.java | 99 ++++++++++ .../client/connection/ConnectionCloseTest.java | 2 +- .../org/apache/qpid/test/utils/JMXTestUtils.java | 47 +++++ 30 files changed, 1747 insertions(+), 46 deletions(-) create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java create mode 100644 qpid/java/systests/derby.log create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index d2eea55df2..d6720684fa 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -39,6 +39,7 @@ package org.apache.qpid.server; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; @@ -57,16 +58,19 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.ExchangeType; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSessionMBean; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueueMBean; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.ManagementActor; /** * This MBean implements the broker management interface and exposes the @@ -79,6 +83,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr private final ExchangeRegistry _exchangeRegistry; private final ExchangeFactory _exchangeFactory; private final MessageStore _messageStore; + private final VirtualHost _virtualHost; private final VirtualHost.VirtualHostMBean _virtualHostMBean; @@ -88,12 +93,12 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr super(ManagedBroker.class, ManagedBroker.TYPE); _virtualHostMBean = virtualHostMBean; - VirtualHost virtualHost = virtualHostMBean.getVirtualHost(); + _virtualHost = virtualHostMBean.getVirtualHost(); - _queueRegistry = virtualHost.getQueueRegistry(); - _exchangeRegistry = virtualHost.getExchangeRegistry(); - _messageStore = virtualHost.getMessageStore(); - _exchangeFactory = virtualHost.getExchangeFactory(); + _queueRegistry = _virtualHost.getQueueRegistry(); + _exchangeRegistry = _virtualHost.getExchangeRegistry(); + _messageStore = _virtualHost.getMessageStore(); + _exchangeFactory = _virtualHost.getExchangeFactory(); } public String getObjectInstanceName() @@ -348,4 +353,46 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr { return getObjectNameForSingleInstanceMBean(); } -} // End of MBean class + + public void resetStatistics() throws Exception + { + _virtualHost.getMessageStatistics().reset(); + _virtualHost.getDataStatistics().reset(); + + Collection connections = _virtualHost.getConnectionRegistry().getConnections(); + for (AMQProtocolSession con : connections) + { + ((AMQProtocolSessionMBean) ((AMQMinaProtocolSession) con).getManagedObject()).resetStatistics(); + } + } + + public double getPeakMessageRate() + { + return _virtualHost.getMessageStatistics().getPeak(); + } + + public double getPeakDataRate() + { + return _virtualHost.getDataStatistics().getPeak(); + } + + public double getMessageRate() + { + return _virtualHost.getMessageStatistics().getRate(); + } + + public double getDataRate() + { + return _virtualHost.getDataStatistics().getRate(); + } + + public long getTotalMessages() + { + return _virtualHost.getMessageStatistics().getTotal(); + } + + public long getTotalData() + { + return _virtualHost.getDataStatistics().getTotal(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 4757faa9f6..116c425a99 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -206,6 +206,9 @@ public class AMQChannel // check and deliver if header says body length is zero if (_currentMessage.allContentReceived()) { + long bodySize = _currentMessage.getContentHeaderBody().bodySize; + long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().properties).getTimestamp(); + _session.registerMessageDelivery(bodySize, timestamp); try { _currentMessage.deliverToQueues(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index c79e4d5eb6..bc129a2be5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -291,7 +291,7 @@ public class Main configMBean.register(); ServerInformationMBean sysInfoMBean = - new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion()); + new ServerInformationMBean((ApplicationRegistry) ApplicationRegistry.getInstance()); sysInfoMBean.register(); //fixme .. use QpidProperties.getVersionString when we have fixed the classpath issues diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 2adb01316b..8e00539025 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -664,4 +664,34 @@ public class ServerConfiguration implements SignalHandler getConfig().getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD)); } + + public long getStatisticsSamplePeriod() + { + return getConfig().getLong("statistics.sample.period", 5000L); + } + + public boolean isStatisticsGenerationBrokerEnabled() + { + return getConfig().getBoolean("statistics.generation.broker", false); + } + + public boolean isStatisticsGenerationVirtualhostsEnabled() + { + return getConfig().getBoolean("statistics.generation.virtualhosts", false); + } + + public boolean isStatisticsGenerationConnectionsEnabled() + { + return getConfig().getBoolean("statistics.generation.connections", false); + } + + public long getStatisticsReportingPeriod() + { + return getConfig().getLong("statistics.reporting.period", 0L); + } + + public boolean isStatisticsReportResetEnabled() + { + return getConfig().getBoolean("statistics.reporting.reset", false); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java index db2cc970b2..81c9b37871 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java @@ -21,12 +21,17 @@ package org.apache.qpid.server.information.management; import java.io.IOException; +import java.util.Collection; +import javax.management.JMException; + +import org.apache.qpid.common.QpidProperties; import org.apache.qpid.management.common.mbeans.ServerInformation; import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.server.AMQBrokerManagerMBean; import org.apache.qpid.server.management.AMQManagedObject; - -import javax.management.JMException; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; /** MBean class for the ServerInformationMBean. */ @MBeanDescription("Server Information Interface") @@ -34,12 +39,15 @@ public class ServerInformationMBean extends AMQManagedObject implements ServerIn { private String buildVersion; private String productVersion; + private ApplicationRegistry registry; - public ServerInformationMBean(String buildVersion, String productVersion) throws JMException + public ServerInformationMBean(ApplicationRegistry applicationRegistry) throws JMException { super(ServerInformation.class, ServerInformation.TYPE); - this.buildVersion = buildVersion; - this.productVersion = productVersion; + + registry = applicationRegistry; + buildVersion = QpidProperties.getBuildVersion(); + productVersion = QpidProperties.getReleaseVersion(); } public String getObjectInstanceName() @@ -67,5 +75,45 @@ public class ServerInformationMBean extends AMQManagedObject implements ServerIn return productVersion; } - + public void resetStatistics() throws Exception + { + registry.getDataStatistics().reset(); + registry.getMessageStatistics().reset(); + + Collection virtualhosts = registry.getVirtualHostRegistry().getVirtualHosts(); + for (VirtualHost vhost : virtualhosts) + { + ((AMQBrokerManagerMBean) vhost.getBrokerMBean()).resetStatistics(); + } + } + + public double getPeakMessageRate() + { + return registry.getMessageStatistics().getPeak(); + } + + public double getPeakDataRate() + { + return registry.getDataStatistics().getPeak(); + } + + public double getMessageRate() + { + return registry.getMessageStatistics().getRate(); + } + + public double getDataRate() + { + return registry.getDataStatistics().getRate(); + } + + public long getTotalMessages() + { + return registry.getMessageStatistics().getTotal(); + } + + public long getTotalData() + { + return registry.getDataStatistics().getTotal(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties index 7f6ec704ce..636b7bff17 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties @@ -222,6 +222,8 @@ BRK_STOPPED = BRK-1005 : Stopped BRK_CONFIG = BRK-1006 : Using configuration : {0} # 0 - path BRK_LOG_CONFIG = BRK-1007 : Using logging configuration : {0} +BRK_STATS_DATA = BRK-1008 : {0,number,#.###} kB/s, {1,number,#} bytes +BRK_STATS_MSGS = BRK-1009 : {0,number,#.###} msg/s, {1,number,#} msgs #ManagementConsole MNG_STARTUP = MNG-1001 : Startup @@ -243,6 +245,8 @@ MNG_CLOSE = MNG-1008 : Close # 0 - name VHT_CREATED = VHT-1001 : Created : {0} VHT_CLOSED = VHT-1002 : Closed +VHT_STATS_DATA = VHT-1003 : {0} : {1,number,#.###} kB/s, {2,number,#} bytes +VHT_STATS_MSGS = VHT-1004 : {0} : {1,number,#.###} msg/s, {2,number,#} msgs #MessageStore # 0 - name diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java index 3b716cb858..ff4b459ec6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java @@ -98,8 +98,9 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry public void start() throws IOException, ConfigurationException { - CurrentActor.get().message(ManagementConsoleMessages.MNG_STARTUP()); + + boolean disableCustomSocketFactory = Boolean.getBoolean("qpid.management.disableCustomSocketFactory"); //check if system properties are set to use the JVM's out-of-the-box JMXAgent if (areOutOfTheBoxJMXOptionsSet()) @@ -225,8 +226,15 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry * As a result, only binds made using the object reference will succeed, thus securing it from external change. */ System.setProperty("java.rmi.server.randomIDs", "true"); - _rmiRegistry = LocateRegistry.createRegistry(port, null, new CustomRMIServerSocketFactory()); - + if (disableCustomSocketFactory) + { + _rmiRegistry = LocateRegistry.createRegistry(port, null, null); + } + else + { + _rmiRegistry = LocateRegistry.createRegistry(port, null, new CustomRMIServerSocketFactory()); + } + /* * We must now create the RMI ConnectorServer manually, as the JMX Factory methods use RMI calls * to bind the ConnectorServer to the registry, which will now fail as for security we have diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 93fc50a2bd..e758febc3c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -78,13 +78,16 @@ import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.transport.Sender; -public class AMQMinaProtocolSession implements AMQProtocolSession, Managable +public class AMQMinaProtocolSession implements AMQProtocolSession, Managable, StatisticsGatherer { private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); @@ -148,6 +151,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private LogSubject _logSubject; private final AtomicBoolean _closing = new AtomicBoolean(false); + + private ApplicationRegistry _registry; + private boolean _statisticsEnabled = false; + private StatisticsCounter _messageStats, _dataStats; public ManagedObject getManagedObject() { @@ -162,8 +169,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable session.setAttachment(this); _codecFactory = codecFactory; + _registry = virtualHostRegistry.getApplicationRegistry(); - _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); + _actor = new AMQPConnectionActor(this, _registry.getRootMessageLogger()); _actor.message(ConnectionMessages.CON_OPEN(null, null, false, false)); @@ -178,8 +186,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { e.printStackTrace(); throw e; - } + + initialiseStatistics(); } private AMQProtocolSessionMBean createMBean() throws AMQException @@ -944,4 +953,48 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { // No-op, interface munging between this and AMQProtocolSession } + + public void registerMessageDelivery(long messageSize, long timestamp) + { + if (isStatisticsEnabled()) + { + _logger.info("=== STATS === register " + messageSize); + _messageStats.registerEvent(1L, timestamp); + _dataStats.registerEvent(messageSize, timestamp); + } + _virtualHost.registerMessageDelivery(messageSize, timestamp); + } + + public StatisticsCounter getMessageStatistics() + { + return _messageStats; + } + + public StatisticsCounter getDataStatistics() + { + return _dataStats; + } + + public void resetStatistics() + { + _messageStats.reset(); + _dataStats.reset(); + } + + public void initialiseStatistics() + { + setStatisticsEnabled(_registry.getConfiguration().isStatisticsGenerationConnectionsEnabled()); + _messageStats = new StatisticsCounter("messages-" + getSessionID()); + _dataStats = new StatisticsCounter("bytes-" + getSessionID()); + } + + public boolean isStatisticsEnabled() + { + return _statisticsEnabled; + } + + public void setStatisticsEnabled(boolean enabled) + { + _statisticsEnabled = enabled; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 6838a16182..ba110c4826 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -221,4 +221,5 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession public ProtocolSessionIdentifier getSessionIdentifier(); + public void registerMessageDelivery(long messageSize, long timestamp); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index 91d21c57d8..28f87fb585 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -341,4 +341,49 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed _broadcaster.sendNotification(n); } -} // End of MBean class + public void resetStatistics() throws Exception + { + _session.getMessageStatistics().reset(); + _session.getDataStatistics().reset(); + } + + public double getPeakMessageRate() + { + return _session.getMessageStatistics().getPeak(); + } + + public double getPeakDataRate() + { + return _session.getDataStatistics().getPeak(); + } + + public double getMessageRate() + { + return _session.getMessageStatistics().getRate(); + } + + public double getDataRate() + { + return _session.getDataStatistics().getRate(); + } + + public long getTotalMessages() + { + return _session.getMessageStatistics().getTotal(); + } + + public long getTotalData() + { + return _session.getDataStatistics().getTotal(); + } + + public boolean isStatisticsEnabled() + { + return _session.isStatisticsEnabled(); + } + + public void setStatisticsEnabled(boolean enabled) + { + _session.setStatisticsEnabled(enabled); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 737ac4450a..4b98a408a7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -23,19 +23,28 @@ package org.apache.qpid.server.registry; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.messages.BrokerMessages; +import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.logging.messages.VirtualHostMessages; +import org.apache.qpid.server.logging.actors.AbstractActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.transport.QpidAcceptor; @@ -45,7 +54,7 @@ import org.apache.qpid.server.transport.QpidAcceptor; *

* Subclasses should handle the construction of the "registered objects" such as the exchange registry. */ -public abstract class ApplicationRegistry implements IApplicationRegistry +public abstract class ApplicationRegistry implements IApplicationRegistry, StatisticsGatherer { protected static final Logger _logger = Logger.getLogger(ApplicationRegistry.class); @@ -75,6 +84,10 @@ public abstract class ApplicationRegistry implements IApplicationRegistry protected RootMessageLogger _rootMessageLogger; + protected Timer _reportingTimer; + protected boolean _statisticsEnabled = false; + protected StatisticsCounter _messageStats, _dataStats; + static { Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService())); @@ -220,6 +233,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry _logger.info("Shutting down ApplicationRegistry:"+this); } + //Stop Statistics Reporting + if (_reportingTimer != null) + { + _reportingTimer.cancel(); + } + //Stop incomming connections unbind(); @@ -317,4 +336,109 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return _rootMessageLogger; } + public void initialiseStatisticsReporting() + { + long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms + final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled(); + final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled(); + final boolean reset = _configuration.isStatisticsReportResetEnabled(); + + /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */ + if (report > 0L && (broker || virtualhost)) + { + _reportingTimer = new Timer("Statistics-Reporting", true); + + class StatisticsReportingTask extends TimerTask + { + Logger _srLogger = Logger.getLogger(StatisticsReportingTask.class); + + public void run() + { + CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) { + public String getLogMessage() + { + return "[" + Thread.currentThread().getName() + "] "; + } + }); + + if (broker) + { + CurrentActor.get().message(BrokerMessages.BRK_STATS_DATA(_dataStats.getPeak() / 1024.0, _dataStats.getTotal())); + CurrentActor.get().message(BrokerMessages.BRK_STATS_MSGS(_messageStats.getPeak(), _messageStats.getTotal())); + } + + if (virtualhost) + { + for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts()) + { + String name = vhost.getName(); + StatisticsCounter data = vhost.getDataStatistics(); + StatisticsCounter messages = vhost.getMessageStatistics(); + + CurrentActor.get().message(VirtualHostMessages.VHT_STATS_DATA(name, data.getPeak() / 1024.0, data.getTotal())); + CurrentActor.get().message(VirtualHostMessages.VHT_STATS_MSGS(name, messages.getPeak(), messages.getTotal())); + } + } + + if (reset) + { + resetStatistics(); + } + + CurrentActor.remove(); + } + } + + _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(), + report / 2, + report); + } + } + + public void registerMessageDelivery(long messageSize, long timestamp) + { + if (isStatisticsEnabled()) + { + _messageStats.registerEvent(1L, timestamp); + _dataStats.registerEvent(messageSize, timestamp); + } + } + + public StatisticsCounter getMessageStatistics() + { + return _messageStats; + } + + public StatisticsCounter getDataStatistics() + { + return _dataStats; + } + + public void resetStatistics() + { + _messageStats.reset(); + _dataStats.reset(); + + for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts()) + { + vhost.resetStatistics(); + } + } + + public void initialiseStatistics() + { + setStatisticsEnabled(getConfiguration().isStatisticsGenerationBrokerEnabled()); + _messageStats = new StatisticsCounter("messages"); + _dataStats = new StatisticsCounter("bytes"); + } + + public boolean isStatisticsEnabled() + { + return _statisticsEnabled; + } + + public void setStatisticsEnabled(boolean enabled) + { + _statisticsEnabled = enabled; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index 85f637764a..ef1cf85b5d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -76,6 +77,9 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry _databaseManager.initialiseManagement(_configuration); _managedObjectRegistry.start(); + + initialiseStatistics(); + initialiseStatisticsReporting(); initialiseVirtualHosts(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index 92accb3499..10c6430012 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -34,8 +34,12 @@ import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.plugins.PluginManager; +import org.apache.qpid.server.security.access.ACLManager; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.transport.QpidAcceptor; -import org.apache.mina.common.IoAcceptor; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public interface IApplicationRegistry { @@ -81,4 +85,5 @@ public interface IApplicationRegistry */ void addAcceptor(InetSocketAddress bindAddress, QpidAcceptor acceptor); + void initialiseStatisticsReporting(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java new file mode 100644 index 0000000000..da53830083 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.stats; + +import java.util.Date; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class collects statistics and counts the total, rate per second and + * peak rate per second values for the events that are registered with it. + */ +public class StatisticsCounter +{ + private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter.class); + + private static final String COUNTER = "counter"; + private static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 1000L); // 1s + private static final boolean _disable = Boolean.getBoolean("qpid.statistics.disable"); + private static final AtomicLong _counterIds = new AtomicLong(0L); + + private final AtomicLong _peak = new AtomicLong(0L); + private final AtomicLong _total = new AtomicLong(0L); + private final AtomicLong _last = new AtomicLong(0L); + private final AtomicLong _rate = new AtomicLong(0L); + + private long _start; + + private final long _period; + private final String _name; + + public StatisticsCounter() + { + this(COUNTER); + } + + public StatisticsCounter(String name) + { + this(name, DEFAULT_SAMPLE_PERIOD); + } + + public StatisticsCounter(String name, long period) + { + _period = period; + _name = name + "-" + + _counterIds.incrementAndGet(); + reset(); + } + + public void registerEvent() + { + registerEvent(1L); + } + + public void registerEvent(long value) + { + registerEvent(value, System.currentTimeMillis()); + } + + public void registerEvent(long value, long timestamp) + { + if (_disable) + { + return; + } + + long thisSample = (timestamp / _period); + long lastSample; + while (thisSample > (lastSample = _last.get())) + { + if (_last.compareAndSet(lastSample, thisSample)) + { + _rate.set(0L); + } + } + + _total.addAndGet(value); + long current = _rate.addAndGet(value); + long peak; + while (current > (peak = _peak.get())) + { + _peak.compareAndSet(peak, current); + } + } + + /** + * Update the current rate and peak - may reset rate to zero if a new + * sample period has started. + */ + private void update() + { + registerEvent(0L, System.currentTimeMillis()); + } + + /** + * Reset + */ + public void reset() + { + _peak.set(0L); + _rate.set(0L); + _total.set(0L); + _start = System.currentTimeMillis(); + _last.set(_start / _period); + } + + public double getPeak() + { + return (double) _peak.get() / ((double) _period / 1000.0d); + } + + public double getRate() + { + update(); + return (double) _rate.get() / ((double) _period / 1000.0d); + } + + public long getTotal() + { + return _total.get(); + } + + public long getStart() + { + return _start; + } + + public Date getStartTime() + { + return new Date(_start); + } + + public String getName() + { + return _name; + } + + public long getPeriod() + { + return _period; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java new file mode 100644 index 0000000000..faaf11aff6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.stats; + +/** + * Statistics gatherer. + */ +public interface StatisticsGatherer +{ + /** + * + * @param period + */ + void initialiseStatistics(); + + /** + * + * @param messageSize + * @param timestamp + */ + void registerMessageDelivery(long messageSize, long timestamp); + + /** + * + * @return + */ + StatisticsCounter getMessageStatistics(); + + /** + * + * @return + */ + StatisticsCounter getDataStatistics(); + + /** + * + * @return + */ + void resetStatistics(); + + /** + * + * @return + */ + boolean isStatisticsEnabled(); + + /** + * + * @param enabled + */ + void setStatisticsEnabled(boolean enabled); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 71fcfb964d..20f0437c65 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -20,6 +20,14 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; + +import javax.management.NotCompliantMBeanException; + import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -28,11 +36,9 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.AMQBrokerManagerMBean; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.logging.actors.AbstractActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.ConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry; @@ -41,30 +47,31 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.logging.actors.AbstractActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.Accessable; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; -import javax.management.NotCompliantMBeanException; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; - -public class VirtualHost implements Accessable +public class VirtualHost implements Accessable, StatisticsGatherer { private static final Logger _logger = Logger.getLogger(VirtualHost.class); @@ -90,6 +97,10 @@ public class VirtualHost implements Accessable private final Timer _houseKeepingTimer; private VirtualHostConfiguration _configuration; + + private ApplicationRegistry _registry; + private boolean _statisticsEnabled = false; + private StatisticsCounter _messageStats, _dataStats; public void setAccessableName(String name) { @@ -163,6 +174,8 @@ public class VirtualHost implements Accessable { throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost."); } + + _registry = (ApplicationRegistry) ApplicationRegistry.getInstance(); _virtualHostMBean = new VirtualHostMBean(); @@ -227,11 +240,15 @@ public class VirtualHost implements Accessable _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); - initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); + + initialiseHouseKeeping(); + initialiseStatistics(); } - private void initialiseHouseKeeping(long period) + private void initialiseHouseKeeping() { + long period = _configuration.getHousekeepingExpiredMessageCheckPeriod(); + /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ if (period > 0L) { @@ -244,7 +261,7 @@ public class VirtualHost implements Accessable CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) { public String getLogMessage() { - return "[" + Thread.currentThread().getName() + "]"; + return "[" + Thread.currentThread().getName() + "] "; } }); _hkLogger.info("Starting the houseKeeping job"); @@ -472,6 +489,54 @@ public class VirtualHost implements Accessable { return _virtualHostMBean; } + + public void registerMessageDelivery(long messageSize, long timestamp) + { + if (isStatisticsEnabled()) + { + _messageStats.registerEvent(1L, timestamp); + _dataStats.registerEvent(messageSize, timestamp); + } + _registry.registerMessageDelivery(messageSize, timestamp); + } + + public StatisticsCounter getMessageStatistics() + { + return _messageStats; + } + + public StatisticsCounter getDataStatistics() + { + return _dataStats; + } + + public void resetStatistics() + { + _messageStats.reset(); + _dataStats.reset(); + + for (AMQProtocolSession session : _connectionRegistry.getConnections()) + { + ((AMQMinaProtocolSession) session).resetStatistics(); + } + } + + public void initialiseStatistics() + { + setStatisticsEnabled(_registry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled()); + _messageStats = new StatisticsCounter("messages-" + getName()); + _dataStats = new StatisticsCounter("bytes-" + getName()); + } + + public boolean isStatisticsEnabled() + { + return _statisticsEnabled; + } + + public void setStatisticsEnabled(boolean enabled) + { + _statisticsEnabled = enabled; + } /** * Temporary Startup RT class to record the creation of persistent queues / exchanges. diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java new file mode 100644 index 0000000000..1e30946b01 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java @@ -0,0 +1,135 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.stats; + +import junit.framework.TestCase; + +/** + * Unit tests for the {@link StatisticsCounter} class. + */ +public class StatisticsCounterTest extends TestCase +{ + /** + * Check that statistics counters are created correctly. + */ + public void testCreate() + { + long before = System.currentTimeMillis(); + StatisticsCounter counter = new StatisticsCounter("name", 1234L); + long after = System.currentTimeMillis(); + + assertTrue(before <= counter.getStart()); + assertTrue(after >= counter.getStart()); + assertTrue(counter.getName().startsWith("name-")); + assertEquals(1234L, counter.getPeriod()); + } + + /** + * Check that totals add up correctly. + */ + public void testTotal() + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + long start = counter.getStart(); + for (int i = 0; i < 100; i++) + { + counter.registerEvent(i, start + i); + } + assertEquals(99 * 50, counter.getTotal()); // cf. Gauss + } + + /** + * Test totals add up correctly even when messages are delivered + * out-of-order. + */ + public void testTotalOutOfOrder() + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + long start = counter.getStart(); + assertEquals(0, counter.getTotal()); + counter.registerEvent(10, start + 2500); + assertEquals(10, counter.getTotal()); + counter.registerEvent(20, start + 1500); + assertEquals(30, counter.getTotal()); + counter.registerEvent(10, start + 500); + assertEquals(40, counter.getTotal()); + } + + /** + * Test that the peak rate is reported correctly. + */ + public void testPeak() + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + long start = counter.getStart(); + assertEquals(0.0, counter.getPeak()); + counter.registerEvent(1000, start + 500); + assertEquals(1000.0, counter.getPeak()); + counter.registerEvent(2000, start + 1500); + assertEquals(2000.0, counter.getPeak()); + counter.registerEvent(1000, start + 2500); + assertEquals(2000.0, counter.getPeak()); + } + + /** + * Test that peak rate is reported correctly even when messages are + * delivered out-of-order. + */ + public void testPeakOutOfOrder() throws Exception + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + long start = counter.getStart(); + assertEquals(0.0, counter.getPeak()); + counter.registerEvent(1000, start + 2500); + assertEquals(1000.0, counter.getPeak()); + counter.registerEvent(2000, start + 1500); + assertEquals(3000.0, counter.getPeak()); + counter.registerEvent(1000, start + 500); + assertEquals(4000.0, counter.getPeak()); + Thread.sleep(2000); + assertEquals(4000.0, counter.getPeak()); + counter.registerEvent(1000, start + 500); + assertEquals(5000.0, counter.getPeak()); + Thread.sleep(2000); + counter.registerEvent(1000); + assertEquals(5000.0, counter.getPeak()); + } + + /** + * Test the current rate is generated correctly. + */ + public void testRate() throws Exception + { + StatisticsCounter counter = new StatisticsCounter("test", 1000L); + assertEquals(0.0, counter.getRate()); + Thread.sleep(100); + counter.registerEvent(1000); + assertEquals(1000.0, counter.getRate()); + Thread.sleep(1000); + counter.registerEvent(2000); + assertEquals(2000.0, counter.getRate()); + Thread.sleep(1000); + counter.registerEvent(1000); + assertEquals(1000.0, counter.getRate()); + Thread.sleep(1000); + assertEquals(0.0, counter.getRate()); + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java index 6b8201eefb..9a6c913d62 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -71,6 +71,9 @@ public class NullApplicationRegistry extends ApplicationRegistry _accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager, AllowAll.FACTORY); _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); + + initialiseStatistics(); + initialiseStatisticsReporting(); _managedObjectRegistry = new NoopManagedObjectRegistry(); _virtualHostRegistry = new VirtualHostRegistry(this); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java index 7b7c86bb80..883018a421 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -94,6 +94,9 @@ public class TestApplicationRegistry extends ApplicationRegistry _managedObjectRegistry = new NoopManagedObjectRegistry(); _messageStore = new TestableMemoryMessageStore(); + + initialiseStatistics(); + initialiseStatisticsReporting(); _virtualHostRegistry = new VirtualHostRegistry(this); diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java index d02b9b89f4..46ff2c58d5 100644 --- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java +++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java @@ -34,9 +34,8 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParame /** * The ManagedBroker is the management interface to expose management * features of the Broker. - * - * @author Bhupendra Bhardwaj - * @version 0.1 + * + * @since Qpid JMX API 1.9 */ public interface ManagedBroker { @@ -122,4 +121,62 @@ public interface ManagedBroker impact= MBeanOperationInfo.ACTION) void deleteQueue(@MBeanOperationParameter(name= ManagedQueue.TYPE, description="Queue Name")String queueName) throws IOException, JMException; + + /** + * Resets all message and data statistics for the virtual host. + * + * @since Qpid JMX API 1.9 + */ + @MBeanOperation(name="resetStatistics", + description="Resets all message and data statistics for the virtual host", + impact= MBeanOperationInfo.ACTION) + void resetStatistics() throws Exception; + + /** + * Peak rate of messages per second for the virtual host. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="peakMessageRate", description=TYPE + " Peak Message Rate") + double getPeakMessageRate(); + + /** + * Peak rate of bytes per second for the virtual host. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="peakDataRate", description=TYPE + " Peak Data Rate") + double getPeakDataRate(); + + /** + * Rate of messages per second for the virtual host. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="messageRate", description=TYPE + " Message Rate") + double getMessageRate(); + + /** + * Rate of bytes per second for the virtual host. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="dataRate", description=TYPE + " Data Rate") + double getDataRate(); + + /** + * Total count of messages for the virtual host. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="totalMessages", description=TYPE + " Total Message Count") + long getTotalMessages(); + + /** + * Total count of bytes for the virtual host. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="totalData", description=TYPE + " Total Bytes") + long getTotalData(); } diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java index ff096aa22b..f2be759249 100644 --- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java +++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java @@ -35,8 +35,8 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParame /** * The management interface exposed to allow management of Connections. - * @author Bhupendra Bhardwaj - * @version 0.1 + * + * @since Qpid JMX API 1.9 */ public interface ManagedConnection { @@ -139,4 +139,72 @@ public interface ManagedConnection description="Closes this connection and all related channels", impact= MBeanOperationInfo.ACTION) void closeConnection() throws Exception; + + /** + * Resets message and data statistics for this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanOperation(name="resetStatistics", + description="Resets message and data statistics for this connection", + impact= MBeanOperationInfo.ACTION) + void resetStatistics() throws Exception; + + /** + * Peak rate of messages per second on this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="peakMessageRate", description=TYPE + " Peak Message Rate") + double getPeakMessageRate(); + + /** + * Peak rate of bytes per second on this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="peakDataRate", description=TYPE + " Peak Data Rate") + double getPeakDataRate(); + + /** + * Rate of messages per second on this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="messageRate", description=TYPE + " Message Rate") + double getMessageRate(); + + /** + * Rate of bytes per second on this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="dataRate", description=TYPE + " Data Rate") + double getDataRate(); + + /** + * Total count of messages on this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="totalMessages", description=TYPE + " Total Message Count") + long getTotalMessages(); + + /** + * Total count of bytes on this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="totalData", description=TYPE + " Total Bytes") + long getTotalData(); + + /** + * Is statistics collection enabled for this connection. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="statisticsEnabled", description=TYPE + " Statistics Enabled") + boolean isStatisticsEnabled(); + + void setStatisticsEnabled(boolean enabled); } diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java index f61c41dea9..0cfe73e69d 100644 --- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java +++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java @@ -22,11 +22,15 @@ package org.apache.qpid.management.common.mbeans; import java.io.IOException; +import javax.management.MBeanOperationInfo; + import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute; +import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation; /** * Interface for the ServerInformation MBean - * @since Qpid JMX API 1.3 + * + * @since Qpid JMX API 1.9 */ public interface ServerInformation { @@ -80,4 +84,62 @@ public interface ServerInformation @MBeanAttribute(name="ProductVersion", description = "The product version string") String getProductVersion() throws IOException; + + /** + * Resets all message and data statistics for the broker. + * + * @since Qpid JMX API 1.9 + */ + @MBeanOperation(name="resetStatistics", + description="Resets all message and data statistics for the broker", + impact= MBeanOperationInfo.ACTION) + void resetStatistics() throws Exception; + + /** + * Peak rate of messages per second for the broker. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="peakMessageRate", description=TYPE + " Peak Message Rate") + double getPeakMessageRate(); + + /** + * Peak rate of bytes per second for the broker. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="peakDataRate", description=TYPE + " Peak Data Rate") + double getPeakDataRate(); + + /** + * Rate of messages per second for the broker. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="messageRate", description=TYPE + " Message Rate") + double getMessageRate(); + + /** + * Rate of bytes per second for the broker. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="dataRate", description=TYPE + " Data Rate") + double getDataRate(); + + /** + * Total count of messages for the broker. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="totalMessages", description=TYPE + " Total Message Count") + long getTotalMessages(); + + /** + * Total count of bytes for the broker. + * + * @since Qpid JMX API 1.9 + */ + @MBeanAttribute(name="totalData", description=TYPE + " Total Bytes") + long getTotalData(); } diff --git a/qpid/java/systests/derby.log b/qpid/java/systests/derby.log new file mode 100644 index 0000000000..177dc5aad7 --- /dev/null +++ b/qpid/java/systests/derby.log @@ -0,0 +1,6 @@ +---------------------------------------------------------------- +2010-11-05 16:45:02.145 GMT: + Booting Derby version The Apache Software Foundation - Apache Derby - 10.3.2.1 - (599110): instance c013800d-012c-1ced-4a3a-000004b61c38 +on database directory /home/kennedya/workspaces/qpid-0.5/qpid/java/build/work/test/test + +Database Class Loader started - derby.database.classpath='' diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java new file mode 100644 index 0000000000..98da330140 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java @@ -0,0 +1,94 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.jmx; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; + +/** + * Test enabling generation of message statistics on a per-connection basis. + */ +public class MessageConnectionStatisticsTest extends MessageStatisticsTestCase +{ + public void configureStatistics() throws Exception + { + // no statistics generation configured + } + + /** + * Test statistics on a single connection + */ + public void testEnablingStatisticsPerConnection() throws Exception + { + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + + sendUsing(_test, 5, 200); + + List addresses = new ArrayList(); + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection total", 0, mc.getTotalMessages()); + assertEquals("Incorrect connection data", 0, mc.getTotalData()); + + addresses.add(mc.getRemoteAddress()); + } + assertEquals("Incorrect active connection data", 0, vhost.getTotalData()); + assertEquals("Incorrect active connection data", 0, vhost.getTotalMessages()); + + Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); + test.start(); + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + if (addresses.contains(mc.getRemoteAddress())) + { + continue; + } + mc.setStatisticsEnabled(true); + assertEquals("Incorrect connection total", 0, mc.getTotalMessages()); + assertEquals("Incorrect connection data", 0, mc.getTotalData()); + } + + sendUsing(test, 5, 200); + sendUsing(_test, 5, 200); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + if (addresses.contains(mc.getRemoteAddress())) + { + assertEquals("Incorrect connection total", 0, mc.getTotalMessages()); + assertEquals("Incorrect connection data", 0, mc.getTotalData()); + } + else + { + assertEquals("Incorrect connection total", 5, mc.getTotalMessages()); + assertEquals("Incorrect connection data", 1000, mc.getTotalData()); + } + } + assertEquals("Incorrect active connection data", 0, vhost.getTotalData()); + assertEquals("Incorrect active connection data", 0, vhost.getTotalMessages()); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java new file mode 100644 index 0000000000..7cb8f7a107 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java @@ -0,0 +1,162 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.jmx; + +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; + +/** + * Test enabling generation of message statistics on a per-connection basis. + */ +public class MessageStatisticsConfigurationTest extends MessageStatisticsTestCase +{ + public void configureStatistics() throws Exception + { + setConfigurationProperty("statistics.generation.broker", Boolean.toString(getName().contains("Broker"))); + setConfigurationProperty("statistics.generation.virtualhosts", Boolean.toString(getName().contains("Virtualhost"))); + setConfigurationProperty("statistics.generation.connections", Boolean.toString(getName().contains("Connection"))); + } + + /** + * Just broker statistics. + */ + public void testGenerateBrokerStatistics() throws Exception + { + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection total", 0, mc.getTotalMessages()); + assertEquals("Incorrect connection data", 0, mc.getTotalData()); + } + + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + assertEquals("Incorrect vhost data", 0, vhost.getTotalMessages()); + assertEquals("Incorrect vhost data", 0, vhost.getTotalData()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessages()); + assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalData()); + } + } + + /** + * Just virtualhost statistics. + */ + public void testGenerateVirtualhostStatistics() throws Exception + { + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection total", 0, mc.getTotalMessages()); + assertEquals("Incorrect connection data", 0, mc.getTotalData()); + } + + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + assertEquals("Incorrect vhost data", 5, vhost.getTotalMessages()); + assertEquals("Incorrect vhost data", 1000, vhost.getTotalData()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessages()); + assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalData()); + } + } + + /** + * Just connection statistics. + */ + public void testGenerateConnectionStatistics() throws Exception + { + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection total", 5, mc.getTotalMessages()); + assertEquals("Incorrect connection data", 1000, mc.getTotalData()); + } + + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + assertEquals("Incorrect vhost data", 0, vhost.getTotalMessages()); + assertEquals("Incorrect vhost data", 0, vhost.getTotalData()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessages()); + assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalData()); + } + } + + /** + * Both broker and virtualhost statistics. + */ + public void testGenerateBrokerAndVirtualhostStatistics() throws Exception + { + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection total", 0, mc.getTotalMessages()); + assertEquals("Incorrect connection data", 0, mc.getTotalData()); + } + + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + assertEquals("Incorrect vhost data", 5, vhost.getTotalMessages()); + assertEquals("Incorrect vhost data", 1000, vhost.getTotalData()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessages()); + assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalData()); + } + } + + /** + * Broker, virtualhost and connection statistics. + */ + public void testGenerateBrokerVirtualhostAndConnectionStatistics() throws Exception + { + sendUsing(_test, 5, 200); + Thread.sleep(1000); + + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + assertEquals("Incorrect connection total", 5, mc.getTotalMessages()); + assertEquals("Incorrect connection data", 1000, mc.getTotalData()); + } + + ManagedBroker vhost = _jmxUtils.getManagedBroker("test"); + assertEquals("Incorrect vhost data", 5, vhost.getTotalMessages()); + assertEquals("Incorrect vhost data", 1000, vhost.getTotalData()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessages()); + assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalData()); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java new file mode 100644 index 0000000000..7051c426b9 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.jmx; + +import java.util.List; + +import org.apache.qpid.util.LogMonitor; + +/** + * Test generation of message statistics reporting. + */ +public class MessageStatisticsReportingTest extends MessageStatisticsTestCase +{ + protected LogMonitor _monitor; + + public void configureStatistics() throws Exception + { + setConfigurationProperty("statistics.generation.broker", "true"); + setConfigurationProperty("statistics.generation.virtualhosts", "true"); + + if (getName().equals("testEnabledStatisticsReporting")) + { + setConfigurationProperty("statistics.reporting.period", "10"); + } + + _monitor = new LogMonitor(_outputFile); + } + + /** + * Test enabling reporting. + */ + public void testEnabledStatisticsReporting() throws Exception + { + sendUsing(_test, 10, 100); + sendUsing(_dev, 20, 100); + sendUsing(_local, 15, 100); + + Thread.sleep(10 * 1000); // 15s + + List brokerStatsData = _monitor.findMatches("BRK-1008"); + List brokerStatsMessages = _monitor.findMatches("BRK-1009"); + List vhostStatsData = _monitor.findMatches("VHT-1003"); + List vhostStatsMessages = _monitor.findMatches("VHT-1004"); + + assertEquals("Incorrect number of broker data stats log messages", 1, brokerStatsData.size()); + assertEquals("Incorrect number of broker message stats log messages", 1, brokerStatsMessages.size()); + assertEquals("Incorrect number of virtualhost data stats log messages", 3, vhostStatsData.size()); + assertEquals("Incorrect number of virtualhost message stats log messages", 3, vhostStatsMessages.size()); + } + + /** + * Test not enabling reporting. + */ + public void testNotEnabledStatisticsReporting() throws Exception + { + sendUsing(_test, 10, 100); + sendUsing(_dev, 20, 100); + sendUsing(_local, 15, 100); + + Thread.sleep(10 * 1000); // 15s + + List brokerStatsData = _monitor.findMatches("BRK-1008"); + List brokerStatsMessages = _monitor.findMatches("BRK-1009"); + List vhostStatsData = _monitor.findMatches("VHT-1003"); + List vhostStatsMessages = _monitor.findMatches("VHT-1004"); + + assertEquals("Incorrect number of broker data stats log messages", 0, brokerStatsData.size()); + assertEquals("Incorrect number of broker message stats log messages", 0, brokerStatsMessages.size()); + assertEquals("Incorrect number of virtualhost data stats log messages", 0, vhostStatsData.size()); + assertEquals("Incorrect number of virtualhost message stats log messages", 0, vhostStatsMessages.size()); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java new file mode 100644 index 0000000000..e97fd1eec9 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java @@ -0,0 +1,211 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.jmx; + +import javax.jms.Connection; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; + +/** + * Test generation of message statistics. + */ +public class MessageStatisticsTest extends MessageStatisticsTestCase +{ + public void configureStatistics() throws Exception + { + setConfigurationProperty("statistics.generation.broker", "true"); + setConfigurationProperty("statistics.generation.virtualhosts", "true"); + setConfigurationProperty("statistics.generation.connections", "true"); + } + + /** + * Test message totals. + */ + public void testMessageTotals() throws Exception + { + sendUsing(_test, 10, 100); + sendUsing(_dev, 20, 100); + sendUsing(_local, 5, 100); + sendUsing(_local, 5, 100); + sendUsing(_local, 5, 100); + + ManagedBroker test = _jmxUtils.getManagedBroker("test"); + ManagedBroker dev = _jmxUtils.getManagedBroker("development"); + ManagedBroker local = _jmxUtils.getManagedBroker("localhost"); + + long total = 0; + long data = 0; + for (ManagedConnection mc : _jmxUtils.getManagedConnections("*")) + { + total += mc.getTotalMessages(); + data += mc.getTotalData(); + } + assertEquals("Incorrect connection total", 45, total); + assertEquals("Incorrect connection data", 45 * 100, data); + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total", 45, _jmxUtils.getServerInformation().getTotalMessages()); + assertEquals("Incorrect server data", 45 * 100, _jmxUtils.getServerInformation().getTotalData()); + } + + long testTotal = 0; + long testData = 0; + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + testTotal += mc.getTotalMessages(); + testData += mc.getTotalData(); + } + assertEquals("Incorrect test connection total", 10, testTotal); + assertEquals("Incorrect test vhost total", 10, test.getTotalMessages()); + assertEquals("Incorrect test connection data", 10 * 100, testData); + assertEquals("Incorrect test vhost data", 10 * 100, test.getTotalData()); + + long devTotal = 0; + long devData = 0; + for (ManagedConnection mc : _jmxUtils.getManagedConnections("development")) + { + devTotal += mc.getTotalMessages(); + devData += mc.getTotalData(); + } + assertEquals("Incorrect test connection total", 20, devTotal); + assertEquals("Incorrect development total", 20, dev.getTotalMessages()); + assertEquals("Incorrect test connection data", 20 * 100, devData); + assertEquals("Incorrect development data", 20 * 100, dev.getTotalData()); + + long localTotal = 0; + long localData = 0; + for (ManagedConnection mc : _jmxUtils.getManagedConnections("localhost")) + { + localTotal += mc.getTotalMessages(); + localData += mc.getTotalData(); + } + assertEquals("Incorrect test connection total", 15, localTotal); + assertEquals("Incorrect localhost total", 15, local.getTotalMessages()); + assertEquals("Incorrect test connection data", 15 * 100, localData); + assertEquals("Incorrect localhost data", 15 * 100, local.getTotalData()); + } + + /** + * Test message totals when a connection is closed. + */ + public void testMessageTotalsWithClosedConnections() throws Exception + { + Connection temp = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); + temp.start(); + + sendUsing(_test, 10, 100); + sendUsing(temp, 10, 100); + sendUsing(_test, 10, 100); + + temp.close(); + + ManagedBroker test = _jmxUtils.getManagedBroker("test"); + + long total = 0; + long data = 0; + for (ManagedConnection mc : _jmxUtils.getManagedConnections("*")) + { + total += mc.getTotalMessages(); + data += mc.getTotalData(); + } + assertEquals("Incorrect active connection total", 20, total); + assertEquals("Incorrect active connection data", 20 * 100, data); + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total", 30, _jmxUtils.getServerInformation().getTotalMessages()); + assertEquals("Incorrect server data", 30 * 100, _jmxUtils.getServerInformation().getTotalData()); + } + + long testTotal = 0; + long testData = 0; + for (ManagedConnection mc : _jmxUtils.getManagedConnections("test")) + { + testTotal += mc.getTotalMessages(); + testData += mc.getTotalData(); + } + assertEquals("Incorrect test active connection total", 20, testTotal); + assertEquals("Incorrect test vhost total", 30, test.getTotalMessages()); + assertEquals("Incorrect test active connection data", 20 * 100, testData); + assertEquals("Incorrect test vhost data", 30 * 100, test.getTotalData()); + } + + /** + * Test message peak rate generation. + */ + public void testMessagePeakRates() throws Exception + { + sendUsing(_test, 1, 10000); + Thread.sleep(10 * 1000); + sendUsing(_dev, 10, 10); + + ManagedBroker test = _jmxUtils.getManagedBroker("test"); + ManagedBroker dev = _jmxUtils.getManagedBroker("development"); + + assertEquals("Incorrect test vhost peak messages", 1.0d, test.getPeakMessageRate()); + assertEquals("Incorrect test vhost peak data", 10000.0d, test.getPeakDataRate()); + assertEquals("Incorrect dev vhost peak messages", 10.0d, dev.getPeakMessageRate()); + assertEquals("Incorrect dev vhost peak data", 100.0d, dev.getPeakDataRate()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server peak messages", 10.0d, _jmxUtils.getServerInformation().getPeakMessageRate()); + assertEquals("Incorrect server peak data", 10000.0d, _jmxUtils.getServerInformation().getPeakDataRate()); + } + } + + /** + * Test message totals when a vhost has its statistics reset + */ + public void testMessageTotalVhostReset() throws Exception + { + sendUsing(_test, 10, 10); + sendUsing(_dev, 10, 10); + + ManagedBroker test = _jmxUtils.getManagedBroker("test"); + ManagedBroker dev = _jmxUtils.getManagedBroker("development"); + + assertEquals("Incorrect test vhost total messages", 10, test.getTotalMessages()); + assertEquals("Incorrect test vhost total data", 100, test.getTotalData()); + assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessages()); + assertEquals("Incorrect dev vhost total data", 100, dev.getTotalData()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessages()); + assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalData()); + } + + test.resetStatistics(); + + assertEquals("Incorrect test vhost total messages", 0, test.getTotalMessages()); + assertEquals("Incorrect test vhost total data", 0, test.getTotalData()); + assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessages()); + assertEquals("Incorrect dev vhost total data", 100, dev.getTotalData()); + + if (!_broker.equals(VM)) + { + assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessages()); + assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalData()); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java new file mode 100644 index 0000000000..4df1b6e8b6 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.jmx; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidTestCase; + +/** + * Test generation of message statistics. + */ +public abstract class MessageStatisticsTestCase extends QpidTestCase +{ + protected static final String USER = "admin"; + + protected JMXTestUtils _jmxUtils; + protected Connection _test, _dev, _local; + protected Destination _queue; + protected String _brokerUrl; + + @Override + public void setUp() throws Exception + { + _jmxUtils = new JMXTestUtils(this, USER, USER); + _jmxUtils.setUp(); + + configureStatistics(); + + super.setUp(); + + _queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, "queue"); + + _brokerUrl = getBroker().toString(); + _test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test"); + _dev = new AMQConnection(_brokerUrl, USER, USER, "clientid", "development"); + _local = new AMQConnection(_brokerUrl, USER, USER, "clientid", "localhost"); + + _test.start(); + _dev.start(); + _local.start(); + + _jmxUtils.open(); + } + + @Override + public void tearDown() throws Exception + { + _jmxUtils.close(); + + _test.close(); + _dev.close(); + _local.close(); + + super.tearDown(); + } + + /** + * Configure statistics generation properties on the broker. + */ + public abstract void configureStatistics() throws Exception; + + protected void sendUsing(Connection con, int number, int size) throws Exception + { + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(_queue); + String content = new String(new byte[size]); + TextMessage msg = session.createTextMessage(content); + for (int i = 0; i < number; i++) + { + producer.send(msg); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java index 5d8ee785ec..e85c53e1a8 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java @@ -92,7 +92,7 @@ public class ConnectionCloseTest extends QpidTestCase assertTrue("Spurious thread creation exceeded threshold, " + delta.size() + " threads created.", - delta.size() < 10); + delta.size() < 20); } private void dumpStacks(Map map) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java index ca59a0536b..81f7f07936 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java @@ -24,7 +24,9 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.commands.objects.AllObjects; import org.apache.qpid.management.common.JMXConnnectionFactory; import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedConnection; import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.management.common.mbeans.ServerInformation; import javax.management.JMException; import javax.management.MBeanException; @@ -33,8 +35,12 @@ import javax.management.MBeanServerInvocationHandler; import javax.management.ObjectName; import javax.management.remote.JMXConnector; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Set; +import junit.framework.TestCase; + /** * */ @@ -57,6 +63,7 @@ public class JMXTestUtils public void setUp() throws IOException, ConfigurationException, Exception { _test.setConfigurationProperty("management.enabled", "true"); + _test.setSystemProperty("qpid.management.disableCustomSocketFactory", "true"); } public void open() throws Exception @@ -209,4 +216,44 @@ public class JMXTestUtils newProxyInstance(_mbsc, getExchangeObjectName("test", exchangeName), ManagedExchange.class, false); } + + /** + * Retrive {@link ServerInformation} JMX MBean. + */ + public ServerInformation getServerInformation() + { + // Get the name of the test manager + AllObjects allObject = new AllObjects(_mbsc); + allObject.querystring = "org.apache.qpid:type=ServerInformation,name=ServerInformation,*"; + + Set objectNames = allObject.returnObjects(); + + TestCase.assertNotNull("Null ObjectName Set returned", objectNames); + TestCase.assertEquals("Incorrect number of objects returned", 1, objectNames.size()); + + // We have verified we have only one value in objectNames so return it + return getManagedObject(ServerInformation.class, objectNames.iterator().next()); + } + + /** + * Retrive all {@link ManagedConnection} objects. + */ + public List getManagedConnections(String vhost) + { + // Get the name of the test manager + AllObjects allObject = new AllObjects(_mbsc); + allObject.querystring = "org.apache.qpid:type=VirtualHost.Connection,VirtualHost=" + vhost + ",name=*"; + + Set objectNames = allObject.returnObjects(); + + TestCase.assertNotNull("Null ObjectName Set returned", objectNames); + + // Collect all the connection objects + List connections = new ArrayList(); + for (ObjectName name : objectNames) + { + connections.add(getManagedObject(ManagedConnection.class, name)); + } + return connections; + } } -- cgit v1.2.1