diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java | 95 |
1 files changed, 80 insertions, 15 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 71fcfb964d..20f0437c65 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -20,6 +20,14 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; + +import javax.management.NotCompliantMBeanException; + import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -28,11 +36,9 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.AMQBrokerManagerMBean; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.logging.actors.AbstractActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.ConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry; @@ -41,30 +47,31 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.logging.actors.AbstractActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.Accessable; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; -import javax.management.NotCompliantMBeanException; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; - -public class VirtualHost implements Accessable +public class VirtualHost implements Accessable, StatisticsGatherer { private static final Logger _logger = Logger.getLogger(VirtualHost.class); @@ -90,6 +97,10 @@ public class VirtualHost implements Accessable private final Timer _houseKeepingTimer; private VirtualHostConfiguration _configuration; + + private ApplicationRegistry _registry; + private boolean _statisticsEnabled = false; + private StatisticsCounter _messageStats, _dataStats; public void setAccessableName(String name) { @@ -163,6 +174,8 @@ public class VirtualHost implements Accessable { throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost."); } + + _registry = (ApplicationRegistry) ApplicationRegistry.getInstance(); _virtualHostMBean = new VirtualHostMBean(); @@ -227,11 +240,15 @@ public class VirtualHost implements Accessable _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); - initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); + + initialiseHouseKeeping(); + initialiseStatistics(); } - private void initialiseHouseKeeping(long period) + private void initialiseHouseKeeping() { + long period = _configuration.getHousekeepingExpiredMessageCheckPeriod(); + /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ if (period > 0L) { @@ -244,7 +261,7 @@ public class VirtualHost implements Accessable CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) { public String getLogMessage() { - return "[" + Thread.currentThread().getName() + "]"; + return "[" + Thread.currentThread().getName() + "] "; } }); _hkLogger.info("Starting the houseKeeping job"); @@ -472,6 +489,54 @@ public class VirtualHost implements Accessable { return _virtualHostMBean; } + + public void registerMessageDelivery(long messageSize, long timestamp) + { + if (isStatisticsEnabled()) + { + _messageStats.registerEvent(1L, timestamp); + _dataStats.registerEvent(messageSize, timestamp); + } + _registry.registerMessageDelivery(messageSize, timestamp); + } + + public StatisticsCounter getMessageStatistics() + { + return _messageStats; + } + + public StatisticsCounter getDataStatistics() + { + return _dataStats; + } + + public void resetStatistics() + { + _messageStats.reset(); + _dataStats.reset(); + + for (AMQProtocolSession session : _connectionRegistry.getConnections()) + { + ((AMQMinaProtocolSession) session).resetStatistics(); + } + } + + public void initialiseStatistics() + { + setStatisticsEnabled(_registry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled()); + _messageStats = new StatisticsCounter("messages-" + getName()); + _dataStats = new StatisticsCounter("bytes-" + getName()); + } + + public boolean isStatisticsEnabled() + { + return _statisticsEnabled; + } + + public void setStatisticsEnabled(boolean enabled) + { + _statisticsEnabled = enabled; + } /** * Temporary Startup RT class to record the creation of persistent queues / exchanges. |