summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java289
1 files changed, 195 insertions, 94 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 6753cf4560..9951f7d3c8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -20,18 +20,10 @@
*/
package org.apache.qpid.server.registry;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
+import org.osgi.framework.BundleContext;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.common.QpidProperties;
@@ -45,6 +37,7 @@ import org.apache.qpid.server.configuration.SystemConfigImpl;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
import org.apache.qpid.server.logging.Log4jMessageLogger;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.AbstractActor;
@@ -65,7 +58,16 @@ import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.osgi.framework.BundleContext;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
/**
@@ -76,33 +78,33 @@ import org.osgi.framework.BundleContext;
*/
public abstract class ApplicationRegistry implements IApplicationRegistry
{
- protected static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
+ private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
private static AtomicReference<IApplicationRegistry> _instance = new AtomicReference<IApplicationRegistry>(null);
- protected final ServerConfiguration _configuration;
+ private final ServerConfiguration _configuration;
- protected final Map<InetSocketAddress, QpidAcceptor> _acceptors = new HashMap<InetSocketAddress, QpidAcceptor>();
+ private final Map<InetSocketAddress, QpidAcceptor> _acceptors = new HashMap<InetSocketAddress, QpidAcceptor>();
- protected ManagedObjectRegistry _managedObjectRegistry;
+ private ManagedObjectRegistry _managedObjectRegistry;
- protected AuthenticationManager _authenticationManager;
+ private AuthenticationManager _authenticationManager;
- protected VirtualHostRegistry _virtualHostRegistry;
+ private VirtualHostRegistry _virtualHostRegistry;
- protected SecurityManager _securityManager;
+ private SecurityManager _securityManager;
- protected PluginManager _pluginManager;
+ private PluginManager _pluginManager;
- protected ConfigurationManager _configurationManager;
+ private ConfigurationManager _configurationManager;
- protected RootMessageLogger _rootMessageLogger;
+ private RootMessageLogger _rootMessageLogger;
- protected CompositeStartupMessageLogger _startupMessageLogger;
+ private CompositeStartupMessageLogger _startupMessageLogger;
- protected UUID _brokerId = UUID.randomUUID();
+ private UUID _brokerId = UUID.randomUUID();
- protected QMFService _qmfService;
+ private QMFService _qmfService;
private BrokerConfig _broker;
@@ -114,17 +116,74 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
private BundleContext _bundleContext;
- static
+ protected static Logger get_logger()
{
- Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
+ return _logger;
}
- private static class ShutdownService implements Runnable
+ protected Map<InetSocketAddress, QpidAcceptor> getAcceptors()
{
- public void run()
- {
- remove();
- }
+ return _acceptors;
+ }
+
+ protected void setManagedObjectRegistry(ManagedObjectRegistry managedObjectRegistry)
+ {
+ _managedObjectRegistry = managedObjectRegistry;
+ }
+
+ protected void setAuthenticationManager(AuthenticationManager authenticationManager)
+ {
+ _authenticationManager = authenticationManager;
+ }
+
+ protected void setVirtualHostRegistry(VirtualHostRegistry virtualHostRegistry)
+ {
+ _virtualHostRegistry = virtualHostRegistry;
+ }
+
+ protected void setSecurityManager(SecurityManager securityManager)
+ {
+ _securityManager = securityManager;
+ }
+
+ protected void setPluginManager(PluginManager pluginManager)
+ {
+ _pluginManager = pluginManager;
+ }
+
+ protected void setConfigurationManager(ConfigurationManager configurationManager)
+ {
+ _configurationManager = configurationManager;
+ }
+
+ protected void setRootMessageLogger(RootMessageLogger rootMessageLogger)
+ {
+ _rootMessageLogger = rootMessageLogger;
+ }
+
+ protected CompositeStartupMessageLogger getStartupMessageLogger()
+ {
+ return _startupMessageLogger;
+ }
+
+ protected void setStartupMessageLogger(CompositeStartupMessageLogger startupMessageLogger)
+ {
+ _startupMessageLogger = startupMessageLogger;
+ }
+
+ protected void setBrokerId(UUID brokerId)
+ {
+ _brokerId = brokerId;
+ }
+
+ protected QMFService getQmfService()
+ {
+ return _qmfService;
+ }
+
+ protected void setQmfService(QMFService qmfService)
+ {
+ _qmfService = qmfService;
}
public static void initialise(IApplicationRegistry instance) throws Exception
@@ -201,7 +260,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
_logger.info("Shutting down ApplicationRegistry(" + instance + ")");
}
instance.close();
- instance.getBroker().getSystem().removeBroker(instance.getBroker());
}
}
catch (Exception e)
@@ -256,7 +314,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
_qmfService = new QMFService(getConfigStore(), this);
- CurrentActor.get().message(BrokerMessages.STARTUP(QpidProperties.getReleaseVersion(), QpidProperties.getBuildVersion()));
+ logStartupMessages(CurrentActor.get());
_virtualHostRegistry = new VirtualHostRegistry(this);
@@ -285,6 +343,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
}
}
+
/**
* Iterates across all discovered authentication manager factories, offering the security configuration to each.
* Expects <b>exactly</b> one authentication manager to configure and initialise itself.
@@ -358,57 +417,71 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
_reportingTimer = new Timer("Statistics-Reporting", true);
- class StatisticsReportingTask extends TimerTask
+
+
+ _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(broker, virtualhost, reset),
+ report / 2,
+ report);
+ }
+ }
+
+ private class StatisticsReportingTask extends TimerTask
+ {
+ private final int DELIVERED = 0;
+ private final int RECEIVED = 1;
+
+ private boolean _broker;
+ private boolean _virtualhost;
+ private boolean _reset;
+
+
+ public StatisticsReportingTask(boolean broker, boolean virtualhost, boolean reset)
+ {
+ _broker = broker;
+ _virtualhost = virtualhost;
+ _reset = reset;
+ }
+
+ public void run()
+ {
+ CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
+ public String getLogMessage()
+ {
+ return "[" + Thread.currentThread().getName() + "] ";
+ }
+ });
+
+ if (_broker)
{
- private final int DELIVERED = 0;
- private final int RECEIVED = 1;
-
- public void run()
+ CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
+ CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
+ CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
+ CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
+ }
+
+ if (_virtualhost)
+ {
+ for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts())
{
- CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
- public String getLogMessage()
- {
- return "[" + Thread.currentThread().getName() + "] ";
- }
- });
-
- if (broker)
- {
- CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
- CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
- CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
- CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
- }
-
- if (virtualhost)
- {
- for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts())
- {
- String name = vhost.getName();
- StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics();
- StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics();
- StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
- StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
-
- CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
- }
- }
-
- if (reset)
- {
- resetStatistics();
- }
-
- CurrentActor.remove();
+ String name = vhost.getName();
+ StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics();
+ StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics();
+ StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
+ StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
+
+ CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
}
}
- _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(),
- report / 2,
- report);
+ if (_reset)
+ {
+ resetStatistics();
+ }
+
+ CurrentActor.remove();
}
}
@@ -449,35 +522,49 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
}
}
-
public void close()
{
if (_logger.isInfoEnabled())
{
_logger.info("Shutting down ApplicationRegistry:" + this);
}
-
- //Stop Statistics Reporting
- if (_reportingTimer != null)
+
+ //Set the Actor for Broker Shutdown
+ CurrentActor.set(new BrokerActor(getRootMessageLogger()));
+ try
{
- _reportingTimer.cancel();
- }
+ //Stop Statistics Reporting
+ if (_reportingTimer != null)
+ {
+ _reportingTimer.cancel();
+ }
- //Stop incoming connections
- unbind();
+ //Stop incoming connections
+ unbind();
- //Shutdown virtualhosts
- close(_virtualHostRegistry);
+ //Shutdown virtualhosts
+ close(_virtualHostRegistry);
- close(_authenticationManager);
+ close(_authenticationManager);
- close(_qmfService);
+ close(_qmfService);
- close(_pluginManager);
+ close(_pluginManager);
- close(_managedObjectRegistry);
+ close(_managedObjectRegistry);
- CurrentActor.get().message(BrokerMessages.STOPPED());
+ BrokerConfig broker = getBroker();
+ if(broker != null)
+ {
+ broker.getSystem().removeBroker(broker);
+ }
+
+ CurrentActor.get().message(BrokerMessages.STOPPED());
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
}
private void unbind()
@@ -654,4 +741,18 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
_statisticsEnabled = enabled;
}
+
+ private void logStartupMessages(LogActor logActor)
+ {
+ logActor.message(BrokerMessages.STARTUP(QpidProperties.getReleaseVersion(), QpidProperties.getBuildVersion()));
+
+ logActor.message(BrokerMessages.PLATFORM(System.getProperty("java.vendor"),
+ System.getProperty("java.runtime.version", System.getProperty("java.version")),
+ System.getProperty("os.name"),
+ System.getProperty("os.version"),
+ System.getProperty("os.arch")));
+
+ logActor.message(BrokerMessages.MAX_MEMORY(Runtime.getRuntime().maxMemory()));
+ }
+
}