summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java95
1 files changed, 80 insertions, 15 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 71fcfb964d..20f0437c65 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -20,6 +20,14 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.management.NotCompliantMBeanException;
+
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -28,11 +36,9 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.actors.AbstractActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.ConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry;
@@ -41,30 +47,31 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.logging.actors.AbstractActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-import javax.management.NotCompliantMBeanException;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-public class VirtualHost implements Accessable
+public class VirtualHost implements Accessable, StatisticsGatherer
{
private static final Logger _logger = Logger.getLogger(VirtualHost.class);
@@ -90,6 +97,10 @@ public class VirtualHost implements Accessable
private final Timer _houseKeepingTimer;
private VirtualHostConfiguration _configuration;
+
+ private ApplicationRegistry _registry;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messageStats, _dataStats;
public void setAccessableName(String name)
{
@@ -163,6 +174,8 @@ public class VirtualHost implements Accessable
{
throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
}
+
+ _registry = (ApplicationRegistry) ApplicationRegistry.getInstance();
_virtualHostMBean = new VirtualHostMBean();
@@ -227,11 +240,15 @@ public class VirtualHost implements Accessable
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
- initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
+
+ initialiseHouseKeeping();
+ initialiseStatistics();
}
- private void initialiseHouseKeeping(long period)
+ private void initialiseHouseKeeping()
{
+ long period = _configuration.getHousekeepingExpiredMessageCheckPeriod();
+
/* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
if (period > 0L)
{
@@ -244,7 +261,7 @@ public class VirtualHost implements Accessable
CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
public String getLogMessage()
{
- return "[" + Thread.currentThread().getName() + "]";
+ return "[" + Thread.currentThread().getName() + "] ";
}
});
_hkLogger.info("Starting the houseKeeping job");
@@ -472,6 +489,54 @@ public class VirtualHost implements Accessable
{
return _virtualHostMBean;
}
+
+ public void registerMessageDelivery(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messageStats.registerEvent(1L, timestamp);
+ _dataStats.registerEvent(messageSize, timestamp);
+ }
+ _registry.registerMessageDelivery(messageSize, timestamp);
+ }
+
+ public StatisticsCounter getMessageStatistics()
+ {
+ return _messageStats;
+ }
+
+ public StatisticsCounter getDataStatistics()
+ {
+ return _dataStats;
+ }
+
+ public void resetStatistics()
+ {
+ _messageStats.reset();
+ _dataStats.reset();
+
+ for (AMQProtocolSession session : _connectionRegistry.getConnections())
+ {
+ ((AMQMinaProtocolSession) session).resetStatistics();
+ }
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(_registry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
+ _messageStats = new StatisticsCounter("messages-" + getName());
+ _dataStats = new StatisticsCounter("bytes-" + getName());
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
/**
* Temporary Startup RT class to record the creation of persistent queues / exchanges.