diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java | 624 |
1 files changed, 102 insertions, 522 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index e0e317f75d..1379b375cf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -20,44 +20,39 @@ */ package org.apache.qpid.server.registry; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.*; -import org.osgi.framework.BundleContext; +import java.util.Collection; +import java.util.Timer; +import java.util.TimerTask; +import org.apache.log4j.Logger; import org.apache.qpid.common.Closeable; import org.apache.qpid.common.QpidProperties; -import org.apache.qpid.qmf.QMFService; -import org.apache.qpid.server.configuration.BrokerConfig; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.ConfigurationManager; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.configuration.SystemConfig; -import org.apache.qpid.server.configuration.SystemConfigImpl; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.configuration.ConfigurationEntryStore; +import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer; +import org.apache.qpid.server.configuration.RecovererProvider; +import org.apache.qpid.server.configuration.startup.DefaultRecovererProvider; +import org.apache.qpid.server.logging.CompositeStartupMessageLogger; +import org.apache.qpid.server.logging.Log4jMessageLogger; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogRecorder; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.AbstractActor; import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.adapter.BrokerAdapter; -import org.apache.qpid.server.plugins.PluginManager; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManagerRegistry; -import org.apache.qpid.server.security.auth.manager.IAuthenticationManagerRegistry; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.transport.QpidAcceptor; +import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.*; -import java.util.concurrent.atomic.AtomicReference; - /** * An abstract application registry that provides access to configuration information and handles the @@ -65,321 +60,90 @@ import java.util.concurrent.atomic.AtomicReference; * <p/> * Subclasses should handle the construction of the "registered objects" such as the exchange registry. */ -public abstract class ApplicationRegistry implements IApplicationRegistry +public class ApplicationRegistry implements IApplicationRegistry { - private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class); - private static AtomicReference<IApplicationRegistry> _instance = new AtomicReference<IApplicationRegistry>(null); - - private final ServerConfiguration _configuration; - - private final Map<InetSocketAddress, QpidAcceptor> _acceptors = - Collections.synchronizedMap(new HashMap<InetSocketAddress, QpidAcceptor>()); - - private IAuthenticationManagerRegistry _authenticationManagerRegistry; - - private final VirtualHostRegistry _virtualHostRegistry = new VirtualHostRegistry(this); - - private SecurityManager _securityManager; + private final VirtualHostRegistry _virtualHostRegistry = new VirtualHostRegistry(); - private PluginManager _pluginManager; - - private ConfigurationManager _configurationManager; - - private RootMessageLogger _rootMessageLogger; - - private CompositeStartupMessageLogger _startupMessageLogger; - - private UUID _brokerId = UUID.randomUUID(); - - private QMFService _qmfService; - - private BrokerConfig _brokerConfig; + private volatile RootMessageLogger _rootMessageLogger; private Broker _broker; - private ConfigStore _configStore; - private Timer _reportingTimer; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; - private BundleContext _bundleContext; - - private final List<PortBindingListener> _portBindingListeners = new ArrayList<PortBindingListener>(); - - private int _httpManagementPort = -1, _httpsManagementPort = -1; - private LogRecorder _logRecorder; - private List<IAuthenticationManagerRegistry.RegistryChangeListener> _authManagerChangeListeners = - new ArrayList<IAuthenticationManagerRegistry.RegistryChangeListener>(); - - public Map<InetSocketAddress, QpidAcceptor> getAcceptors() - { - synchronized (_acceptors) - { - return new HashMap<InetSocketAddress, QpidAcceptor>(_acceptors); - } - } - - protected void setSecurityManager(SecurityManager securityManager) - { - _securityManager = securityManager; - } - - protected void setPluginManager(PluginManager pluginManager) - { - _pluginManager = pluginManager; - } - - protected void setConfigurationManager(ConfigurationManager configurationManager) - { - _configurationManager = configurationManager; - } + private ConfigurationEntryStore _store; + private TaskExecutor _taskExecutor; protected void setRootMessageLogger(RootMessageLogger rootMessageLogger) { _rootMessageLogger = rootMessageLogger; } - protected CompositeStartupMessageLogger getStartupMessageLogger() - { - return _startupMessageLogger; - } - - protected void setStartupMessageLogger(CompositeStartupMessageLogger startupMessageLogger) + public ApplicationRegistry(ConfigurationEntryStore store) { - _startupMessageLogger = startupMessageLogger; - } - - protected void setBrokerId(UUID brokerId) - { - _brokerId = brokerId; - } - - protected QMFService getQmfService() - { - return _qmfService; - } - - protected void setQmfService(QMFService qmfService) - { - _qmfService = qmfService; - } - - public static void initialise(IApplicationRegistry instance) throws Exception - { - if(instance == null) - { - throw new IllegalArgumentException("ApplicationRegistry instance must not be null"); - } - - if(!_instance.compareAndSet(null, instance)) - { - throw new IllegalStateException("An ApplicationRegistry is already initialised"); - } - - _logger.info("Initialising Application Registry(" + instance + ")"); - - - final ConfigStore store = ConfigStore.newInstance(); - store.setRoot(new SystemConfigImpl(store)); - instance.setConfigStore(store); - - final BrokerConfig brokerConfig = new BrokerConfigAdapter(instance); - - final SystemConfig system = store.getRoot(); - system.addBroker(brokerConfig); - instance.setBrokerConfig(brokerConfig); - - try - { - instance.initialise(); - } - catch (Exception e) - { - _instance.set(null); - - //remove the Broker instance, then re-throw - try - { - system.removeBroker(brokerConfig); - } - catch(Throwable t) - { - //ignore - } - - throw e; - } - } - - public ConfigStore getConfigStore() - { - return _configStore; - } - - public void setConfigStore(final ConfigStore configStore) - { - _configStore = configStore; - } - - public static boolean isConfigured() - { - return _instance.get() != null; - } - - public static void remove() - { - IApplicationRegistry instance = _instance.getAndSet(null); - try - { - if (instance != null) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Shutting down ApplicationRegistry(" + instance + ")"); - } - instance.close(); - } - } - catch (Exception e) - { - _logger.error("Error shutting down Application Registry(" + instance + "): " + e, e); - } - } - - protected ApplicationRegistry(ServerConfiguration configuration) - { - this(configuration, null); - } - - protected ApplicationRegistry(ServerConfiguration configuration, BundleContext bundleContext) - { - _configuration = configuration; - _bundleContext = bundleContext; - } - - public void configure() throws ConfigurationException - { - _configurationManager = new ConfigurationManager(); - - try - { - _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory(), _bundleContext); - } - catch (Exception e) - { - throw new ConfigurationException(e); - } - - _configuration.initialise(); + _store = store; + initialiseStatistics(); } public void initialise() throws Exception { + // Create the RootLogger to be used during broker operation + boolean statusUpdatesEnabled = Boolean.parseBoolean(System.getProperty(BrokerProperties.PROPERTY_STATUS_UPDATES, "true")); + _rootMessageLogger = new Log4jMessageLogger(statusUpdatesEnabled); + _logRecorder = new LogRecorder(); - //Create the RootLogger to be used during broker operation - _rootMessageLogger = new Log4jMessageLogger(_configuration); //Create the composite (log4j+SystemOut MessageLogger to be used during startup RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger}; - _startupMessageLogger = new CompositeStartupMessageLogger(messageLoggers); + CompositeStartupMessageLogger startupMessageLogger = new CompositeStartupMessageLogger(messageLoggers); - BrokerActor actor = new BrokerActor(_startupMessageLogger); - CurrentActor.setDefault(actor); + BrokerActor actor = new BrokerActor(startupMessageLogger); CurrentActor.set(actor); - + CurrentActor.setDefault(actor); + GenericActor.setDefaultMessageLogger(_rootMessageLogger); try { - initialiseStatistics(); - - if(_configuration.getHTTPManagementEnabled()) - { - _httpManagementPort = _configuration.getHTTPManagementPort(); - } - if (_configuration.getHTTPSManagementEnabled()) - { - _httpsManagementPort = _configuration.getHTTPSManagementPort(); - } - - _broker = new BrokerAdapter(this); - - configure(); - - _qmfService = new QMFService(getConfigStore(), this); - logStartupMessages(CurrentActor.get()); - _securityManager = new SecurityManager(_configuration, _pluginManager); + _taskExecutor = new TaskExecutor(); + _taskExecutor.start(); - _authenticationManagerRegistry = createAuthenticationManagerRegistry(_configuration, _pluginManager); + RecovererProvider provider = new DefaultRecovererProvider((StatisticsGatherer)this, _virtualHostRegistry, _logRecorder, _rootMessageLogger, _taskExecutor); + ConfiguredObjectRecoverer<? extends ConfiguredObject> brokerRecoverer = provider.getRecoverer(Broker.class.getSimpleName()); + _broker = (Broker) brokerRecoverer.create(provider, _store.getRootEntry()); - if(!_authManagerChangeListeners.isEmpty()) - { - for(IAuthenticationManagerRegistry.RegistryChangeListener listener : _authManagerChangeListeners) - { + _virtualHostRegistry.setDefaultVirtualHostName((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)); - _authenticationManagerRegistry.addRegistryChangeListener(listener); - for(AuthenticationManager authMgr : _authenticationManagerRegistry.getAvailableAuthenticationManagers().values()) - { - listener.authenticationManagerRegistered(authMgr); - } - } - _authManagerChangeListeners.clear(); - } - } - finally - { - CurrentActor.remove(); - } - - CurrentActor.set(new BrokerActor(_rootMessageLogger)); - try - { - initialiseVirtualHosts(); initialiseStatisticsReporting(); + + // starting the broker + _broker.setDesiredState(State.INITIALISING, State.ACTIVE); + + CurrentActor.get().message(BrokerMessages.READY()); } finally { - // Startup complete, so pop the current actor CurrentActor.remove(); } - } - protected IAuthenticationManagerRegistry createAuthenticationManagerRegistry(ServerConfiguration _configuration, PluginManager _pluginManager) - throws ConfigurationException - { - return new AuthenticationManagerRegistry(_configuration, _pluginManager); + CurrentActor.setDefault(new BrokerActor(_rootMessageLogger)); } - protected void initialiseVirtualHosts() throws Exception + private void initialiseStatisticsReporting() { - for (String name : _configuration.getVirtualHosts()) - { - createVirtualHost(_configuration.getVirtualHostConfig(name)); - } - getVirtualHostRegistry().setDefaultVirtualHostName(_configuration.getDefaultVirtualHost()); - } - - public void initialiseStatisticsReporting() - { - long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms - final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled(); - final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled(); - final boolean reset = _configuration.isStatisticsReportResetEnabled(); + long report = ((Number)_broker.getAttribute(Broker.STATISTICS_REPORTING_PERIOD)).intValue() * 1000; // convert to ms + final boolean reset = (Boolean)_broker.getAttribute(Broker.STATISTICS_REPORTING_RESET_ENABLED); /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */ - if (report > 0L && (broker || virtualhost)) + if (report > 0L) { _reportingTimer = new Timer("Statistics-Reporting", true); - - - - _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(broker, virtualhost, reset), - report / 2, - report); + StatisticsReportingTask task = new StatisticsReportingTask(reset, _rootMessageLogger); + _reportingTimer.scheduleAtFixedRate(task, report / 2, report); } } @@ -388,76 +152,62 @@ public abstract class ApplicationRegistry implements IApplicationRegistry private final int DELIVERED = 0; private final int RECEIVED = 1; - private boolean _broker; - private boolean _virtualhost; - private boolean _reset; + private final boolean _reset; + private final RootMessageLogger _logger; - - public StatisticsReportingTask(boolean broker, boolean virtualhost, boolean reset) + public StatisticsReportingTask(boolean reset, RootMessageLogger logger) { - _broker = broker; - _virtualhost = virtualhost; _reset = reset; + _logger = logger; } public void run() { - CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) { + CurrentActor.set(new AbstractActor(_logger) + { public String getLogMessage() { return "[" + Thread.currentThread().getName() + "] "; } }); - - if (_broker) + try { CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal())); CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal())); CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal())); CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal())); - } + Collection<VirtualHost> hosts = _virtualHostRegistry.getVirtualHosts(); - if (_virtualhost) - { - for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts()) + if (hosts.size() > 1) { - String name = vhost.getName(); - StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics(); - StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics(); - StatisticsCounter dataReceived = vhost.getDataReceiptStatistics(); - StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics(); - - CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal())); + for (VirtualHost vhost : hosts) + { + String name = vhost.getName(); + StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics(); + StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics(); + StatisticsCounter dataReceived = vhost.getDataReceiptStatistics(); + StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics(); + + CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal())); + CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal())); + CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal())); + CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal())); + } } - } - if (_reset) + if (_reset) + { + resetStatistics(); + } + } + catch(Exception e) { - resetStatistics(); + ApplicationRegistry._logger.warn("Unexpected exception occured while reporting the statistics", e); + } + finally + { + CurrentActor.remove(); } - - CurrentActor.remove(); - } - } - - /** - * Get the ApplicationRegistry - * @return the IApplicationRegistry instance - * @throws IllegalStateException if no registry instance has been initialised. - */ - public static IApplicationRegistry getInstance() throws IllegalStateException - { - IApplicationRegistry iApplicationRegistry = _instance.get(); - if (iApplicationRegistry == null) - { - throw new IllegalStateException("No ApplicationRegistry has been initialised"); - } - else - { - return iApplicationRegistry; } } @@ -488,7 +238,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } //Set the Actor for Broker Shutdown - CurrentActor.set(new BrokerActor(getRootMessageLogger())); + CurrentActor.set(new BrokerActor(_rootMessageLogger)); try { //Stop Statistics Reporting @@ -497,154 +247,34 @@ public abstract class ApplicationRegistry implements IApplicationRegistry _reportingTimer.cancel(); } - //Stop incoming connections - unbind(); + if (_broker != null) + { + _broker.setDesiredState(_broker.getActualState(), State.STOPPED); + } //Shutdown virtualhosts close(_virtualHostRegistry); - close(_authenticationManagerRegistry); - - close(_qmfService); - - close(_pluginManager); - - BrokerConfig broker = getBrokerConfig(); - if(broker != null) + if (_taskExecutor != null) { - broker.getSystem().removeBroker(broker); + _taskExecutor.stop(); } CurrentActor.get().message(BrokerMessages.STOPPED()); - } - finally - { - CurrentActor.remove(); - } - } - - private void unbind() - { - List<QpidAcceptor> removedAcceptors = new ArrayList<QpidAcceptor>(); - synchronized (_acceptors) - { - for (InetSocketAddress bindAddress : _acceptors.keySet()) - { - QpidAcceptor acceptor = _acceptors.get(bindAddress); - removedAcceptors.add(acceptor); - try - { - acceptor.getNetworkTransport().close(); - } - catch (Throwable e) - { - _logger.error("Unable to close network driver due to:" + e.getMessage()); - } + _logRecorder.closeLogRecorder(); - CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(acceptor.toString(), bindAddress.getPort())); - } } - synchronized (_portBindingListeners) - { - for(QpidAcceptor acceptor : removedAcceptors) - { - for(PortBindingListener listener : _portBindingListeners) - { - listener.unbound(acceptor); - } - } - } - } - - public ServerConfiguration getConfiguration() - { - return _configuration; - } - - public void addAcceptor(InetSocketAddress bindAddress, QpidAcceptor acceptor) - { - synchronized (_acceptors) - { - _acceptors.put(bindAddress, acceptor); - } - synchronized (_portBindingListeners) + finally { - for(PortBindingListener listener : _portBindingListeners) + if (_taskExecutor != null) { - listener.bound(acceptor, bindAddress); + _taskExecutor.stopImmediately(); } + CurrentActor.remove(); } - } - - public VirtualHostRegistry getVirtualHostRegistry() - { - return _virtualHostRegistry; - } - - public SecurityManager getSecurityManager() - { - return _securityManager; - } - - @Override - public AuthenticationManager getAuthenticationManager(SocketAddress address) - { - return _authenticationManagerRegistry.getAuthenticationManager(address); - } - - @Override - public IAuthenticationManagerRegistry getAuthenticationManagerRegistry() - { - return _authenticationManagerRegistry; - } - - public PluginManager getPluginManager() - { - return _pluginManager; - } - - public ConfigurationManager getConfigurationManager() - { - return _configurationManager; - } - - public RootMessageLogger getRootMessageLogger() - { - return _rootMessageLogger; - } - - public RootMessageLogger getCompositeStartupMessageLogger() - { - return _startupMessageLogger; - } - - public UUID getBrokerId() - { - return _brokerId; - } - - public QMFService getQMFService() - { - return _qmfService; - } - - public BrokerConfig getBrokerConfig() - { - return _brokerConfig; - } - - public void setBrokerConfig(final BrokerConfig broker) - { - _brokerConfig = broker; - } - - public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception - { - VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig); - _virtualHostRegistry.registerVirtualHost(virtualHost); - getBrokerConfig().addVirtualHost(virtualHost); - return virtualHost; + _store = null; + _broker = null; } public void registerMessageDelivered(long messageSize) @@ -713,60 +343,10 @@ public abstract class ApplicationRegistry implements IApplicationRegistry logActor.message(BrokerMessages.MAX_MEMORY(Runtime.getRuntime().maxMemory())); } + @Override public Broker getBroker() { return _broker; } - @Override - public void addPortBindingListener(PortBindingListener listener) - { - synchronized (_portBindingListeners) - { - _portBindingListeners.add(listener); - } - } - - - @Override - public boolean useHTTPManagement() - { - return _httpManagementPort != -1; - } - - @Override - public int getHTTPManagementPort() - { - return _httpManagementPort; - } - - @Override - public boolean useHTTPSManagement() - { - return _httpsManagementPort != -1; - } - - @Override - public int getHTTPSManagementPort() - { - return _httpsManagementPort; - } - - public LogRecorder getLogRecorder() - { - return _logRecorder; - } - - @Override - public void addRegistryChangeListener(IAuthenticationManagerRegistry.RegistryChangeListener registryChangeListener) - { - if(_authenticationManagerRegistry == null) - { - _authManagerChangeListeners.add(registryChangeListener); - } - else - { - _authenticationManagerRegistry.addRegistryChangeListener(registryChangeListener); - } - } } |