diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java | 289 |
1 files changed, 195 insertions, 94 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 6753cf4560..9951f7d3c8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -20,18 +20,10 @@ */ package org.apache.qpid.server.registry; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; +import org.osgi.framework.BundleContext; + import org.apache.qpid.AMQException; import org.apache.qpid.common.Closeable; import org.apache.qpid.common.QpidProperties; @@ -45,6 +37,7 @@ import org.apache.qpid.server.configuration.SystemConfigImpl; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.logging.CompositeStartupMessageLogger; import org.apache.qpid.server.logging.Log4jMessageLogger; +import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.AbstractActor; @@ -65,7 +58,16 @@ import org.apache.qpid.server.transport.QpidAcceptor; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.osgi.framework.BundleContext; + +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; /** @@ -76,33 +78,33 @@ import org.osgi.framework.BundleContext; */ public abstract class ApplicationRegistry implements IApplicationRegistry { - protected static final Logger _logger = Logger.getLogger(ApplicationRegistry.class); + private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class); private static AtomicReference<IApplicationRegistry> _instance = new AtomicReference<IApplicationRegistry>(null); - protected final ServerConfiguration _configuration; + private final ServerConfiguration _configuration; - protected final Map<InetSocketAddress, QpidAcceptor> _acceptors = new HashMap<InetSocketAddress, QpidAcceptor>(); + private final Map<InetSocketAddress, QpidAcceptor> _acceptors = new HashMap<InetSocketAddress, QpidAcceptor>(); - protected ManagedObjectRegistry _managedObjectRegistry; + private ManagedObjectRegistry _managedObjectRegistry; - protected AuthenticationManager _authenticationManager; + private AuthenticationManager _authenticationManager; - protected VirtualHostRegistry _virtualHostRegistry; + private VirtualHostRegistry _virtualHostRegistry; - protected SecurityManager _securityManager; + private SecurityManager _securityManager; - protected PluginManager _pluginManager; + private PluginManager _pluginManager; - protected ConfigurationManager _configurationManager; + private ConfigurationManager _configurationManager; - protected RootMessageLogger _rootMessageLogger; + private RootMessageLogger _rootMessageLogger; - protected CompositeStartupMessageLogger _startupMessageLogger; + private CompositeStartupMessageLogger _startupMessageLogger; - protected UUID _brokerId = UUID.randomUUID(); + private UUID _brokerId = UUID.randomUUID(); - protected QMFService _qmfService; + private QMFService _qmfService; private BrokerConfig _broker; @@ -114,17 +116,74 @@ public abstract class ApplicationRegistry implements IApplicationRegistry private BundleContext _bundleContext; - static + protected static Logger get_logger() { - Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService())); + return _logger; } - private static class ShutdownService implements Runnable + protected Map<InetSocketAddress, QpidAcceptor> getAcceptors() { - public void run() - { - remove(); - } + return _acceptors; + } + + protected void setManagedObjectRegistry(ManagedObjectRegistry managedObjectRegistry) + { + _managedObjectRegistry = managedObjectRegistry; + } + + protected void setAuthenticationManager(AuthenticationManager authenticationManager) + { + _authenticationManager = authenticationManager; + } + + protected void setVirtualHostRegistry(VirtualHostRegistry virtualHostRegistry) + { + _virtualHostRegistry = virtualHostRegistry; + } + + protected void setSecurityManager(SecurityManager securityManager) + { + _securityManager = securityManager; + } + + protected void setPluginManager(PluginManager pluginManager) + { + _pluginManager = pluginManager; + } + + protected void setConfigurationManager(ConfigurationManager configurationManager) + { + _configurationManager = configurationManager; + } + + protected void setRootMessageLogger(RootMessageLogger rootMessageLogger) + { + _rootMessageLogger = rootMessageLogger; + } + + protected CompositeStartupMessageLogger getStartupMessageLogger() + { + return _startupMessageLogger; + } + + protected void setStartupMessageLogger(CompositeStartupMessageLogger startupMessageLogger) + { + _startupMessageLogger = startupMessageLogger; + } + + protected void setBrokerId(UUID brokerId) + { + _brokerId = brokerId; + } + + protected QMFService getQmfService() + { + return _qmfService; + } + + protected void setQmfService(QMFService qmfService) + { + _qmfService = qmfService; } public static void initialise(IApplicationRegistry instance) throws Exception @@ -201,7 +260,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry _logger.info("Shutting down ApplicationRegistry(" + instance + ")"); } instance.close(); - instance.getBroker().getSystem().removeBroker(instance.getBroker()); } } catch (Exception e) @@ -256,7 +314,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry _qmfService = new QMFService(getConfigStore(), this); - CurrentActor.get().message(BrokerMessages.STARTUP(QpidProperties.getReleaseVersion(), QpidProperties.getBuildVersion())); + logStartupMessages(CurrentActor.get()); _virtualHostRegistry = new VirtualHostRegistry(this); @@ -285,6 +343,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } + /** * Iterates across all discovered authentication manager factories, offering the security configuration to each. * Expects <b>exactly</b> one authentication manager to configure and initialise itself. @@ -358,57 +417,71 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { _reportingTimer = new Timer("Statistics-Reporting", true); - class StatisticsReportingTask extends TimerTask + + + _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(broker, virtualhost, reset), + report / 2, + report); + } + } + + private class StatisticsReportingTask extends TimerTask + { + private final int DELIVERED = 0; + private final int RECEIVED = 1; + + private boolean _broker; + private boolean _virtualhost; + private boolean _reset; + + + public StatisticsReportingTask(boolean broker, boolean virtualhost, boolean reset) + { + _broker = broker; + _virtualhost = virtualhost; + _reset = reset; + } + + public void run() + { + CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) { + public String getLogMessage() + { + return "[" + Thread.currentThread().getName() + "] "; + } + }); + + if (_broker) { - private final int DELIVERED = 0; - private final int RECEIVED = 1; - - public void run() + CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal())); + CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal())); + CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal())); + CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal())); + } + + if (_virtualhost) + { + for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts()) { - CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) { - public String getLogMessage() - { - return "[" + Thread.currentThread().getName() + "] "; - } - }); - - if (broker) - { - CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal())); - CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal())); - CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal())); - CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal())); - } - - if (virtualhost) - { - for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts()) - { - String name = vhost.getName(); - StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics(); - StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics(); - StatisticsCounter dataReceived = vhost.getDataReceiptStatistics(); - StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics(); - - CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal())); - } - } - - if (reset) - { - resetStatistics(); - } - - CurrentActor.remove(); + String name = vhost.getName(); + StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics(); + StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics(); + StatisticsCounter dataReceived = vhost.getDataReceiptStatistics(); + StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics(); + + CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal())); + CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal())); + CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal())); + CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal())); } } - _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(), - report / 2, - report); + if (_reset) + { + resetStatistics(); + } + + CurrentActor.remove(); } } @@ -449,35 +522,49 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } - public void close() { if (_logger.isInfoEnabled()) { _logger.info("Shutting down ApplicationRegistry:" + this); } - - //Stop Statistics Reporting - if (_reportingTimer != null) + + //Set the Actor for Broker Shutdown + CurrentActor.set(new BrokerActor(getRootMessageLogger())); + try { - _reportingTimer.cancel(); - } + //Stop Statistics Reporting + if (_reportingTimer != null) + { + _reportingTimer.cancel(); + } - //Stop incoming connections - unbind(); + //Stop incoming connections + unbind(); - //Shutdown virtualhosts - close(_virtualHostRegistry); + //Shutdown virtualhosts + close(_virtualHostRegistry); - close(_authenticationManager); + close(_authenticationManager); - close(_qmfService); + close(_qmfService); - close(_pluginManager); + close(_pluginManager); - close(_managedObjectRegistry); + close(_managedObjectRegistry); - CurrentActor.get().message(BrokerMessages.STOPPED()); + BrokerConfig broker = getBroker(); + if(broker != null) + { + broker.getSystem().removeBroker(broker); + } + + CurrentActor.get().message(BrokerMessages.STOPPED()); + } + finally + { + CurrentActor.remove(); + } } private void unbind() @@ -654,4 +741,18 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { _statisticsEnabled = enabled; } + + private void logStartupMessages(LogActor logActor) + { + logActor.message(BrokerMessages.STARTUP(QpidProperties.getReleaseVersion(), QpidProperties.getBuildVersion())); + + logActor.message(BrokerMessages.PLATFORM(System.getProperty("java.vendor"), + System.getProperty("java.runtime.version", System.getProperty("java.version")), + System.getProperty("os.name"), + System.getProperty("os.version"), + System.getProperty("os.arch"))); + + logActor.message(BrokerMessages.MAX_MEMORY(Runtime.getRuntime().maxMemory())); + } + } |