From f228f1af61ec9c334248bd0e22d14a40fdf44e08 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Mon, 13 Apr 2009 12:42:38 +0000 Subject: QPID-1621: add ServerConfiguration, QueueConfiguration and SecurityConfiguration classes. Move almost all uses of o.a.commons.configuration.Configuration behind there. @Configured delenda est git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5-fix@764428 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/AMQBrokerManagerMBean.java | 6 +- .../java/org/apache/qpid/server/AMQChannel.java | 20 +- .../src/main/java/org/apache/qpid/server/Main.java | 158 +++------- .../configuration/VirtualHostConfiguration.java | 327 ++++++--------------- .../server/exchange/DefaultExchangeFactory.java | 7 +- .../qpid/server/exchange/ExchangeFactory.java | 3 +- .../handler/ConnectionSecureOkMethodHandler.java | 12 +- .../handler/ConnectionStartOkMethodHandler.java | 21 +- .../qpid/server/handler/QueueDeclareHandler.java | 27 +- .../management/JMXManagedObjectRegistry.java | 30 +- .../server/protocol/AMQMinaProtocolSession.java | 2 +- .../server/protocol/AMQPFastProtocolHandler.java | 60 ++-- .../org/apache/qpid/server/queue/AMQQueue.java | 5 +- .../apache/qpid/server/queue/AMQQueueFactory.java | 58 ++-- .../apache/qpid/server/queue/IncomingMessage.java | 4 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 68 ++--- .../qpid/server/registry/ApplicationRegistry.java | 54 +--- .../ConfigurationFileApplicationRegistry.java | 75 +---- .../qpid/server/registry/IApplicationRegistry.java | 15 +- .../qpid/server/security/access/ACLManager.java | 14 +- .../ConfigurationFilePrincipalDatabaseManager.java | 33 +-- .../auth/database/PrincipalDatabaseManager.java | 3 +- .../PropertiesPrincipalDatabaseManager.java | 6 +- .../PrincipalDatabaseAuthenticationManager.java | 15 +- .../qpid/server/store/DerbyMessageStore.java | 7 +- .../qpid/server/store/MemoryMessageStore.java | 7 +- .../org/apache/qpid/server/store/MessageStore.java | 5 +- .../qpid/server/util/NullApplicationRegistry.java | 18 +- .../qpid/server/virtualhost/VirtualHost.java | 172 +++++++---- .../java/org/apache/qpid/server/ack/TxAckTest.java | 12 +- .../VirtualHostConfigurationTest.java | 56 ++-- .../qpid/server/queue/AMQQueueAlertTest.java | 20 -- .../org/apache/qpid/server/queue/MockAMQQueue.java | 13 +- .../qpid/server/queue/SimpleAMQQueueTest.java | 5 +- .../registry/ApplicationRegistryShutdownTest.java | 2 +- .../server/security/access/ACLManagerTest.java | 13 +- .../security/access/PrincipalPermissionsTest.java | 6 +- .../access/plugins/network/FirewallPluginTest.java | 5 +- .../apache/qpid/server/store/MessageStoreTest.java | 4 +- .../qpid/server/store/SkeletonMessageStore.java | 3 +- .../qpid/server/util/InternalBrokerBaseCase.java | 7 +- .../qpid/server/util/TestApplicationRegistry.java | 25 +- 42 files changed, 586 insertions(+), 817 deletions(-) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index fc6057afd2..e67eaedcb5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -42,12 +42,8 @@ import javax.management.MBeanException; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; -import org.apache.commons.configuration.Configuration; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -56,9 +52,9 @@ import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.management.ManagedBroker; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 26ac562fb2..aa390d6c26 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,6 +20,14 @@ */ package org.apache.qpid.server; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; @@ -29,7 +37,6 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; -import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.flow.FlowCreditManager; @@ -51,14 +58,6 @@ import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - public class AMQChannel { public static final int DEFAULT_PREFETCH = 5000; @@ -117,9 +116,6 @@ public class AMQChannel public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { - //Set values from configuration - Configurator.configure(this); - _session = session; _channelId = channelId; _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index f3b54034e7..780a17940e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.server; +import java.io.File; +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; @@ -27,36 +33,23 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.BasicConfigurator; import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.FixedSizeByteBufferAllocator; +import org.apache.mina.common.IoAcceptor; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.qpid.AMQException; +import org.apache.mina.util.NewThreadExecutor; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.management.JMXManagedObjectRegistry; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; import org.apache.qpid.server.protocol.AMQPProtocolProvider; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.server.transport.ConnectorConfiguration; -import org.apache.qpid.url.URLSyntaxException; - -import java.io.File; -import java.io.IOException; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.List; /** * Main entry point for AMQPD. @@ -200,13 +193,6 @@ public class Main _brokerLogger.error("Initialisation Error : " + e.getMessage()); shutdown(1); } - catch (ConfigurationException e) - { - System.out.println("Error configuring message broker: " + e); - _brokerLogger.error("Error configuring message broker: " + e); - e.printStackTrace(); - shutdown(1); - } catch (Throwable e) { System.out.println("Error initialising message broker: " + e); @@ -223,7 +209,7 @@ public class Main System.exit(status); } - protected void startup() throws InitException, ConfigurationException, Exception + protected void startup() throws Exception { final String QpidHome = System.getProperty(QPID_HOME); final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE); @@ -259,40 +245,32 @@ public class Main } ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile); - - - updateManagementPort(config.getConfiguration(), commandLine.getOptionValue("m")); - - + ServerConfiguration serverConfig = config.getConfiguration(); + updateManagementPort(serverConfig, commandLine.getOptionValue("m")); ApplicationRegistry.initialise(config); - //fixme .. use QpidProperties.getVersionString when we have fixed the classpath issues // that are causing the broker build to pick up the wrong properties file and hence say // Starting Qpid Client _brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion() + " build: " + QpidProperties.getBuildVersion()); - ConnectorConfiguration connectorConfig = - ApplicationRegistry.getInstance().getConfiguredObject(ConnectorConfiguration.class); - - ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers); + ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers()); // the MINA default is currently to use the pooled allocator although this may change in future // once more testing of the performance of the simple allocator has been done - if (!connectorConfig.enablePooledAllocator) + if (!serverConfig.getEnablePooledAllocator()) { ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator()); } - - if(connectorConfig.useBiasedWrites) + if(serverConfig.getUseBiasedWrites()) { System.setProperty("org.apache.qpid.use_write_biased_pool","true"); } - int port = connectorConfig.port; + int port = serverConfig.getPort(); String portStr = commandLine.getOptionValue("p"); if (portStr != null) @@ -306,29 +284,8 @@ public class Main throw new InitException("Invalid port: " + portStr, e); } } - - String VIRTUAL_HOSTS = "virtualhosts"; - - Object virtualHosts = ApplicationRegistry.getInstance().getConfiguration().getProperty(VIRTUAL_HOSTS); - - if (virtualHosts != null) - { - if (virtualHosts instanceof Collection) - { - int totalVHosts = ((Collection) virtualHosts).size(); - for (int vhost = 0; vhost < totalVHosts; vhost++) - { - setupVirtualHosts(configFile.getParent(), (String) ((List) virtualHosts).get(vhost)); - } - } - else - { - setupVirtualHosts(configFile.getParent(), (String) virtualHosts); - } - } - - bind(port, connectorConfig); - + + bind(port, serverConfig); } /** @@ -336,86 +293,59 @@ public class Main * @param configuration * @param managementPort The string from the command line */ - private void updateManagementPort(Configuration configuration, String managementPort) + private void updateManagementPort(ServerConfiguration configuration, String managementPort) { if (managementPort != null) { - int mport; - int defaultMPort = configuration.getInt(JMXManagedObjectRegistry.MANAGEMENT_PORT_CONFIG_PATH); try { - mport = Integer.parseInt(managementPort); - configuration.setProperty(JMXManagedObjectRegistry.MANAGEMENT_PORT_CONFIG_PATH, mport); + configuration.setJMXManagementPort(Integer.parseInt(managementPort)); } catch (NumberFormatException e) { - _logger.warn("Invalid management port: " + managementPort + " will use default:" + defaultMPort, e); + _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e); } } } - protected void setupVirtualHosts(String configFileParent, String configFilePath) - throws ConfigurationException, AMQException, URLSyntaxException - { - String configVar = "${conf}"; - - if (configFilePath.startsWith(configVar)) - { - configFilePath = configFileParent + configFilePath.substring(configVar.length()); - } - - if (configFilePath.indexOf(".xml") != -1) - { - VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath); - vHostConfig.performBindings(); - } - else - { - // the virtualhosts value is a path. Search it for XML files. - - File virtualHostDir = new File(configFilePath); - - String[] fileNames = virtualHostDir.list(); - - for (int each = 0; each < fileNames.length; each++) - { - if (fileNames[each].endsWith(".xml")) - { - VirtualHostConfiguration vHostConfig = - new VirtualHostConfiguration(configFilePath + "/" + fileNames[each]); - vHostConfig.performBindings(); - } - } - } - } - - protected void bind(int port, ConnectorConfiguration connectorConfig) throws BindException + protected void bind(int port, ServerConfiguration config) throws BindException { String bindAddr = commandLine.getOptionValue("b"); if (bindAddr == null) { - bindAddr = connectorConfig.bindAddress; + bindAddr = config.getBind(); } try { - // IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors); - IoAcceptor acceptor = connectorConfig.createAcceptor(); + IoAcceptor acceptor; + + if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO()) + { + _logger.warn("Using Qpid Multithreaded IO Processing"); + acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor()); + } + else + { + _logger.warn("Using Mina IO Processing"); + acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor()); + } + SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig(); SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); - sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize); - sc.setSendBufferSize(connectorConfig.socketWriteBuferSize); - sc.setTcpNoDelay(connectorConfig.tcpNoDelay); + sc.setReceiveBufferSize(config.getReceiveBufferSize()); + sc.setSendBufferSize(config.getWriteBufferSize()); + sc.setTcpNoDelay(config.getTcpNoDelay()); // if we do not use the executor pool threading model we get the default leader follower // implementation provided by MINA - if (connectorConfig.enableExecutorPool) + if (config.getEnableExecutorPool()) { sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); } - if (!connectorConfig.enableSSL || !connectorConfig.sslOnly) + if (!config.getEnableSSL() || !config.getSSLOnly()) { AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); InetSocketAddress bindAddress; @@ -434,16 +364,16 @@ public class Main _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress); } - if (connectorConfig.enableSSL) + if (config.getEnableSSL()) { AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); try { - bind(acceptor, new InetSocketAddress(connectorConfig.sslPort), handler, sconfig); + bind(acceptor, new InetSocketAddress(config.getSSLPort()), handler, sconfig); //fixme qpid.AMQP should be using qpidproperties to get value - _brokerLogger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort); + _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort()); } catch (IOException e) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 705e84752b..21041d55cd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -20,267 +20,104 @@ */ package org.apache.qpid.server.configuration; -import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.store.MemoryMessageStore; public class VirtualHostConfiguration { - private static final Logger _logger = Logger.getLogger(VirtualHostConfiguration.class); - - private static XMLConfiguration _config; - - private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost."; - - - public VirtualHostConfiguration(String configFile) throws ConfigurationException - { - _logger.info("Loading Config file:" + configFile); - - _config = new XMLConfiguration(configFile); - - } - - - - private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException - { - _logger.debug("Loding configuration for virtaulhost: "+virtualHostName); - - - VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName); - - - - if(virtualHost == null) - { - throw new ConfigurationException("Unknown virtual host: " + virtualHostName); - } - - List exchangeNames = configuration.getList("exchanges.exchange.name"); - - for(Object exchangeNameObj : exchangeNames) - { - String exchangeName = String.valueOf(exchangeNameObj); - configureExchange(virtualHost, exchangeName, configuration); - } - - - List queueNames = configuration.getList("queues.queue.name"); - - for(Object queueNameObj : queueNames) - { - String queueName = String.valueOf(queueNameObj); - configureQueue(virtualHost, queueName, configuration); - } - - } - - private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration) throws AMQException - { - - CompositeConfiguration exchangeConfiguration = new CompositeConfiguration(); - - exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString)); - exchangeConfiguration.addConfiguration(configuration.subset("exchanges")); - - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - MessageStore messageStore = virtualHost.getMessageStore(); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); - - AMQShortString exchangeName = new AMQShortString(exchangeNameString); - - - Exchange exchange; - - - - synchronized (exchangeRegistry) - { - exchange = exchangeRegistry.getExchange(exchangeName); - if(exchange == null) - { - - AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type","direct")); - boolean durable = exchangeConfiguration.getBoolean("durable",false); - boolean autodelete = exchangeConfiguration.getBoolean("autodelete",false); - - Exchange newExchange = exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0); - exchangeRegistry.registerExchange(newExchange); - } - + private Configuration _config; + private String _name; + private Map _queues = new HashMap(); + private Map _exchanges = new HashMap(); + + public VirtualHostConfiguration(String name, Configuration config) throws ConfigurationException + { + _config = config; + _name = name; + Iterator i = _config.getList("queues.queue.name").iterator(); + + while (i.hasNext()) + { + String queueName = (String) i.next(); + CompositeConfiguration mungedConf = new CompositeConfiguration(); + mungedConf.addConfiguration(_config.subset("queues.queue." + queueName)); + mungedConf.addConfiguration(_config.subset("queues")); + _queues.put(queueName, new QueueConfiguration(queueName, mungedConf)); } - } - - public static CompositeConfiguration getDefaultQueueConfiguration(VirtualHost host) - { - CompositeConfiguration queueConfiguration = null; - if (_config == null) - return null; - - Configuration vHostConfiguration = _config.subset(VIRTUALHOST_PROPERTY_BASE + host.getName()); - if (vHostConfiguration == null) - return null; - - Configuration defaultQueueConfiguration = vHostConfiguration.subset("queues"); - if (defaultQueueConfiguration != null) + i = _config.getList("exchanges.exchange.name").iterator(); + int count = 0; + while (i.hasNext()) { - queueConfiguration = new CompositeConfiguration(); - queueConfiguration.addConfiguration(defaultQueueConfiguration); + CompositeConfiguration mungedConf = new CompositeConfiguration(); + mungedConf.addConfiguration(config.subset("exchanges.exchange(" + count++ + ")")); + mungedConf.addConfiguration(_config.subset("exchanges")); + String exchName = (String) i.next(); + _exchanges.put(exchName, new ExchangeConfiguration(exchName, mungedConf)); } - return queueConfiguration; - } - - private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException - { - CompositeConfiguration queueConfiguration = new CompositeConfiguration(); - - queueConfiguration.addConfiguration(configuration.subset("queues.queue."+ queueNameString)); - queueConfiguration.addConfiguration(configuration.subset("queues")); - - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - MessageStore messageStore = virtualHost.getMessageStore(); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - - - AMQShortString queueName = new AMQShortString(queueNameString); - - AMQQueue queue; - - synchronized (queueRegistry) - { - queue = queueRegistry.getQueue(queueName); - - if (queue == null) - { - _logger.info("Creating queue '" + queueName + "' on virtual host " + virtualHost.getName()); - - boolean durable = queueConfiguration.getBoolean("durable" ,false); - boolean autodelete = queueConfiguration.getBoolean("autodelete", false); - String owner = queueConfiguration.getString("owner", null); - FieldTable arguments = null; - boolean priority = queueConfiguration.getBoolean("priority", false); - int priorities = queueConfiguration.getInt("priorities", -1); - if(priority || priorities > 0) - { - if(arguments == null) - { - arguments = new FieldTable(); - } - if (priorities < 0) - { - priorities = 10; - } - arguments.put(new AMQShortString("x-qpid-priorities"), priorities); - } - - - queue = AMQQueueFactory.createAMQQueueImpl(queueName, - durable, - owner == null ? null : new AMQShortString(owner) /* These queues will have no owner */, - autodelete /* Therefore autodelete makes no sence */, - virtualHost, - arguments, - queueConfiguration); - - if (queue.isDurable()) - { - messageStore.createQueue(queue); - } - - queueRegistry.registerQueue(queue); - } - else - { - _logger.info("Queue '" + queueNameString + "' already exists on virtual host "+virtualHost.getName()+", not creating."); - } - - String exchangeName = queueConfiguration.getString("exchange", null); - - Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName)); - - if(exchange == null) - { - exchange = virtualHost.getExchangeRegistry().getDefaultExchange(); - } - - if (exchange == null) - { - throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName); - } - - synchronized (exchange) - { - List routingKeys = queueConfiguration.getList("routingKey"); - if(routingKeys == null || routingKeys.isEmpty()) - { - routingKeys = Collections.singletonList(queue.getName()); - } - - for(Object routingKeyNameObj : routingKeys) - { - AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); - - - queue.bind(exchange, routingKey, null); - - - _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'"); - } - - if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange()) - { - queue.bind(virtualHost.getExchangeRegistry().getDefaultExchange(), queue.getName(), null); - } - } - - } } - - public void performBindings() throws AMQException, ConfigurationException - { - List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name"); - String defaultVirtualHostName = _config.getString("default"); - if(defaultVirtualHostName != null) - { - ApplicationRegistry.getInstance().getVirtualHostRegistry().setDefaultVirtualHostName(defaultVirtualHostName); - } - _logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames); - - for(Object nameObject : virtualHostNames) - { - String name = String.valueOf(nameObject); - configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name)); - } - - if (virtualHostNames == null || virtualHostNames.isEmpty()) - { - throw new ConfigurationException( - "Virtualhost Configuration document does not contain a valid virtualhost."); - } + public String getName() + { + return _name; } - - + public long getHousekeepingExpiredMessageCheckPeriod() + { + return _config.getLong("housekeeping.expiredMessageCheckPeriod", ApplicationRegistry.getInstance().getConfiguration().getHousekeepingExpiredMessageCheckPeriod()); + } + + public String getAuthenticationDatabase() + { + return _config.getString("security.authentication.name"); + } + + public List getCustomExchanges() + { + return _config.getList("custom-exchanges.class-name"); + } + + public SecurityConfiguration getSecurityConfiguration() + { + return new SecurityConfiguration(_config.subset("security")); + } + + public Configuration getStoreConfiguration() + { + return _config.subset("store"); + } + + public String getMessageStoreClass() + { + return _config.getString("store.class", MemoryMessageStore.class.getName()); + } + + public List getExchanges() + { + return _config.getList("exchanges.exchange.name"); + } + + public ExchangeConfiguration getExchangeConfiguration(String exchangeName) + { + return _exchanges.get(exchangeName); + } + + public String[] getQueueNames() + { + return _queues.keySet().toArray(new String[_queues.size()]); + } + + public QueueConfiguration getQueueConfiguration(String queueName) + { + return _queues.get(queueName); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 9d4c090971..c04b6c559c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -25,11 +25,11 @@ import java.util.HashMap; import java.util.Map; import org.apache.log4j.Logger; -import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -73,7 +73,8 @@ public class DefaultExchangeFactory implements ExchangeFactory return e; } - public void initialise(Configuration hostConfig) + @Override + public void initialise(VirtualHostConfiguration hostConfig) { if (hostConfig == null) @@ -81,7 +82,7 @@ public class DefaultExchangeFactory implements ExchangeFactory return; } - for(Object className : hostConfig.getList("custom-exchanges.class-name")) + for(Object className : hostConfig.getCustomExchanges()) { try { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java index 0bcfec7181..2f76d41228 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java @@ -26,6 +26,7 @@ import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; public interface ExchangeFactory @@ -34,7 +35,7 @@ public interface ExchangeFactory int ticket) throws AMQException; - void initialise(Configuration hostConfig); + void initialise(VirtualHostConfiguration hostConfig); Collection> getRegisteredTypes(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index 621003be90..a2a6faf21b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -25,14 +25,16 @@ import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.HeartbeatConfig; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.AuthenticationResult; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; @@ -92,7 +94,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF, ConnectionStartOkMethodHandler.getConfiguredFrameSize(), - HeartbeatConfig.getInstance().getDelay()); + ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID())); disposeSaslServer(session); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index f53e56601b..6698ae888a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -23,18 +23,19 @@ package org.apache.qpid.server.handler; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.HeartbeatConfig; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.AuthenticationResult; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; @@ -47,8 +48,6 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< private static ConnectionStartOkMethodHandler _instance = new ConnectionStartOkMethodHandler(); - private static final int DEFAULT_FRAME_SIZE = 65536; - public static ConnectionStartOkMethodHandler getInstance() { return _instance; @@ -117,7 +116,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF, getConfiguredFrameSize(), - HeartbeatConfig.getInstance().getDelay()); + ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); break; case CONTINUE: @@ -153,8 +152,8 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< static int getConfiguredFrameSize() { - final Configuration config = ApplicationRegistry.getInstance().getConfiguration(); - final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE); + final ServerConfiguration config = ApplicationRegistry.getInstance().getConfiguration(); + final int framesize = config.getFrameSize(); _logger.info("Framesize set to " + framesize); return framesize; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 71f38cb28a..b705ea7dba 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -20,30 +20,28 @@ */ package org.apache.qpid.server.handler; -import java.util.concurrent.atomic.AtomicInteger; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.configuration.Configured; - -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.AMQChannel; -import org.apache.commons.configuration.Configuration; public class QueueDeclareHandler implements StateAwareMethodListener { @@ -56,17 +54,10 @@ public class QueueDeclareHandler implements StateAwareMethodListener map = appRegistry.getDatabaseManager().getDatabases(); PrincipalDatabase db = map.get(jmxDatabaseName); @@ -154,7 +157,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry RMIServerSocketFactory ssf; //check ssl enabled option in config, default to true if option is not set - boolean sslEnabled = appRegistry.getConfiguration().getBoolean("management.ssl.enabled", true); + boolean sslEnabled = appRegistry.getConfiguration().getManagementSSLEnabled(); if (sslEnabled) { @@ -167,7 +170,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry keyStorePath = System.getProperty("javax.net.ssl.keyStore"); } else{ - keyStorePath = appRegistry.getConfiguration().getString("management.ssl.keyStorePath", null); + keyStorePath = appRegistry.getConfiguration().getManagementKeyStorePath(); } //check the keystore path value is valid @@ -202,7 +205,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry if (System.getProperty("javax.net.ssl.keyStorePassword") == null) { - if (appRegistry.getConfiguration().getString("management.ssl.keyStorePassword") == null) + if (appRegistry.getConfiguration().getManagementKeyStorePassword() == null) { throw new ConfigurationException("JMX management SSL keystore password not defined, " + "unable to start requested SSL protected JMX server"); @@ -210,7 +213,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry else { System.setProperty("javax.net.ssl.keyStorePassword", - appRegistry.getConfiguration().getString("management.ssl.keyStorePassword")); + appRegistry.getConfiguration().getManagementKeyStorePassword()); } } @@ -379,6 +382,17 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry // Stopping the RMI registry UnicastRemoteObject.unexportObject(_rmiRegistry, true); } + for (ObjectName name : _mbeanServer.queryNames(null, null)) + { + try + { + _mbeanServer.unregisterMBean(name); + } + catch (JMException e) + { + // Really shouldn't happen, but we'll ignore that... + } + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 4b69842c49..205ca73f13 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -582,7 +582,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable if (delay > 0) { _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); - _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.getInstance().getTimeout(delay)); + _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, (int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index d8dbf97e49..0dbefd8798 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.protocol; +import java.io.IOException; +import java.net.InetSocketAddress; + import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IdleStatus; @@ -34,15 +37,19 @@ import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.util.SessionUtil; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQProtocolHeaderException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.transport.ConnectorConfiguration; import org.apache.qpid.ssl.SSLContextFactory; -import java.io.IOException; -import java.net.InetSocketAddress; - /** * The protocol handler handles "protocol events" for all connections. The state * associated with an individual connection is accessed through the protocol session. @@ -56,9 +63,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter private final IApplicationRegistry _applicationRegistry; - private static String DEFAULT_BUFFER_READ_LIMIT_SIZE = "262144"; - private static String DEFAULT_BUFFER_WRITE_LIMIT_SIZE = "262144"; - private final int BUFFER_READ_LIMIT_SIZE; private final int BUFFER_WRITE_LIMIT_SIZE; @@ -72,8 +76,8 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter _applicationRegistry = applicationRegistry; // Read the configuration from the application registry - BUFFER_READ_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE)); - BUFFER_WRITE_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE)); + BUFFER_READ_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferReadLimit(); + BUFFER_WRITE_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferWriteLimit(); _logger.debug("AMQPFastProtocolHandler created"); } @@ -92,17 +96,22 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress()); final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory); - - ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance(). - getConfiguredObject(ConnectorConfiguration.class); - if (connectorConfig.enableExecutorPool) + final ServerConfiguration config = _applicationRegistry.getConfiguration(); + + String keystorePath = config.getKeystorePath(); + String keystorePassword = config.getKeystorePassword(); + String certType = config.getCertType(); + SSLContextFactory sslContextFactory = null; + boolean isSsl = false; + if (config.getEnableSSL() && isSSLClient(config, protocolSession)) { - if (connectorConfig.enableSSL && isSSLClient(connectorConfig, protocolSession)) + sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); + isSsl = true; + } + if (config.getEnableExecutorPool()) + { + if (isSsl) { - String keystorePath = connectorConfig.keystorePath; - String keystorePassword = connectorConfig.keystorePassword; - String certType = connectorConfig.certType; - SSLContextFactory sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", new SSLFilter(sslContextFactory.buildServerContext())); } @@ -111,19 +120,14 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter else { protocolSession.getFilterChain().addLast("protocolFilter", pcf); - if (connectorConfig.enableSSL && isSSLClient(connectorConfig, protocolSession)) + if (isSsl) { - String keystorePath = connectorConfig.keystorePath; - String keystorePassword = connectorConfig.keystorePassword; - String certType = connectorConfig.certType; - SSLContextFactory sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", new SSLFilter(sslContextFactory.buildServerContext())); } - } - if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio.enabled", false)) + if (ApplicationRegistry.getInstance().getConfiguration().getProtectIOEnabled()) { try { @@ -271,10 +275,10 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } } - protected boolean isSSLClient(ConnectorConfiguration connectionConfig, + protected boolean isSSLClient(ServerConfiguration connectionConfig, IoSession protocolSession) { InetSocketAddress addr = (InetSocketAddress) protocolSession.getLocalAddress(); - return addr.getPort() == connectionConfig.sslPort; + return addr.getPort() == connectionConfig.getSSLPort(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index f6d406b653..5dc8703aef 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -144,6 +144,8 @@ public interface AMQQueue extends Managable, Comparable long getMinimumAlertRepeatGap(); + void setMinimumAlertRepeatGap(long value); + void deleteMessageFromTop(StoreContext storeContext) throws AMQException; @@ -165,7 +167,6 @@ public interface AMQQueue extends Managable, Comparable void stop(); - /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription * already exists. @@ -213,6 +214,4 @@ public interface AMQQueue extends Managable, Comparable { public void doTask(AMQQueue queue) throws AMQException; } - - void configure(Configuration virtualHostDefaultQueueConfiguration); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index be8c19d18f..eb0a011e93 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -20,38 +20,22 @@ */ package org.apache.qpid.server.queue; -import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.AMQException; public class AMQQueueFactory { public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); - public static AMQQueue createAMQQueueImpl(AMQShortString name, - boolean durable, - AMQShortString owner, - boolean autoDelete, - VirtualHost virtualHost, final FieldTable arguments) - - throws AMQException - { - - return createAMQQueueImpl(name, durable, owner, autoDelete, - virtualHost, arguments, - VirtualHostConfiguration.getDefaultQueueConfiguration(virtualHost)); - } - public static AMQQueue createAMQQueueImpl(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, - VirtualHost virtualHost, final FieldTable arguments, - Configuration queueConfiguration) + VirtualHost virtualHost, final FieldTable arguments) throws AMQException { @@ -66,13 +50,41 @@ public class AMQQueueFactory { q = new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost); } - if (q != null && queueConfiguration != null) - { - q.configure(queueConfiguration); - } //Register the new queue virtualHost.getQueueRegistry().registerQueue(q); return q; } + + public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException + { + AMQShortString queueName = new AMQShortString(config.getName()); + + boolean durable = config.getDurable(); + boolean autodelete = config.getAutoDelete(); + AMQShortString owner = (config.getOwner() != null) ? new AMQShortString(config.getOwner()) : null; + FieldTable arguments = null; + boolean priority = config.getPriority(); + int priorities = config.getPriorities(); + if(priority || priorities > 0) + { + if(arguments == null) + { + arguments = new FieldTable(); + } + if (priorities < 0) + { + priorities = 10; + } + arguments.put(new AMQShortString("x-qpid-priorities"), priorities); + } + + AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, host, arguments); + q.setMaximumMessageAge(config.getMaximumMessageAge()); + q.setMaximumQueueDepth(config.getMaximumQueueDepth()); + q.setMaximumMessageSize(config.getMaximumMessageSize()); + q.setMaximumMessageCount(config.getMaximumMessageCount()); + q.setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); + return q; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index b994040131..091baf2751 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -44,7 +44,7 @@ public class IncomingMessage implements Filterable private static final Logger _logger = Logger.getLogger(IncomingMessage.class); private static final boolean SYNCHED_CLOCKS = - ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false); + ApplicationRegistry.getInstance().getConfiguration().getSynchedClocks(); private final MessagePublishInfo _messagePublishInfo; private ContentHeaderBody _contentHeaderBody; @@ -53,7 +53,7 @@ public class IncomingMessage implements Filterable private final TransactionalContext _txnContext; private static final boolean MSG_AUTH = - ApplicationRegistry.getInstance().getConfiguration().getBoolean("security.msg-auth", false); + ApplicationRegistry.getInstance().getConfiguration().getMsgAuth(); /** diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 547df7856d..172a3af481 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1,28 +1,9 @@ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionList; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.AMQException; -import org.apache.qpid.pool.ReadWriteRunnable; -import org.apache.qpid.pool.ReferenceCountingExecutorService; -import org.apache.qpid.configuration.Configured; -import org.apache.commons.configuration.Configuration; -import org.apache.log4j.Logger; - -import javax.management.JMException; -import java.util.List; -import java.util.Set; import java.util.ArrayList; import java.util.EnumSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -30,6 +11,24 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import javax.management.JMException; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.pool.ReadWriteRunnable; +import org.apache.qpid.pool.ReferenceCountingExecutorService; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.subscription.SubscriptionList; +import org.apache.qpid.server.virtualhost.VirtualHost; + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -91,24 +90,19 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final AtomicLong _totalMessagesReceived = new AtomicLong(); /** max allowed size(KB) of a single message */ - @Configured(path = "maximumMessageSize", defaultValue = "0") - public long _maximumMessageSize; + public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); /** max allowed number of messages on a queue. */ - @Configured(path = "maximumMessageCount", defaultValue = "0") - public long _maximumMessageCount; + public long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount(); /** max queue depth for the queue */ - @Configured(path = "maximumQueueDepth", defaultValue = "0") - public long _maximumQueueDepth; + public long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth(); /** maximum message age before alerts occur */ - @Configured(path = "maximumMessageAge", defaultValue = "0") - public long _maximumMessageAge; + public long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge(); - /** the minimum interval between sending out consequetive alerts of the same type */ - @Configured(path = "minimumAlertRepeatGap", defaultValue = "0") - public long _minimumAlertRepeatGap; + /** the minimum interval between sending out consecutive alerts of the same type */ + public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap(); private static final int MAX_ASYNC_DELIVERIES = 10; @@ -167,7 +161,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - private void resetNotifications() + public void resetNotifications() { // This ensure that the notification checks for the configured alerts are created. setMaximumMessageAge(_maximumMessageAge); @@ -1590,10 +1584,4 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } return ids; } - - public void configure(Configuration queueConfiguration) - { - Configurator.configure(this, queueConfiguration); - resetNotifications(); - } -} \ No newline at end of file +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 02124a3737..477beeadcb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -20,24 +20,20 @@ */ package org.apache.qpid.server.registry; -import org.apache.commons.configuration.Configuration; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; + import org.apache.log4j.Logger; -import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.mina.common.IoAcceptor; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.management.ManagedObjectRegistry; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; -import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager; -import org.apache.qpid.server.security.access.ACLPlugin; -import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.plugins.PluginManager; -import org.apache.mina.common.IoAcceptor; - -import java.util.HashMap; -import java.util.Map; -import java.net.InetSocketAddress; +import org.apache.qpid.server.security.access.ACLManager; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; /** * An abstract application registry that provides access to configuration information and handles the @@ -53,7 +49,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry private final Map, Object> _configuredObjects = new HashMap, Object>(); - protected final Configuration _configuration; + protected final ServerConfiguration _configuration; public static final int DEFAULT_INSTANCE = 1; public static final String DEFAULT_APPLICATION_REGISTRY = "org.apache.qpid.server.util.NullApplicationRegistry"; @@ -154,7 +150,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } - protected ApplicationRegistry(Configuration configuration) + protected ApplicationRegistry(ServerConfiguration configuration) { _configuration = configuration; } @@ -242,7 +238,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } - public Configuration getConfiguration() + public ServerConfiguration getConfiguration() { return _configuration; } @@ -255,26 +251,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } - public T getConfiguredObject(Class instanceType) - { - T instance = (T) _configuredObjects.get(instanceType); - if (instance == null) - { - try - { - instance = instanceType.newInstance(); - } - catch (Exception e) - { - _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); - throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e); - } - Configurator.configure(instance); - _configuredObjects.put(instanceType, instance); - } - return instance; - } - public static void setDefaultApplicationRegistry(String clazz) { _APPLICATION_REGISTRY = clazz; @@ -287,7 +263,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public ACLManager getAccessManager() { - return new ACLManager(_configuration, _pluginManager); + return new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager); } public ManagedObjectRegistry getManagedObjectRegistry() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index c34c4bf80a..39164883f9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -21,71 +21,25 @@ package org.apache.qpid.server.registry; import java.io.File; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.SystemConfiguration; -import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.management.JMXManagedObjectRegistry; -import org.apache.qpid.server.management.ManagedObjectRegistry; -import org.apache.qpid.server.management.ManagementConfiguration; import org.apache.qpid.server.management.NoopManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager; -import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.security.access.ACLPlugin; -import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.AMQException; public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException { - super(config(configurationURL)); - } - - // Our configuration class needs to make the interpolate method - // public so it can be called below from the config method. - private static class MyConfiguration extends CompositeConfiguration - { - public String interpolate(String obj) - { - return super.interpolate(obj); - } - } - - private static final Configuration config(File url) throws ConfigurationException - { - // We have to override the interpolate methods so that - // interpolation takes place accross the entirety of the - // composite configuration. Without doing this each - // configuration object only interpolates variables defined - // inside itself. - final MyConfiguration conf = new MyConfiguration(); - conf.addConfiguration(new SystemConfiguration() - { - protected String interpolate(String o) - { - return conf.interpolate(o); - } - }); - conf.addConfiguration(new XMLConfiguration(url) - { - protected String interpolate(String o) - { - return conf.interpolate(o); - } - }); - return conf; + super(new ServerConfiguration(configurationURL)); } public void initialise() throws Exception @@ -94,9 +48,9 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry _virtualHostRegistry = new VirtualHostRegistry(); - _pluginManager = new PluginManager(_configuration.getString("plugin-directory")); + _pluginManager = new PluginManager(_configuration.getPluginDirectory()); - _accessManager = new ACLManager(_configuration, _pluginManager); + _accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager); _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration); @@ -111,18 +65,17 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry } private void initialiseVirtualHosts() throws Exception - { - for (String name : getVirtualHostNames()) + { + for (String name : _configuration.getVirtualHosts()) { - - _virtualHostRegistry.registerVirtualHost(new VirtualHost(name, getConfiguration().subset("virtualhosts.virtualhost." + name))); + _virtualHostRegistry.registerVirtualHost(new VirtualHost(_configuration.getVirtualHostConfig(name))); } + getVirtualHostRegistry().setDefaultVirtualHostName(_configuration.getDefaultVirtualHost()); } private void initialiseManagedObjectRegistry() throws AMQException { - ManagementConfiguration config = getConfiguredObject(ManagementConfiguration.class); - if (config.enabled) + if (_configuration.getManagementEnabled()) { _managedObjectRegistry = new JMXManagedObjectRegistry(); } @@ -131,10 +84,4 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry _managedObjectRegistry = new NoopManagedObjectRegistry(); } } - - public Collection getVirtualHostNames() - { - return getConfiguration().getList("virtualhosts.virtualhost.name"); - } - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index e68dca285c..a1f30c6eed 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.net.InetSocketAddress; import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; @@ -48,22 +49,12 @@ public interface IApplicationRegistry */ void close() throws Exception; - /** - * This gets access to a "configured object". A configured object has fields populated from a the configuration - * object (Commons Configuration) automatically, where it has the appropriate attributes defined on fields. - * Application registry implementations can choose the refresh strategy or caching approach. - * @param instanceType the type of object you want initialised. This must be unique - i.e. you can only - * have a single object of this type in the system. - * @return the configured object - */ - T getConfiguredObject(Class instanceType); - /** * Get the low level configuration. For use cases where the configured object approach is not required * you can get the complete configuration information. * @return a Commons Configuration instance */ - Configuration getConfiguration(); + ServerConfiguration getConfiguration(); ManagedObjectRegistry getManagedObjectRegistry(); @@ -71,8 +62,6 @@ public interface IApplicationRegistry AuthenticationManager getAuthenticationManager(); - Collection getVirtualHostNames(); - VirtualHostRegistry getVirtualHostRegistry(); ACLManager getAccessManager(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java index 356ee815dd..57c6098874 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java @@ -30,6 +30,9 @@ import java.util.Map.Entry; import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.SecurityConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -46,12 +49,12 @@ public class ACLManager private Map _globalPlugins = new HashMap(); private Map _hostPlugins = new HashMap(); - public ACLManager(Configuration configuration, PluginManager manager) + public ACLManager(SecurityConfiguration configuration, PluginManager manager) { this(configuration, manager, null); } - public ACLManager(Configuration configuration, PluginManager manager, ACLPluginFactory securityPlugin) + public ACLManager(SecurityConfiguration configuration, PluginManager manager, ACLPluginFactory securityPlugin) { _pluginManager = manager; @@ -70,14 +73,14 @@ public class ACLManager } - public void configureHostPlugins(Configuration hostConfig) + public void configureHostPlugins(SecurityConfiguration hostConfig) { _hostPlugins = configurePlugins(hostConfig); } - public Map configurePlugins(Configuration configuration) + public Map configurePlugins(SecurityConfiguration hostConfig) { - Configuration securityConfig = configuration.subset("security"); + Configuration securityConfig = hostConfig.getConfiguration(); Map plugins = new HashMap(); Iterator keys = securityConfig.getKeys(); Collection handledTags = new HashSet(); @@ -86,7 +89,6 @@ public class ACLManager // Splitting the string is necessary here because of the way that getKeys() returns only // bottom level children String tag = ((String) keys.next()).split("\\.", 2)[0]; - if (!handledTags.contains(tag)) { for (ACLPluginFactory plugin : _allSecurityPlugins.values()) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java index fc96776a3a..e0d4c49af1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java @@ -34,6 +34,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.configuration.PropertyUtils; import org.apache.qpid.configuration.PropertyException; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; @@ -46,20 +47,18 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab { private static final Logger _logger = Logger.getLogger(ConfigurationFilePrincipalDatabaseManager.class); - private static final String _base = "security.principal-databases.principal-database"; - Map _databases; - public ConfigurationFilePrincipalDatabaseManager(Configuration configuration) throws Exception + public ConfigurationFilePrincipalDatabaseManager(ServerConfiguration _configuration) throws Exception { _logger.info("Initialising PrincipleDatabase authentication manager"); - _databases = initialisePrincipalDatabases(configuration); + _databases = initialisePrincipalDatabases(_configuration); } - private Map initialisePrincipalDatabases(Configuration configuration) throws Exception + private Map initialisePrincipalDatabases(ServerConfiguration _configuration) throws Exception { - List databaseNames = configuration.getList(_base + ".name"); - List databaseClasses = configuration.getList(_base + ".class"); + List databaseNames = _configuration.getPrincipalDatabaseNames(); + List databaseClasses = _configuration.getPrincipalDatabaseClass(); Map databases = new HashMap(); if (databaseNames.size() == 0) @@ -84,7 +83,7 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab throw new Exception("Principal databases must implement the PrincipalDatabase interface"); } - initialisePrincipalDatabase((PrincipalDatabase) o, configuration, i); + initialisePrincipalDatabase((PrincipalDatabase) o, _configuration, i); String name = databaseNames.get(i); if ((name == null) || (name.length() == 0)) @@ -105,12 +104,11 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab return databases; } - private void initialisePrincipalDatabase(PrincipalDatabase principalDatabase, Configuration config, int index) + private void initialisePrincipalDatabase(PrincipalDatabase principalDatabase, ServerConfiguration _configuration, int index) throws FileNotFoundException, ConfigurationException { - String baseName = _base + "(" + index + ").attributes.attribute."; - List argumentNames = config.getList(baseName + "name"); - List argumentValues = config.getList(baseName + "value"); + List argumentNames = _configuration.getPrincipalDatabaseAttributeNames(index); + List argumentValues = _configuration.getPrincipalDatabaseAttributeValues(index); for (int i = 0; i < argumentNames.size(); i++) { String argName = argumentNames.get(i); @@ -166,18 +164,17 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab return _databases; } - public void initialiseManagement(Configuration config) throws ConfigurationException + public void initialiseManagement(ServerConfiguration config) throws ConfigurationException { try { AMQUserManagementMBean _mbean = new AMQUserManagementMBean(); - String baseSecurity = "security.jmx"; - List principalDBs = config.getList(baseSecurity + ".principal-database"); + List principalDBs = config.getManagementPrincipalDBs(); if (principalDBs.size() == 0) { - throw new ConfigurationException("No principal-database specified for jmx security(" + baseSecurity + ".principal-database)"); + throw new ConfigurationException("No principal-database specified for jmx security"); } String databaseName = principalDBs.get(0); @@ -191,11 +188,11 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab _mbean.setPrincipalDatabase(database); - List jmxaccesslist = config.getList(baseSecurity + ".access"); + List jmxaccesslist = config.getManagementAccessList(); if (jmxaccesslist.size() == 0) { - throw new ConfigurationException("No access control files specified for jmx security(" + baseSecurity + ".access)"); + throw new ConfigurationException("No access control files specified for jmx security"); } String jmxaccesssFile = null; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java index 2c553ae76a..f9882f8810 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PrincipalDatabaseManager.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.security.auth.database; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; @@ -30,5 +31,5 @@ public interface PrincipalDatabaseManager { public Map getDatabases(); - public void initialiseManagement(Configuration config) throws ConfigurationException; + public void initialiseManagement(ServerConfiguration _configuration) throws ConfigurationException; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java index 6b86a46bd2..4efe381a8b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java @@ -20,7 +20,8 @@ */ package org.apache.qpid.server.security.auth.database; -import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.ServerConfiguration; import java.util.Map; import java.util.Properties; @@ -41,7 +42,8 @@ public class PropertiesPrincipalDatabaseManager implements PrincipalDatabaseMana return _databases; } - public void initialiseManagement(Configuration config) + @Override + public void initialiseManagement(ServerConfiguration _configuration) throws ConfigurationException { //todo } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index 2cbbdc85ff..98c060599a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.security.auth.manager; import org.apache.log4j.Logger; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; @@ -60,7 +61,7 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan /** The name for the required SASL Server mechanisms */ public static final String PROVIDER_NAME= "AMQSASLProvider-Server"; - public PrincipalDatabaseAuthenticationManager(String name, Configuration hostConfig) throws Exception + public PrincipalDatabaseAuthenticationManager(String name, VirtualHostConfiguration hostConfig) throws Exception { _logger.info("Initialising " + (name == null ? "Default" : "'" + name + "'") + " PrincipleDatabase authentication manager."); @@ -77,7 +78,7 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan } else { - String databaseName = hostConfig.getString("security.authentication.name"); + String databaseName = hostConfig.getAuthenticationDatabase(); if (databaseName == null) { @@ -121,14 +122,6 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan private void initialiseAuthenticationMechanisms(Map> providerMap, Map databases) throws Exception { -// Configuration config = ApplicationRegistry.getInstance().getConfiguration(); -// List mechanisms = config.getList("security.sasl.mechanisms.mechanism.initialiser.class"); -// -// // Maps from the mechanism to the properties used to initialise the server. See the method -// // Sasl.createSaslServer for details of the use of these properties. This map is populated during initialisation -// // of each provider. - - if (databases.size() > 1) { _logger.warn("More than one principle database provided currently authentication mechanism will override each other."); @@ -136,13 +129,11 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan for (Map.Entry entry : databases.entrySet()) { - // fixme As the database now provide the mechanisms they support, they will ... // overwrite each other in the map. There should only be one database per vhost. // But currently we must have authentication before vhost definition. initialiseAuthenticationMechanisms(providerMap, entry.getValue()); } - } private void initialiseAuthenticationMechanisms(Map> providerMap, PrincipalDatabase database) throws Exception diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 743a736884..e7f9c777c9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; @@ -139,7 +140,7 @@ public class DerbyMessageStore implements MessageStore private State _state = State.INITIAL; - public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception + public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { stateTransition(State.INITIAL, State.CONFIGURING); @@ -150,8 +151,8 @@ public class DerbyMessageStore implements MessageStore _logger.info("Configuring Derby message store for virtual host " + virtualHost.getName()); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - final String databasePath = config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB"); - + final String databasePath = config.getStoreConfiguration().getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB"); + File environmentPath = new File(databasePath); if (!environmentPath.exists()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 587c85fc12..4691f02952 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -62,15 +63,15 @@ public class MemoryMessageStore implements MessageStore _contentBodyMap = new ConcurrentHashMap>(DEFAULT_HASHTABLE_CAPACITY); } - public void configure(String base, Configuration config) + public void configure(String base, VirtualHostConfiguration config) { - int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); + int hashtableCapacity = config.getStoreConfiguration().getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); _log.info("Using capacity " + hashtableCapacity + " for hash tables"); _metaDataMap = new ConcurrentHashMap(hashtableCapacity); _contentBodyMap = new ConcurrentHashMap>(hashtableCapacity); } - public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception + public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { configure(base, config); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index f2910acb77..5a1b54b298 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.AMQQueue; @@ -61,11 +62,11 @@ public interface MessageStore * @param virtualHost The virtual host using by this store * @param base The base element identifier from which all configuration items are relative. For example, if * the base element is "store", the all elements used by concrete classes will be "store.foo" etc. - * @param config The apache commons configuration object. + * @param hostConfig The apache commons configuration object. * * @throws Exception If any error occurs that means the store is unable to configure itself. */ - void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception; + void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration hostConfig) throws Exception; /** * Called to close and cleanup any resources used by the message store. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index 88ad87b9c1..eda2d3a94e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -26,7 +26,11 @@ import java.util.HashMap; import java.util.Properties; import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.MapConfiguration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.management.NoopManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -39,17 +43,16 @@ import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public class NullApplicationRegistry extends ApplicationRegistry { - public NullApplicationRegistry() + public NullApplicationRegistry() throws ConfigurationException { - super(new MapConfiguration(new HashMap())); + super(new ServerConfiguration(new PropertiesConfiguration())); } public void initialise() throws Exception { _logger.info("Initialising NullApplicationRegistry"); - _configuration.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore"); - _configuration.addProperty("housekeeping.expiredMessageCheckPeriod", "200"); + _configuration.setHousekeepingExpiredMessageCheckPeriod(200); Properties users = new Properties(); @@ -57,17 +60,18 @@ public class NullApplicationRegistry extends ApplicationRegistry _databaseManager = new PropertiesPrincipalDatabaseManager("default", users); - _accessManager = new ACLManager(_configuration, _pluginManager, AllowAll.FACTORY); + _accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager, AllowAll.FACTORY); _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); _managedObjectRegistry = new NoopManagedObjectRegistry(); _virtualHostRegistry = new VirtualHostRegistry(); - VirtualHost dummyHost = new VirtualHost("test", _configuration); + PropertiesConfiguration vhostProps = new PropertiesConfiguration(); + VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps); + VirtualHost dummyHost = new VirtualHost(hostConfig); _virtualHostRegistry.registerVirtualHost(dummyHost); _virtualHostRegistry.setDefaultVirtualHostName("test"); _pluginManager = new PluginManager(""); - _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index de4c8ac1ff..1655d5f958 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -20,26 +20,34 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.Collections; +import java.util.List; import java.util.Timer; import java.util.TimerTask; import javax.management.NotCompliantMBeanException; import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQBrokerManagerMBean; -import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.configuration.ExchangeConfiguration; +import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.ConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.DefaultExchangeRegistry; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -77,9 +85,6 @@ public class VirtualHost implements Accessable private final Timer _houseKeepingTimer; - private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L; - - public void setAccessableName(String name) { _logger.warn("Setting Accessable Name for VirualHost is not allowed. (" @@ -125,47 +130,21 @@ public class VirtualHost implements Accessable } // End of MBean class - /** - * Used for testing only - * @param name - * @param store - * @throws Exception - */ - public VirtualHost(String name, MessageStore store) throws Exception - { - this(name, new PropertiesConfiguration(), store); - } - - /** - * Normal Constructor - * @param name - * @param hostConfig - * @throws Exception - */ - public VirtualHost(String name, Configuration hostConfig) throws Exception - { - this(name, hostConfig, null); - } - public VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception + public VirtualHost(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception { - if (name == null || name.length() == 0) + _name = hostConfig.getName(); + if (_name == null || _name.length() == 0) { - throw new IllegalArgumentException("Illegal name (" + name + ") for virtualhost."); + throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost."); } - _name = name; - _virtualHostMBean = new VirtualHostMBean(); _connectionRegistry = new ConnectionRegistry(this); - _houseKeepingTimer = new Timer("Queue-housekeeping-"+name, true); - _queueRegistry = new DefaultQueueRegistry(this); - _exchangeFactory = new DefaultExchangeFactory(this); - _exchangeFactory.initialise(hostConfig); - _exchangeRegistry = new DefaultExchangeRegistry(this); - + _houseKeepingTimer = new Timer("Queue-housekeeping-"+_name, true); + if (store != null) { _messageStore = store; @@ -178,24 +157,32 @@ public class VirtualHost implements Accessable } initialiseMessageStore(hostConfig); } - + + _queueRegistry = new DefaultQueueRegistry(this); + + _exchangeFactory = new DefaultExchangeFactory(this); + _exchangeFactory.initialise(hostConfig); + _exchangeRegistry = new DefaultExchangeRegistry(this); _exchangeRegistry.initialise(); - - _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig); + + initialiseModel(hostConfig); + _authenticationManager = new PrincipalDatabaseAuthenticationManager(_name, hostConfig); _accessManager = ApplicationRegistry.getInstance().getAccessManager(); - _accessManager.configureHostPlugins(hostConfig); + _accessManager.configureHostPlugins(hostConfig.getSecurityConfiguration()); _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); - initialiseHouseKeeping(hostConfig); + initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); } - private void initialiseHouseKeeping(final Configuration hostConfig) + public VirtualHost(VirtualHostConfiguration virtualHostConfig) throws Exception + { + this(virtualHostConfig, null); + } + + private void initialiseHouseKeeping(long period) { - - long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD); - /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ if(period != 0L) { @@ -225,9 +212,9 @@ public class VirtualHost implements Accessable } } - private void initialiseMessageStore(Configuration config) throws Exception + private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception { - String messageStoreClass = config.getString("store.class"); + String messageStoreClass = hostConfig.getMessageStoreClass(); Class clazz = Class.forName(messageStoreClass); Object o = clazz.newInstance(); @@ -238,25 +225,88 @@ public class VirtualHost implements Accessable " does not."); } _messageStore = (MessageStore) o; - _messageStore.configure(this, "store", config); + _messageStore.configure(this, "store", hostConfig); } + private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException + { + _logger.debug("Loading configuration for virtualhost: "+config.getName()); + + List exchangeNames = config.getExchanges(); + + for(Object exchangeNameObj : exchangeNames) + { + String exchangeName = String.valueOf(exchangeNameObj); + configureExchange(config.getExchangeConfiguration(exchangeName)); + } + + String[] queueNames = config.getQueueNames(); - public T getConfiguredObject(Class instanceType, Configuration config) + for(Object queueNameObj : queueNames) + { + String queueName = String.valueOf(queueNameObj); + configureQueue(config.getQueueConfiguration(queueName)); + } + } + + private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException { - T instance; - try - { - instance = instanceType.newInstance(); - } - catch (Exception e) - { - _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); - throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e); - } - Configurator.configure(instance); + AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName()); - return instance; + Exchange exchange; + exchange = _exchangeRegistry.getExchange(exchangeName); + if(exchange == null) + { + + AMQShortString type = new AMQShortString(exchangeConfiguration.getType()); + boolean durable = exchangeConfiguration.getDurable(); + boolean autodelete = exchangeConfiguration.getAutoDelete(); + + Exchange newExchange = _exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0); + _exchangeRegistry.registerExchange(newExchange); + } + } + + private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException + { + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); + + if (queue.isDurable()) + { + _messageStore.createQueue(queue); + } + + String exchangeName = queueConfiguration.getExchange(); + + Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName)); + + if(exchange == null) + { + exchange = _exchangeRegistry.getDefaultExchange(); + } + + if (exchange == null) + { + throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName); + } + + List routingKeys = queueConfiguration.getRoutingKeys(); + if(routingKeys == null || routingKeys.isEmpty()) + { + routingKeys = Collections.singletonList(queue.getName()); + } + + for(Object routingKeyNameObj : routingKeys) + { + AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); + queue.bind(exchange, routingKey, null); + _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'"); + } + + if(exchange != _exchangeRegistry.getDefaultExchange()) + { + queue.bind(_exchangeRegistry.getDefaultExchange(), queue.getName(), null); + } } public String getName() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index aa7cbbdf3c..dfbbd56d6f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -21,12 +21,15 @@ package org.apache.qpid.server.ack; import junit.framework.TestCase; + +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.queue.QueueEntry; @@ -118,8 +121,13 @@ public class TxAckTest extends TestCase _storeContext, null, new LinkedList() ); - _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, new VirtualHost("test", new MemoryMessageStore()), - null); + + PropertiesConfiguration env = new PropertiesConfiguration(); + env.setProperty("name", "test"); + VirtualHost virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env), null); + + _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, + virtualHost, null); for (int i = 0; i < messageCount; i++) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index 8f743d8856..ba504d3064 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -20,13 +20,10 @@ package org.apache.qpid.server.configuration; -import java.io.File; -import java.util.ArrayList; -import java.util.Collection; +import junit.framework.TestCase; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; -import org.apache.commons.configuration.HierarchicalConfiguration.Node; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQPriorityQueue; @@ -34,29 +31,22 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; -import junit.framework.TestCase; - public class VirtualHostConfigurationTest extends TestCase { - private File configFile; private VirtualHostConfiguration vhostConfig; private XMLConfiguration configXml; @Override protected void setUp() throws Exception - { - // Create temporary configuration file - configFile = File.createTempFile(this.getName()+"config", ".xml"); - configFile.deleteOnExit(); - + { // Fill config file with stuff configXml = new XMLConfiguration(); configXml.setRootElementName("virtualhosts"); configXml.addProperty("virtualhost(-1).name", "test"); } - public void testQueuePriority() throws ConfigurationException, AMQException + public void testQueuePriority() throws Exception { // Set up queue with 5 priorities configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", @@ -81,14 +71,8 @@ public class VirtualHostConfigurationTest extends TestCase "amq.direct"); configXml.addProperty("virtualhost.test.queues.queue.ntest.priority", "false"); - configXml.save(configFile); - - // Setup virtual host configuration - vhostConfig = new VirtualHostConfiguration(configFile.getAbsolutePath()); - // Do bindings and get resulting vhost - vhostConfig.performBindings(); - VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); + VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test"))); // Check that atest was a priority queue with 5 priorities AMQQueue atest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest")); @@ -105,4 +89,36 @@ public class VirtualHostConfigurationTest extends TestCase assertFalse(ntest instanceof AMQPriorityQueue); } + public void testQueueAlerts() throws Exception + { + // Set up queue with 5 priorities + configXml.addProperty("virtualhost.test.queues.exchange", "amq.topic"); + configXml.addProperty("virtualhost.test.queues.maximumQueueDepth", "1"); + configXml.addProperty("virtualhost.test.queues.maximumMessageSize", "2"); + configXml.addProperty("virtualhost.test.queues.maximumMessageAge", "3"); + + configXml.addProperty("virtualhost.test.queues(-1).queue(1).name(1)", "atest"); + configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", "amq.direct"); + configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumQueueDepth", "4"); + configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumMessageSize", "5"); + configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumMessageAge", "6"); + + configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", "btest"); + + VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test"))); + + // Check specifically configured values + AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest")); + assertEquals(4, aTest.getMaximumQueueDepth()); + assertEquals(5, aTest.getMaximumMessageSize()); + assertEquals(6, aTest.getMaximumMessageAge()); + + // Check default values + AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest")); + assertEquals(1, bTest.getMaximumQueueDepth()); + assertEquals(2, bTest.getMaximumMessageSize()); + assertEquals(3, bTest.getMaximumMessageAge()); + + } + } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index c16fde0134..b9b68da579 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -249,26 +249,6 @@ public class AMQQueueAlertTest extends TestCase _queueMBean.clearQueue(); assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth())); } - - public void testAlertConfiguration() throws AMQException - { - // Setup configuration - CompositeConfiguration config = new CompositeConfiguration(); - config.setProperty("maximumMessageSize", new Long(23)); - config.setProperty("maximumMessageCount", new Long(24)); - config.setProperty("maximumQueueDepth", new Long(25)); - config.setProperty("maximumMessageAge", new Long(26)); - - // Create queue and set config - _queue = getNewQueue(); - _queue.configure(config); - - // Check alerts and notifications - Set checks = _queue.getNotificationChecks(); - assertNotNull("No checks found", checks); - assertFalse("Checks should not be empty", checks.isEmpty()); - assertEquals("Wrong number of checks", 4, checks.size()); - } protected IncomingMessage message(final boolean immediate, long size) throws AMQException { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 758c8ddb2e..3d44aac469 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.store.StoreContext; @@ -308,11 +309,6 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } - public void configure(Configuration virtualHostDefaultQueueConfiguration) - { - //To change body of implemented methods use File | Settings | File Templates. - } - public ManagedObject getManagedObject() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -322,4 +318,11 @@ public class MockAMQQueue implements AMQQueue { return 0; //To change body of implemented methods use File | Settings | File Templates. } + + @Override + public void setMinimumAlertRepeatGap(long value) + { + + } + } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 7a60ed9795..3084dc7fa1 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -26,6 +26,7 @@ import java.util.List; import junit.framework.TestCase; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; @@ -33,6 +34,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.ContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -94,7 +96,8 @@ public class SimpleAMQQueueTest extends TestCase //Create Application Registry for test ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(1); - _virtualHost = new VirtualHost("vhost", _store); + PropertiesConfiguration env = new PropertiesConfiguration(); + _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _store); applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java index 03fcfc31e9..939e3436a5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java @@ -39,7 +39,7 @@ public class ApplicationRegistryShutdownTest extends TestCase ApplicationRegistry _registry; - public void setUp() + public void setUp() throws Exception { _registry = new TestApplicationRegistry(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java index d12a0b1f1b..797df0802c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java @@ -29,6 +29,8 @@ import junit.framework.TestCase; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.configuration.SecurityConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.plugins.MockPluginManager; import org.apache.qpid.server.plugins.PluginManager; @@ -43,7 +45,7 @@ public class ACLManagerTest extends TestCase private ACLManager _authzManager; private AMQProtocolSession _session; - private XMLConfiguration _conf; + private SecurityConfiguration _conf; private PluginManager _pluginManager; @Override @@ -52,10 +54,10 @@ public class ACLManagerTest extends TestCase File tmpFile = File.createTempFile(getClass().getName(), "testconfig"); tmpFile.deleteOnExit(); BufferedWriter out = new BufferedWriter(new FileWriter(tmpFile)); - out.write("notyetyes"); + out.write("notyetyes"); out.close(); - _conf = new XMLConfiguration(tmpFile); + _conf = new SecurityConfiguration(new XMLConfiguration(tmpFile)); // Create ACLManager @@ -88,10 +90,9 @@ public class ACLManagerTest extends TestCase public void testConfigurePlugins() { Configuration hostConfig = new PropertiesConfiguration(); - hostConfig.setProperty("security.queueDenier", "thisoneneither"); - _authzManager.configureHostPlugins(hostConfig); + hostConfig.setProperty("queueDenier", "thisoneneither"); + _authzManager.configureHostPlugins(new SecurityConfiguration(hostConfig)); AMQQueue queue = new MockAMQQueue("thisoneneither"); assertFalse(_authzManager.authoriseDelete(_session, queue)); } - } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java index 1e47f764df..eb4c3313d3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java @@ -23,11 +23,13 @@ package org.apache.qpid.server.security.access; import junit.framework.TestCase; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.amqp_0_9.ExchangeDeclareBodyImpl; import org.apache.qpid.framing.amqp_0_9.QueueDeclareBodyImpl; import org.apache.qpid.framing.amqp_8_0.QueueBindBodyImpl; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; @@ -50,7 +52,6 @@ public class PrincipalPermissionsTest extends TestCase private boolean _nowait = false; private boolean _passive = false; private boolean _durable = false; - private boolean _exclusive = false; private boolean _autoDelete = false; private AMQShortString _exchangeType = new AMQShortString("direct"); private boolean _internal = false; @@ -67,7 +68,8 @@ public class PrincipalPermissionsTest extends TestCase _perms = new PrincipalPermissions(_user); try { - _virtualHost = new VirtualHost("localhost", new SkeletonMessageStore()); + PropertiesConfiguration env = new PropertiesConfiguration(); + _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env)); _exchange = DirectExchange.TYPE.newInstance(_virtualHost, _exchangeName, _durable, _ticket, _autoDelete); _queue = AMQQueueFactory.createAMQQueueImpl(_queueName, false, _owner , false, _virtualHost, _arguments); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java index 8028362979..ff1fb8c97d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java @@ -30,8 +30,10 @@ import java.net.InetSocketAddress; import junit.framework.TestCase; import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.TestIoSession; import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; @@ -87,7 +89,8 @@ public class FirewallPluginTest extends TestCase public void setUp() throws Exception { _store = new TestableMemoryMessageStore(); - _virtualHost = new VirtualHost("vhost", _store); + PropertiesConfiguration env = new PropertiesConfiguration(); + _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env)); TestIoSession iosession = new TestIoSession(); iosession.setAddress("127.0.0.1"); VirtualHostRegistry virtualHostRegistry = null; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index dec4de4cc6..4317a501ed 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.store; import junit.framework.TestCase; + +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeType; @@ -101,7 +103,7 @@ public class MessageStoreTest extends TestCase try { - _virtualHost = new VirtualHost(virtualHostName, configuration, null); + _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), configuration)); ApplicationRegistry.getInstance().getVirtualHostRegistry().registerVirtualHost(_virtualHost); } catch (Exception e) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index f08a15a8a7..fd6789f5ce 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import java.util.List; @@ -45,7 +46,7 @@ public class SkeletonMessageStore implements MessageStore { } - public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception + public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { //To change body of implemented methods use File | Settings | File Templates. } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 509ea817fd..101c33043d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -21,10 +21,13 @@ package org.apache.qpid.server.util; import junit.framework.TestCase; + +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.InternalTestProtocolSession; import org.apache.qpid.server.AMQChannel; @@ -57,7 +60,9 @@ public class InternalBrokerBaseCase extends TestCase public void setUp() throws Exception { super.setUp(); - _registry = new TestApplicationRegistry(); + PropertiesConfiguration configuration = new PropertiesConfiguration(); + configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName()); + _registry = new TestApplicationRegistry(new ServerConfiguration(configuration)); ApplicationRegistry.initialise(_registry); _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test"); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java index b6d42e6068..c6ecac6a01 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -20,7 +20,11 @@ */ package org.apache.qpid.server.util; +import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.MapConfiguration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.management.NoopManagedObjectRegistry; @@ -54,9 +58,17 @@ public class TestApplicationRegistry extends ApplicationRegistry private VirtualHost _vHost; - public TestApplicationRegistry() + private ServerConfiguration _config; + + public TestApplicationRegistry() throws ConfigurationException { - super(new MapConfiguration(new HashMap())); + super(new ServerConfiguration(new PropertiesConfiguration())); + } + + public TestApplicationRegistry(ServerConfiguration config) throws ConfigurationException + { + super(config); + _config = config; } public void initialise() throws Exception @@ -67,7 +79,7 @@ public class TestApplicationRegistry extends ApplicationRegistry _databaseManager = new PropertiesPrincipalDatabaseManager("default", users); - _accessManager = new ACLManager(_configuration, _pluginManager, AllowAll.FACTORY); + _accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager, AllowAll.FACTORY); _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); @@ -76,8 +88,10 @@ public class TestApplicationRegistry extends ApplicationRegistry _messageStore = new TestableMemoryMessageStore(); _virtualHostRegistry = new VirtualHostRegistry(); - - _vHost = new VirtualHost("test", _messageStore); + + PropertiesConfiguration vhostProps = new PropertiesConfiguration(); + VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps); + _vHost = new VirtualHost(hostConfig, _messageStore); _virtualHostRegistry.registerVirtualHost(_vHost); @@ -85,7 +99,6 @@ public class TestApplicationRegistry extends ApplicationRegistry _exchangeFactory = _vHost.getExchangeFactory(); _exchangeRegistry = _vHost.getExchangeRegistry(); - _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes } public QueueRegistry getQueueRegistry() -- cgit v1.2.1