diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java | 367 |
1 files changed, 110 insertions, 257 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 7c804fc1fd..78a642f22f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -21,14 +21,9 @@ package org.apache.qpid.server.registry; import java.net.InetSocketAddress; -import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; @@ -46,27 +41,24 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.logging.CompositeStartupMessageLogger; import org.apache.qpid.server.logging.Log4jMessageLogger; import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.AbstractRootMessageLogger; import org.apache.qpid.server.logging.SystemOutMessageLogger; -import org.apache.qpid.server.logging.actors.AbstractActor; import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.BrokerMessages; -import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.management.NoopManagedObjectRegistry; -import org.apache.qpid.server.plugins.Plugin; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.SecurityManager.SecurityConfiguration; +import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory; -import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.transport.QpidAcceptor; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; - /** * An abstract application registry that provides access to configuration information and handles the * construction and caching of configurable objects. @@ -77,10 +69,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { protected static final Logger _logger = Logger.getLogger(ApplicationRegistry.class); - private static AtomicReference<IApplicationRegistry> _instance = new AtomicReference<IApplicationRegistry>(null); + private static Map<Integer, IApplicationRegistry> _instanceMap = new HashMap<Integer, IApplicationRegistry>(); protected final ServerConfiguration _configuration; + public static final int DEFAULT_INSTANCE = 1; + protected final Map<InetSocketAddress, QpidAcceptor> _acceptors = new HashMap<InetSocketAddress, QpidAcceptor>(); protected ManagedObjectRegistry _managedObjectRegistry; @@ -91,6 +85,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry protected SecurityManager _securityManager; + protected PrincipalDatabaseManager _databaseManager; + protected PluginManager _pluginManager; protected ConfigurationManager _configurationManager; @@ -106,10 +102,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry private BrokerConfig _broker; private ConfigStore _configStore; - - private Timer _reportingTimer; - private boolean _statisticsEnabled = false; - private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + + protected String _registryName; static { @@ -120,54 +114,53 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { public void run() { - remove(); + removeAll(); } } public static void initialise(IApplicationRegistry instance) throws Exception { - if(instance == null) - { - throw new IllegalArgumentException("ApplicationRegistry instance must not be null"); - } + initialise(instance, DEFAULT_INSTANCE); + } - if(!_instance.compareAndSet(null, instance)) + @SuppressWarnings("finally") + public static void initialise(IApplicationRegistry instance, int instanceID) throws Exception + { + if (instance != null) { - throw new IllegalStateException("An ApplicationRegistry is already initialised"); - } - - _logger.info("Initialising Application Registry(" + instance + ")"); + _logger.info("Initialising Application Registry(" + instance + "):" + instanceID); + _instanceMap.put(instanceID, instance); + final ConfigStore store = ConfigStore.newInstance(); + store.setRoot(new SystemConfigImpl(store)); + instance.setConfigStore(store); - final ConfigStore store = ConfigStore.newInstance(); - store.setRoot(new SystemConfigImpl(store)); - instance.setConfigStore(store); + BrokerConfig broker = new BrokerConfigAdapter(instance); - BrokerConfig broker = new BrokerConfigAdapter(instance); + SystemConfig system = (SystemConfig) store.getRoot(); + system.addBroker(broker); + instance.setBroker(broker); - SystemConfig system = (SystemConfig) store.getRoot(); - system.addBroker(broker); - instance.setBroker(broker); - - try - { - instance.initialise(); - } - catch (Exception e) - { - _instance.set(null); - - //remove the Broker instance, then re-throw try { - system.removeBroker(broker); + instance.initialise(instanceID); } - catch(Throwable t) + catch (Exception e) { - //ignore + _instanceMap.remove(instanceID); + try + { + system.removeBroker(broker); + } + finally + { + throw e; + } } - - throw e; + } + else + { + remove(instanceID); } } @@ -183,19 +176,35 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public static boolean isConfigured() { - return _instance.get() != null; + return isConfigured(DEFAULT_INSTANCE); + } + + public static boolean isConfigured(int instanceID) + { + return _instanceMap.containsKey(instanceID); } + /** Method to cleanly shutdown the default registry running in this JVM */ public static void remove() { - IApplicationRegistry instance = _instance.getAndSet(null); + remove(DEFAULT_INSTANCE); + } + + /** + * Method to cleanly shutdown specified registry running in this JVM + * + * @param instanceID the instance to shutdown + */ + public static void remove(int instanceID) + { try { + IApplicationRegistry instance = _instanceMap.get(instanceID); if (instance != null) { if (_logger.isInfoEnabled()) { - _logger.info("Shutting down ApplicationRegistry(" + instance + ")"); + _logger.info("Shutting down ApplicationRegistry(" + instanceID + "):" + instance); } instance.close(); instance.getBroker().getSystem().removeBroker(instance.getBroker()); @@ -203,7 +212,21 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } catch (Exception e) { - _logger.error("Error shutting down Application Registry(" + instance + "): " + e, e); + _logger.error("Error shutting down Application Registry(" + instanceID + "): " + e, e); + } + finally + { + _instanceMap.remove(instanceID); + } + } + + /** Method to cleanly shutdown all registries currently running in this JVM */ + public static void removeAll() + { + Object[] keys = _instanceMap.keySet().toArray(); + for (Object k : keys) + { + remove((Integer) k); } } @@ -228,10 +251,11 @@ public abstract class ApplicationRegistry implements IApplicationRegistry _configuration.initialise(); } - public void initialise() throws Exception + public void initialise(int instanceID) throws Exception { //Create the RootLogger to be used during broker operation _rootMessageLogger = new Log4jMessageLogger(_configuration); + _registryName = String.valueOf(instanceID); //Create the composite (log4j+SystemOut MessageLogger to be used during startup RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger}; @@ -253,7 +277,11 @@ public abstract class ApplicationRegistry implements IApplicationRegistry _securityManager = new SecurityManager(_configuration, _pluginManager); - _authenticationManager = createAuthenticationManager(); + createDatabaseManager(_configuration); + + _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); + + _databaseManager.initialiseManagement(_configuration); _managedObjectRegistry.start(); } @@ -266,8 +294,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry try { initialiseVirtualHosts(); - initialiseStatistics(); - initialiseStatisticsReporting(); } finally { @@ -276,51 +302,9 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } - /** - * Iterates across all discovered authentication manager factories, offering the security configuration to each. - * Expects <b>exactly</b> one authentication manager to configure and initialise itself. - * - * It is an error to configure more than one authentication manager, or to configure none. - * - * @return authentication manager - * @throws ConfigurationException - */ - protected AuthenticationManager createAuthenticationManager() throws ConfigurationException + protected void createDatabaseManager(ServerConfiguration configuration) throws Exception { - final SecurityConfiguration securityConfiguration = _configuration.getConfiguration(SecurityConfiguration.class.getName()); - final Collection<AuthenticationManagerPluginFactory<? extends Plugin>> factories = _pluginManager.getAuthenticationManagerPlugins().values(); - - if (factories.size() == 0) - { - throw new ConfigurationException("No authentication manager factory plugins found. Check the desired authentication" + - "manager plugin has been placed in the plugins directory."); - } - - AuthenticationManager authMgr = null; - - for (final Iterator<AuthenticationManagerPluginFactory<? extends Plugin>> iterator = factories.iterator(); iterator.hasNext();) - { - final AuthenticationManagerPluginFactory<? extends Plugin> factory = (AuthenticationManagerPluginFactory<? extends Plugin>) iterator.next(); - final AuthenticationManager tmp = factory.newInstance(securityConfiguration); - if (tmp != null) - { - if (authMgr != null) - { - throw new ConfigurationException("Cannot configure more than one authentication manager." - + " Both " + tmp.getClass() + " and " + authMgr.getClass() + " are configured." - + " Remove configuration for one of the authentication manager, or remove the plugin JAR" - + " from the classpath."); - } - authMgr = tmp; - } - } - - if (authMgr == null) - { - throw new ConfigurationException("No authentication managers configured within the configure file."); - } - - return authMgr; + _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration); } protected void initialiseVirtualHosts() throws Exception @@ -336,88 +320,26 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { _managedObjectRegistry = new NoopManagedObjectRegistry(); } - - public void initialiseStatisticsReporting() - { - long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms - final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled(); - final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled(); - final boolean reset = _configuration.isStatisticsReportResetEnabled(); - - /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */ - if (report > 0L && (broker || virtualhost)) - { - _reportingTimer = new Timer("Statistics-Reporting", true); - - class StatisticsReportingTask extends TimerTask - { - private final int DELIVERED = 0; - private final int RECEIVED = 1; - - public void run() - { - CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) { - public String getLogMessage() - { - return "[" + Thread.currentThread().getName() + "] "; - } - }); - - if (broker) - { - CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal())); - CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal())); - CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal())); - CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal())); - } - - if (virtualhost) - { - for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts()) - { - String name = vhost.getName(); - StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics(); - StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics(); - StatisticsCounter dataReceived = vhost.getDataReceiptStatistics(); - StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics(); - - CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal())); - CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal())); - } - } - - if (reset) - { - resetStatistics(); - } - - CurrentActor.remove(); - } - } - _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(), - report / 2, - report); - } + public static IApplicationRegistry getInstance() + { + return getInstance(DEFAULT_INSTANCE); } - /** - * Get the ApplicationRegistry - * @return the IApplicationRegistry instance - * @throws IllegalStateException if no registry instance has been initialised. - */ - public static IApplicationRegistry getInstance() throws IllegalStateException + public static IApplicationRegistry getInstance(int instanceID) { - IApplicationRegistry iApplicationRegistry = _instance.get(); - if (iApplicationRegistry == null) - { - throw new IllegalStateException("No ApplicationRegistry has been initialised"); - } - else + synchronized (IApplicationRegistry.class) { - return iApplicationRegistry; + IApplicationRegistry instance = _instanceMap.get(instanceID); + + if (instance == null) + { + throw new IllegalStateException("Application Registry (" + instanceID + ") not created"); + } + else + { + return instance; + } } } @@ -447,12 +369,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry { _logger.info("Shutting down ApplicationRegistry:" + this); } - - //Stop Statistics Reporting - if (_reportingTimer != null) - { - _reportingTimer.cancel(); - } //Stop incoming connections unbind(); @@ -460,6 +376,10 @@ public abstract class ApplicationRegistry implements IApplicationRegistry //Shutdown virtualhosts close(_virtualHostRegistry); +// close(_accessManager); +// +// close(_databaseManager); + close(_authenticationManager); close(_managedObjectRegistry); @@ -481,7 +401,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry try { - acceptor.getNetworkTransport().close(); + acceptor.getNetworkDriver().close(); } catch (Throwable e) { @@ -521,6 +441,11 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return _managedObjectRegistry; } + public PrincipalDatabaseManager getDatabaseManager() + { + return _databaseManager; + } + public AuthenticationManager getAuthenticationManager() { return _authenticationManager; @@ -573,76 +498,4 @@ public abstract class ApplicationRegistry implements IApplicationRegistry getBroker().addVirtualHost(virtualHost); return virtualHost; } - - public void registerMessageDelivered(long messageSize) - { - if (isStatisticsEnabled()) - { - _messagesDelivered.registerEvent(1L); - _dataDelivered.registerEvent(messageSize); - } - } - - public void registerMessageReceived(long messageSize, long timestamp) - { - if (isStatisticsEnabled()) - { - _messagesReceived.registerEvent(1L, timestamp); - _dataReceived.registerEvent(messageSize, timestamp); - } - } - - public StatisticsCounter getMessageReceiptStatistics() - { - return _messagesReceived; - } - - public StatisticsCounter getDataReceiptStatistics() - { - return _dataReceived; - } - - public StatisticsCounter getMessageDeliveryStatistics() - { - return _messagesDelivered; - } - - public StatisticsCounter getDataDeliveryStatistics() - { - return _dataDelivered; - } - - public void resetStatistics() - { - _messagesDelivered.reset(); - _dataDelivered.reset(); - _messagesReceived.reset(); - _dataReceived.reset(); - - for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts()) - { - vhost.resetStatistics(); - } - } - - public void initialiseStatistics() - { - setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && - getConfiguration().isStatisticsGenerationBrokerEnabled()); - - _messagesDelivered = new StatisticsCounter("messages-delivered"); - _dataDelivered = new StatisticsCounter("bytes-delivered"); - _messagesReceived = new StatisticsCounter("messages-received"); - _dataReceived = new StatisticsCounter("bytes-received"); - } - - public boolean isStatisticsEnabled() - { - return _statisticsEnabled; - } - - public void setStatisticsEnabled(boolean enabled) - { - _statisticsEnabled = enabled; - } } |