summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java367
1 files changed, 110 insertions, 257 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 7c804fc1fd..78a642f22f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -21,14 +21,9 @@
package org.apache.qpid.server.registry;
import java.net.InetSocketAddress;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
@@ -46,27 +41,24 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
import org.apache.qpid.server.logging.Log4jMessageLogger;
import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.AbstractRootMessageLogger;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
-import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
-import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.SecurityManager.SecurityConfiguration;
+import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory;
-import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-
/**
* An abstract application registry that provides access to configuration information and handles the
* construction and caching of configurable objects.
@@ -77,10 +69,12 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
protected static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
- private static AtomicReference<IApplicationRegistry> _instance = new AtomicReference<IApplicationRegistry>(null);
+ private static Map<Integer, IApplicationRegistry> _instanceMap = new HashMap<Integer, IApplicationRegistry>();
protected final ServerConfiguration _configuration;
+ public static final int DEFAULT_INSTANCE = 1;
+
protected final Map<InetSocketAddress, QpidAcceptor> _acceptors = new HashMap<InetSocketAddress, QpidAcceptor>();
protected ManagedObjectRegistry _managedObjectRegistry;
@@ -91,6 +85,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
protected SecurityManager _securityManager;
+ protected PrincipalDatabaseManager _databaseManager;
+
protected PluginManager _pluginManager;
protected ConfigurationManager _configurationManager;
@@ -106,10 +102,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
private BrokerConfig _broker;
private ConfigStore _configStore;
-
- private Timer _reportingTimer;
- private boolean _statisticsEnabled = false;
- private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
+ protected String _registryName;
static
{
@@ -120,54 +114,53 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
public void run()
{
- remove();
+ removeAll();
}
}
public static void initialise(IApplicationRegistry instance) throws Exception
{
- if(instance == null)
- {
- throw new IllegalArgumentException("ApplicationRegistry instance must not be null");
- }
+ initialise(instance, DEFAULT_INSTANCE);
+ }
- if(!_instance.compareAndSet(null, instance))
+ @SuppressWarnings("finally")
+ public static void initialise(IApplicationRegistry instance, int instanceID) throws Exception
+ {
+ if (instance != null)
{
- throw new IllegalStateException("An ApplicationRegistry is already initialised");
- }
-
- _logger.info("Initialising Application Registry(" + instance + ")");
+ _logger.info("Initialising Application Registry(" + instance + "):" + instanceID);
+ _instanceMap.put(instanceID, instance);
+ final ConfigStore store = ConfigStore.newInstance();
+ store.setRoot(new SystemConfigImpl(store));
+ instance.setConfigStore(store);
- final ConfigStore store = ConfigStore.newInstance();
- store.setRoot(new SystemConfigImpl(store));
- instance.setConfigStore(store);
+ BrokerConfig broker = new BrokerConfigAdapter(instance);
- BrokerConfig broker = new BrokerConfigAdapter(instance);
+ SystemConfig system = (SystemConfig) store.getRoot();
+ system.addBroker(broker);
+ instance.setBroker(broker);
- SystemConfig system = (SystemConfig) store.getRoot();
- system.addBroker(broker);
- instance.setBroker(broker);
-
- try
- {
- instance.initialise();
- }
- catch (Exception e)
- {
- _instance.set(null);
-
- //remove the Broker instance, then re-throw
try
{
- system.removeBroker(broker);
+ instance.initialise(instanceID);
}
- catch(Throwable t)
+ catch (Exception e)
{
- //ignore
+ _instanceMap.remove(instanceID);
+ try
+ {
+ system.removeBroker(broker);
+ }
+ finally
+ {
+ throw e;
+ }
}
-
- throw e;
+ }
+ else
+ {
+ remove(instanceID);
}
}
@@ -183,19 +176,35 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
public static boolean isConfigured()
{
- return _instance.get() != null;
+ return isConfigured(DEFAULT_INSTANCE);
+ }
+
+ public static boolean isConfigured(int instanceID)
+ {
+ return _instanceMap.containsKey(instanceID);
}
+ /** Method to cleanly shutdown the default registry running in this JVM */
public static void remove()
{
- IApplicationRegistry instance = _instance.getAndSet(null);
+ remove(DEFAULT_INSTANCE);
+ }
+
+ /**
+ * Method to cleanly shutdown specified registry running in this JVM
+ *
+ * @param instanceID the instance to shutdown
+ */
+ public static void remove(int instanceID)
+ {
try
{
+ IApplicationRegistry instance = _instanceMap.get(instanceID);
if (instance != null)
{
if (_logger.isInfoEnabled())
{
- _logger.info("Shutting down ApplicationRegistry(" + instance + ")");
+ _logger.info("Shutting down ApplicationRegistry(" + instanceID + "):" + instance);
}
instance.close();
instance.getBroker().getSystem().removeBroker(instance.getBroker());
@@ -203,7 +212,21 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
}
catch (Exception e)
{
- _logger.error("Error shutting down Application Registry(" + instance + "): " + e, e);
+ _logger.error("Error shutting down Application Registry(" + instanceID + "): " + e, e);
+ }
+ finally
+ {
+ _instanceMap.remove(instanceID);
+ }
+ }
+
+ /** Method to cleanly shutdown all registries currently running in this JVM */
+ public static void removeAll()
+ {
+ Object[] keys = _instanceMap.keySet().toArray();
+ for (Object k : keys)
+ {
+ remove((Integer) k);
}
}
@@ -228,10 +251,11 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
_configuration.initialise();
}
- public void initialise() throws Exception
+ public void initialise(int instanceID) throws Exception
{
//Create the RootLogger to be used during broker operation
_rootMessageLogger = new Log4jMessageLogger(_configuration);
+ _registryName = String.valueOf(instanceID);
//Create the composite (log4j+SystemOut MessageLogger to be used during startup
RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger};
@@ -253,7 +277,11 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
_securityManager = new SecurityManager(_configuration, _pluginManager);
- _authenticationManager = createAuthenticationManager();
+ createDatabaseManager(_configuration);
+
+ _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
+
+ _databaseManager.initialiseManagement(_configuration);
_managedObjectRegistry.start();
}
@@ -266,8 +294,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
try
{
initialiseVirtualHosts();
- initialiseStatistics();
- initialiseStatisticsReporting();
}
finally
{
@@ -276,51 +302,9 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
}
}
- /**
- * Iterates across all discovered authentication manager factories, offering the security configuration to each.
- * Expects <b>exactly</b> one authentication manager to configure and initialise itself.
- *
- * It is an error to configure more than one authentication manager, or to configure none.
- *
- * @return authentication manager
- * @throws ConfigurationException
- */
- protected AuthenticationManager createAuthenticationManager() throws ConfigurationException
+ protected void createDatabaseManager(ServerConfiguration configuration) throws Exception
{
- final SecurityConfiguration securityConfiguration = _configuration.getConfiguration(SecurityConfiguration.class.getName());
- final Collection<AuthenticationManagerPluginFactory<? extends Plugin>> factories = _pluginManager.getAuthenticationManagerPlugins().values();
-
- if (factories.size() == 0)
- {
- throw new ConfigurationException("No authentication manager factory plugins found. Check the desired authentication" +
- "manager plugin has been placed in the plugins directory.");
- }
-
- AuthenticationManager authMgr = null;
-
- for (final Iterator<AuthenticationManagerPluginFactory<? extends Plugin>> iterator = factories.iterator(); iterator.hasNext();)
- {
- final AuthenticationManagerPluginFactory<? extends Plugin> factory = (AuthenticationManagerPluginFactory<? extends Plugin>) iterator.next();
- final AuthenticationManager tmp = factory.newInstance(securityConfiguration);
- if (tmp != null)
- {
- if (authMgr != null)
- {
- throw new ConfigurationException("Cannot configure more than one authentication manager."
- + " Both " + tmp.getClass() + " and " + authMgr.getClass() + " are configured."
- + " Remove configuration for one of the authentication manager, or remove the plugin JAR"
- + " from the classpath.");
- }
- authMgr = tmp;
- }
- }
-
- if (authMgr == null)
- {
- throw new ConfigurationException("No authentication managers configured within the configure file.");
- }
-
- return authMgr;
+ _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration);
}
protected void initialiseVirtualHosts() throws Exception
@@ -336,88 +320,26 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
_managedObjectRegistry = new NoopManagedObjectRegistry();
}
-
- public void initialiseStatisticsReporting()
- {
- long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
- final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
- final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
- final boolean reset = _configuration.isStatisticsReportResetEnabled();
-
- /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
- if (report > 0L && (broker || virtualhost))
- {
- _reportingTimer = new Timer("Statistics-Reporting", true);
-
- class StatisticsReportingTask extends TimerTask
- {
- private final int DELIVERED = 0;
- private final int RECEIVED = 1;
-
- public void run()
- {
- CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
- public String getLogMessage()
- {
- return "[" + Thread.currentThread().getName() + "] ";
- }
- });
-
- if (broker)
- {
- CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
- CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
- CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
- CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
- }
-
- if (virtualhost)
- {
- for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts())
- {
- String name = vhost.getName();
- StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics();
- StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics();
- StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
- StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
-
- CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
- }
- }
-
- if (reset)
- {
- resetStatistics();
- }
-
- CurrentActor.remove();
- }
- }
- _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(),
- report / 2,
- report);
- }
+ public static IApplicationRegistry getInstance()
+ {
+ return getInstance(DEFAULT_INSTANCE);
}
- /**
- * Get the ApplicationRegistry
- * @return the IApplicationRegistry instance
- * @throws IllegalStateException if no registry instance has been initialised.
- */
- public static IApplicationRegistry getInstance() throws IllegalStateException
+ public static IApplicationRegistry getInstance(int instanceID)
{
- IApplicationRegistry iApplicationRegistry = _instance.get();
- if (iApplicationRegistry == null)
- {
- throw new IllegalStateException("No ApplicationRegistry has been initialised");
- }
- else
+ synchronized (IApplicationRegistry.class)
{
- return iApplicationRegistry;
+ IApplicationRegistry instance = _instanceMap.get(instanceID);
+
+ if (instance == null)
+ {
+ throw new IllegalStateException("Application Registry (" + instanceID + ") not created");
+ }
+ else
+ {
+ return instance;
+ }
}
}
@@ -447,12 +369,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
{
_logger.info("Shutting down ApplicationRegistry:" + this);
}
-
- //Stop Statistics Reporting
- if (_reportingTimer != null)
- {
- _reportingTimer.cancel();
- }
//Stop incoming connections
unbind();
@@ -460,6 +376,10 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
//Shutdown virtualhosts
close(_virtualHostRegistry);
+// close(_accessManager);
+//
+// close(_databaseManager);
+
close(_authenticationManager);
close(_managedObjectRegistry);
@@ -481,7 +401,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
try
{
- acceptor.getNetworkTransport().close();
+ acceptor.getNetworkDriver().close();
}
catch (Throwable e)
{
@@ -521,6 +441,11 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
return _managedObjectRegistry;
}
+ public PrincipalDatabaseManager getDatabaseManager()
+ {
+ return _databaseManager;
+ }
+
public AuthenticationManager getAuthenticationManager()
{
return _authenticationManager;
@@ -573,76 +498,4 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
getBroker().addVirtualHost(virtualHost);
return virtualHost;
}
-
- public void registerMessageDelivered(long messageSize)
- {
- if (isStatisticsEnabled())
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- }
- }
-
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- if (isStatisticsEnabled())
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- }
- }
-
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messagesReceived;
- }
-
- public StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceived;
- }
-
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messagesDelivered;
- }
-
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDelivered;
- }
-
- public void resetStatistics()
- {
- _messagesDelivered.reset();
- _dataDelivered.reset();
- _messagesReceived.reset();
- _dataReceived.reset();
-
- for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts())
- {
- vhost.resetStatistics();
- }
- }
-
- public void initialiseStatistics()
- {
- setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
- getConfiguration().isStatisticsGenerationBrokerEnabled());
-
- _messagesDelivered = new StatisticsCounter("messages-delivered");
- _dataDelivered = new StatisticsCounter("bytes-delivered");
- _messagesReceived = new StatisticsCounter("messages-received");
- _dataReceived = new StatisticsCounter("bytes-received");
- }
-
- public boolean isStatisticsEnabled()
- {
- return _statisticsEnabled;
- }
-
- public void setStatisticsEnabled(boolean enabled)
- {
- _statisticsEnabled = enabled;
- }
}